diff --git a/plugins/memory/honcho/sync_worker.py b/plugins/memory/honcho/sync_worker.py
new file mode 100644
index 0000000000..69a3f7ed4c
--- /dev/null
+++ b/plugins/memory/honcho/sync_worker.py
@@ -0,0 +1,394 @@
+"""Background sync worker for the Honcho memory provider.
+
+The post-response sync path was previously a per-turn thread spawn with a
+best-effort ``join(timeout=5.0)`` on the prior turn's thread. That forced
+``run_conversation`` to wait up to 5 seconds at the start of every turn if
+the previous turn's sync was still in flight, and it serialized all sync
+work on a single transient thread that the caller had to coordinate with.
+
+This module replaces that pattern with:
+
+  - :class:`SyncWorker` — persistent daemon thread draining a bounded
+    queue of sync/write tasks. Tasks are submitted with ``enqueue()``,
+    which returns immediately; the caller is never blocked by Honcho
+    latency.
+
+  - :class:`HonchoLatencyTracker` — rolling p95 observer that gives the
+    client an adaptive timeout with sensible cold-start defaults (Layer 2
+    of the timeout-ceiling rework).
+
+  - :class:`CircuitBreaker` — consecutive-failure tripwire that flips to
+    a degraded state after repeated timeouts and probes for recovery in
+    the background (Layer 3). While open, sync tasks are routed to their
+    ``on_failure`` callbacks so the outage's worth of writes can be
+    persisted to a local backlog and drained once Honcho is reachable
+    again.
+
+The three primitives compose: ``SyncWorker`` consults the breaker before
+each task, records successful-call latency in the tracker, and feeds
+success and failure outcomes back to the breaker. Nothing here depends
+on ``HonchoMemoryProvider`` — the worker takes plain callables so tests
+can exercise each primitive in isolation.
+"""
+
+from __future__ import annotations
+
+import collections
+import logging
+import queue
+import threading
+import time
+from dataclasses import dataclass
+from typing import Callable, Deque, Optional
+
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# Latency tracker — Layer 2
+# ---------------------------------------------------------------------------
+
+
+class HonchoLatencyTracker:
+    """Rolling p95 observer for Honcho call latencies.
+
+    Provides an adaptive HTTP timeout that scales with observed backend
+    latency. Hosted Honcho settles to ~1-3s; self-hosted instances with
+    slow cold starts naturally scale up. Thread-safe: the worker thread
+    records observations, any thread may read the current timeout.
+    """
+
+    def __init__(
+        self,
+        *,
+        window: int = 20,
+        default: float = 30.0,
+        floor: float = 5.0,
+        headroom: float = 3.0,
+        warmup_samples: int = 5,
+    ) -> None:
+        self._samples: Deque[float] = collections.deque(maxlen=window)
+        self._default = float(default)
+        self._floor = float(floor)
+        self._headroom = float(headroom)
+        self._warmup = int(warmup_samples)
+        self._lock = threading.Lock()
+
+    def observe(self, seconds: float) -> None:
+        """Record a successful call's wall-clock latency (seconds)."""
+        if seconds < 0 or seconds != seconds:  # reject negatives and NaN
+            return
+        with self._lock:
+            self._samples.append(float(seconds))
+
+    def timeout(self) -> float:
+        """Return the adaptive timeout for the next call.
+
+        During warmup (< warmup_samples observations) returns the default.
+        Once warm, returns ``max(floor, headroom × p95(samples))``.
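+
+        Worked example with the defaults (floor 5.0, headroom 3.0; numbers
+        illustrative): a warm window whose p95 sample is 2.0s yields
+        ``max(5.0, 3.0 * 2.0) = 6.0`` seconds, while a fast backend with
+        a p95 of 0.5s clamps to the 5.0s floor.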
+ """ + with self._lock: + n = len(self._samples) + if n < self._warmup: + return self._default + sorted_samples = sorted(self._samples) + # Nearest-rank p95: index = ceil(0.95 * n) - 1, clamped. + idx = min(n - 1, max(0, int(round(0.95 * (n - 1))))) + p95 = sorted_samples[idx] + return max(self._floor, self._headroom * p95) + + def reset(self) -> None: + with self._lock: + self._samples.clear() + + +# --------------------------------------------------------------------------- +# Circuit breaker — Layer 3 +# --------------------------------------------------------------------------- + + +class CircuitBreaker: + """Consecutive-failure tripwire with periodic probe recovery. + + States: + - ``closed`` — traffic flows normally (the happy path) + - ``open`` — recent consecutive failures > threshold; skip calls + - ``half_open`` — probe window; one test call is allowed through + + Transitions: + - closed → open after ``failure_threshold`` consecutive failures + - open → half_open after ``probe_interval`` seconds + - half_open → closed on a successful probe + - half_open → open on a failed probe + + Thread-safe. The worker consults ``allow()`` before each task and + reports the outcome via ``record_success()`` / ``record_failure()``. + """ + + STATE_CLOSED = "closed" + STATE_OPEN = "open" + STATE_HALF_OPEN = "half_open" + + def __init__( + self, + *, + failure_threshold: int = 3, + probe_interval: float = 60.0, + time_fn: Callable[[], float] = time.monotonic, + ) -> None: + self._failure_threshold = int(failure_threshold) + self._probe_interval = float(probe_interval) + self._time_fn = time_fn + self._lock = threading.Lock() + self._state = self.STATE_CLOSED + self._consecutive_failures = 0 + self._opened_at: Optional[float] = None + + @property + def state(self) -> str: + with self._lock: + self._maybe_transition_to_probe() + return self._state + + def allow(self) -> bool: + """Return True iff a call should proceed now.""" + with self._lock: + self._maybe_transition_to_probe() + return self._state != self.STATE_OPEN + + def record_success(self) -> None: + with self._lock: + self._consecutive_failures = 0 + if self._state != self.STATE_CLOSED: + logger.info("Honcho circuit breaker: recovered → closed") + self._state = self.STATE_CLOSED + self._opened_at = None + + def record_failure(self) -> None: + with self._lock: + self._consecutive_failures += 1 + if self._state == self.STATE_HALF_OPEN: + self._state = self.STATE_OPEN + self._opened_at = self._time_fn() + logger.warning("Honcho circuit breaker: probe failed → open") + return + if ( + self._state == self.STATE_CLOSED + and self._consecutive_failures >= self._failure_threshold + ): + self._state = self.STATE_OPEN + self._opened_at = self._time_fn() + logger.warning( + "Honcho circuit breaker: %d consecutive failures → open", + self._consecutive_failures, + ) + + def reset(self) -> None: + with self._lock: + self._state = self.STATE_CLOSED + self._consecutive_failures = 0 + self._opened_at = None + + def _maybe_transition_to_probe(self) -> None: + # Caller must hold the lock. 
+        if self._state == self.STATE_OPEN and self._opened_at is not None:
+            if self._time_fn() - self._opened_at >= self._probe_interval:
+                self._state = self.STATE_HALF_OPEN
+                logger.info(
+                    "Honcho circuit breaker: probe window → half_open"
+                )
+
+
+# ---------------------------------------------------------------------------
+# Sync worker — Layer 1
+# ---------------------------------------------------------------------------
+
+
+@dataclass
+class SyncTask:
+    """A unit of work for the sync worker.
+
+    ``fn`` runs on the worker thread. ``name`` is a human-readable label
+    used in logs and for backlog replay. ``on_failure`` is optional: if
+    set, it is called with the exception when the task is dropped
+    (breaker open, queue overflow, shutdown) or when the task body
+    raises, so callers can persist the task to a durable backlog.
+    """
+
+    fn: Callable[[], None]
+    name: str = "sync"
+    on_failure: Optional[Callable[[BaseException], None]] = None
+
+
+class SyncWorker:
+    """Persistent daemon thread draining a bounded task queue.
+
+    This replaces the per-turn ``threading.Thread(target=_sync).start()``
+    pattern so ``sync_turn`` returns immediately instead of coordinating
+    thread handoff on every turn. Runs ``SyncTask`` callables serially
+    on a dedicated thread — serialization is intentional because Honcho
+    session writes must be ordered per-session to avoid re-ordering
+    messages, and the worker handles one session per provider.
+
+    Queue overflow (producer faster than Honcho can drain) drops the
+    OLDEST task rather than blocking the producer. This favors user-
+    facing responsiveness over write fidelity in the pathological case,
+    and the dropped task still has its ``on_failure`` callback invoked
+    so it can be appended to a durable backlog.
+
+    The worker is lazy: the thread starts on the first ``enqueue()``
+    call and runs until ``shutdown()``. ``shutdown()`` is idempotent.
+    """
+
+    def __init__(
+        self,
+        *,
+        max_queue: int = 64,
+        latency_tracker: Optional[HonchoLatencyTracker] = None,
+        breaker: Optional[CircuitBreaker] = None,
+        thread_name: str = "honcho-sync-worker",
+    ) -> None:
+        self._queue: queue.Queue[Optional[SyncTask]] = queue.Queue(
+            maxsize=max_queue
+        )
+        self._thread: Optional[threading.Thread] = None
+        self._thread_name = thread_name
+        self._shutdown = False
+        self._lock = threading.Lock()
+        self._latency_tracker = latency_tracker
+        self._breaker = breaker
+        self._dropped = 0
+
+    # -- lifecycle -----------------------------------------------------------
+
+    def _ensure_started(self) -> None:
+        with self._lock:
+            if self._thread is not None and self._thread.is_alive():
+                return
+            if self._shutdown:
+                return
+            self._thread = threading.Thread(
+                target=self._run,
+                name=self._thread_name,
+                daemon=True,
+            )
+            self._thread.start()
+
+    def shutdown(self, *, timeout: float = 5.0) -> None:
+        """Signal the worker to drain and exit; wait up to ``timeout`` seconds."""
+        with self._lock:
+            if self._shutdown:
+                return
+            self._shutdown = True
+            thread = self._thread
+        try:
+            # Sentinel triggers clean exit from the loop.
+            self._queue.put_nowait(None)
+        except queue.Full:
+            pass
+        if thread is not None:
+            thread.join(timeout=timeout)
+
+    # -- producer interface --------------------------------------------------
+
+    def enqueue(self, task: SyncTask) -> bool:
+        """Submit a task. Returns False if the task was dropped.
+
+        Breaker-open tasks are dropped synchronously and ``on_failure`` is
+        called so the caller can persist them.
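+
+        For example (illustrative only), a caller wiring drops into a
+        durable backlog might pass
+        ``on_failure=lambda exc: backlog_file.write(payload_json)``, where
+        ``backlog_file`` and ``payload_json`` are the caller's own
+        persistence hooks, not names defined by this module.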
+        Queue-full tasks evict the oldest task (which also gets its
+        ``on_failure`` called) to keep the pipeline moving under load.
+        """
+        if self._shutdown:
+            if task.on_failure is not None:
+                try:
+                    task.on_failure(RuntimeError("sync worker is shutting down"))
+                except Exception:
+                    pass
+            return False
+
+        breaker = self._breaker
+        if breaker is not None and not breaker.allow():
+            if task.on_failure is not None:
+                try:
+                    task.on_failure(RuntimeError("circuit breaker open"))
+                except Exception:
+                    pass
+            return False
+
+        self._ensure_started()
+
+        try:
+            self._queue.put_nowait(task)
+            return True
+        except queue.Full:
+            # Evict the oldest queued task to make room; its failure
+            # callback still fires so the backlog can capture it.
+            try:
+                victim = self._queue.get_nowait()
+                self._dropped += 1
+                if victim is not None and victim.on_failure is not None:
+                    try:
+                        victim.on_failure(
+                            RuntimeError("sync queue overflow — task dropped")
+                        )
+                    except Exception:
+                        pass
+            except queue.Empty:
+                pass
+            try:
+                self._queue.put_nowait(task)
+                return True
+            except queue.Full:
+                if task.on_failure is not None:
+                    try:
+                        task.on_failure(RuntimeError("sync queue full"))
+                    except Exception:
+                        pass
+                return False
+
+    # -- worker loop ---------------------------------------------------------
+
+    def _run(self) -> None:
+        while True:
+            try:
+                task = self._queue.get(timeout=1.0)
+            except queue.Empty:
+                if self._shutdown:
+                    return
+                continue
+
+            if task is None:  # sentinel
+                return
+
+            started = time.monotonic()
+            ok = False
+            error: Optional[BaseException] = None
+            try:
+                task.fn()
+                ok = True
+            except BaseException as e:  # task bodies may raise anything
+                error = e
+                logger.debug("Honcho sync task %s failed: %s", task.name, e)
+
+            elapsed = time.monotonic() - started
+            if ok:
+                if self._latency_tracker is not None:
+                    self._latency_tracker.observe(elapsed)
+                if self._breaker is not None:
+                    self._breaker.record_success()
+            else:
+                if self._breaker is not None:
+                    self._breaker.record_failure()
+                if task.on_failure is not None and error is not None:
+                    try:
+                        task.on_failure(error)
+                    except Exception:
+                        pass
+
+    # -- introspection (for hermes honcho status etc.) ------------------------
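+
+    # qsize() and dropped() race with the worker thread by nature; treat
+    # them as monitoring hints for status output, not exact counters.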
+
+    def qsize(self) -> int:
+        return self._queue.qsize()
+
+    def dropped(self) -> int:
+        return self._dropped
+
+    def is_running(self) -> bool:
+        return self._thread is not None and self._thread.is_alive()
diff --git a/tests/honcho_plugin/test_sync_worker.py b/tests/honcho_plugin/test_sync_worker.py
new file mode 100644
index 0000000000..0163ff1420
--- /dev/null
+++ b/tests/honcho_plugin/test_sync_worker.py
@@ -0,0 +1,342 @@
+"""Tests for plugins/memory/honcho/sync_worker.py — Layers 1/2/3."""
+
+from __future__ import annotations
+
+import threading
+import time
+from typing import List
+
+from plugins.memory.honcho.sync_worker import (
+    CircuitBreaker,
+    HonchoLatencyTracker,
+    SyncTask,
+    SyncWorker,
+)
+
+
+# ---------------------------------------------------------------------------
+# HonchoLatencyTracker
+# ---------------------------------------------------------------------------
+
+
+class TestHonchoLatencyTracker:
+    def test_returns_default_during_warmup(self):
+        t = HonchoLatencyTracker(default=30.0, warmup_samples=5)
+        for _ in range(4):
+            t.observe(1.0)
+        assert t.timeout() == 30.0
+
+    def test_adapts_to_observed_p95(self):
+        t = HonchoLatencyTracker(
+            default=30.0, floor=5.0, headroom=3.0, warmup_samples=5
+        )
+        # 10 samples at 1s, 10 samples at 2s — p95 should land at the 2s end
+        for _ in range(10):
+            t.observe(1.0)
+        for _ in range(10):
+            t.observe(2.0)
+        t_out = t.timeout()
+        assert 5.0 <= t_out <= 7.0  # 3 × 2.0 with some rounding latitude
+
+    def test_respects_floor(self):
+        t = HonchoLatencyTracker(
+            default=30.0, floor=5.0, headroom=3.0, warmup_samples=3
+        )
+        # Very fast samples — 3 × 0.1 = 0.3 < floor → floor applies
+        for _ in range(10):
+            t.observe(0.1)
+        assert t.timeout() == 5.0
+
+    def test_rejects_nan_and_negative(self):
+        t = HonchoLatencyTracker(warmup_samples=1)
+        t.observe(float("nan"))
+        t.observe(-1.0)
+        # No valid samples → still default
+        assert t.timeout() == t._default
+
+    def test_rolling_window_discards_old(self):
+        t = HonchoLatencyTracker(
+            window=5, default=30.0, floor=0.1, headroom=1.0, warmup_samples=1
+        )
+        for _ in range(5):
+            t.observe(100.0)
+        assert t.timeout() >= 50.0  # dominated by 100s samples
+        for _ in range(5):
+            t.observe(0.5)
+        # Old samples rolled out, now dominated by 0.5s
+        assert t.timeout() <= 1.0
+
+    def test_thread_safe_concurrent_observations(self):
+        t = HonchoLatencyTracker(window=1000, warmup_samples=1)
+
+        def observer(val: float):
+            for _ in range(200):
+                t.observe(val)
+
+        threads = [
+            threading.Thread(target=observer, args=(i * 0.1,)) for i in range(5)
+        ]
+        for th in threads:
+            th.start()
+        for th in threads:
+            th.join()
+        # No crash + timeout() returns a real number
+        assert t.timeout() > 0
+
+
+# ---------------------------------------------------------------------------
+# CircuitBreaker
+# ---------------------------------------------------------------------------
+
+
+class _Clock:
+    """Test double for time.monotonic — manually advanced."""
+
+    def __init__(self, start: float = 0.0) -> None:
+        self.now = start
+
+    def __call__(self) -> float:
+        return self.now
+
+
+class TestCircuitBreaker:
+    def test_starts_closed(self):
+        cb = CircuitBreaker()
+        assert cb.state == cb.STATE_CLOSED
+        assert cb.allow() is True
+
+    def test_opens_after_threshold_failures(self):
+        cb = CircuitBreaker(failure_threshold=3)
+        for _ in range(2):
+            cb.record_failure()
+        assert cb.state == cb.STATE_CLOSED
+        cb.record_failure()
+        assert cb.state == cb.STATE_OPEN
+        assert cb.allow() is False
+
+    def test_resets_counter_on_success(self):
+        cb = CircuitBreaker(failure_threshold=3)
+        cb.record_failure()
+        cb.record_failure()
+        cb.record_success()
+        cb.record_failure()
+        assert cb.state == cb.STATE_CLOSED
+
+    def test_transitions_to_half_open_after_probe_interval(self):
+        clock = _Clock()
+        cb = CircuitBreaker(failure_threshold=2, probe_interval=60.0, time_fn=clock)
+        cb.record_failure()
+        cb.record_failure()
+        assert cb.state == cb.STATE_OPEN
+        clock.now = 30.0
+        assert cb.state == cb.STATE_OPEN  # still within probe window
+        clock.now = 61.0
+        assert cb.state == cb.STATE_HALF_OPEN
+        assert cb.allow() is True  # probe permitted
+
+    def test_half_open_success_closes_breaker(self):
+        clock = _Clock()
+        cb = CircuitBreaker(failure_threshold=2, probe_interval=60.0, time_fn=clock)
+        cb.record_failure()
+        cb.record_failure()
+        clock.now = 61.0
+        _ = cb.state  # transition
+        cb.record_success()
+        assert cb.state == cb.STATE_CLOSED
+
+    def test_half_open_failure_reopens_breaker(self):
+        clock = _Clock()
+        cb = CircuitBreaker(failure_threshold=2, probe_interval=60.0, time_fn=clock)
+        cb.record_failure()
+        cb.record_failure()
+        clock.now = 61.0
+        _ = cb.state
+        cb.record_failure()
+        assert cb.state == cb.STATE_OPEN
+
+    def test_reset_returns_to_closed(self):
+        cb = CircuitBreaker(failure_threshold=1)
+        cb.record_failure()
+        assert cb.state == cb.STATE_OPEN
+        cb.reset()
+        assert cb.state == cb.STATE_CLOSED
+        assert cb.allow() is True
+
+
+# ---------------------------------------------------------------------------
+# SyncWorker
+# ---------------------------------------------------------------------------
+
+
+def _wait_until(predicate, timeout: float = 2.0, interval: float = 0.01) -> bool:
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        if predicate():
+            return True
+        time.sleep(interval)
+    return False
+
+
+class TestSyncWorkerBasics:
+    def test_enqueue_runs_task_on_worker_thread(self):
+        worker = SyncWorker()
+        try:
+            results: List[str] = []
+            worker.enqueue(SyncTask(fn=lambda: results.append("ran"), name="t"))
+            assert _wait_until(lambda: results == ["ran"])
+        finally:
+            worker.shutdown(timeout=2.0)
+
+    def test_enqueue_returns_immediately(self):
+        worker = SyncWorker()
+        try:
+            slow_event = threading.Event()
+
+            def slow_task():
+                slow_event.wait(timeout=2.0)
+
+            t0 = time.monotonic()
+            worker.enqueue(SyncTask(fn=slow_task, name="slow"))
+            elapsed = time.monotonic() - t0
+            assert elapsed < 0.1, f"enqueue blocked for {elapsed}s"
+            slow_event.set()
+        finally:
+            worker.shutdown(timeout=2.0)
+
+    def test_tasks_execute_in_fifo_order(self):
+        worker = SyncWorker()
+        try:
+            results: List[int] = []
+            for i in range(10):
+                worker.enqueue(SyncTask(fn=lambda i=i: results.append(i), name=f"t{i}"))
+            assert _wait_until(lambda: len(results) == 10)
+            assert results == list(range(10))
+        finally:
+            worker.shutdown(timeout=2.0)
+
+    def test_task_exception_does_not_kill_worker(self):
+        worker = SyncWorker()
+        try:
+            survived: List[str] = []
+            worker.enqueue(
+                SyncTask(
+                    fn=lambda: (_ for _ in ()).throw(RuntimeError("boom")),
+                    name="boom",
+                )
+            )
+            worker.enqueue(SyncTask(fn=lambda: survived.append("ok"), name="ok"))
+            assert _wait_until(lambda: survived == ["ok"])
+        finally:
+            worker.shutdown(timeout=2.0)
+
+    def test_on_failure_callback_invoked_when_task_raises(self):
+        worker = SyncWorker()
+        try:
+            failures: List[BaseException] = []
+            worker.enqueue(
+                SyncTask(
+                    fn=lambda: (_ for _ in ()).throw(ValueError("nope")),
+                    name="fail",
+                    on_failure=failures.append,
+                )
+            )
+            assert _wait_until(lambda: len(failures) == 1)
+            assert isinstance(failures[0], ValueError)
+        finally:
+            worker.shutdown(timeout=2.0)
+
+
+class TestSyncWorkerBackpressure:
+    def test_queue_overflow_drops_oldest_task(self):
+        worker = SyncWorker(max_queue=3)
+        try:
+            block = threading.Event()
+            ran: List[int] = []
+            dropped: List[int] = []
+
+            # Fill the queue with a blocker + 3 more waiting tasks.
+            worker.enqueue(SyncTask(fn=lambda: block.wait(timeout=3.0), name="blocker"))
+            for i in range(3):
+                worker.enqueue(
+                    SyncTask(
+                        fn=lambda i=i: ran.append(i),
+                        name=f"t{i}",
+                        on_failure=lambda e, i=i: dropped.append(i),
+                    )
+                )
+            # Now try to enqueue a 4th task — should evict the oldest queued
+            # (task 0) to make room.
+            worker.enqueue(SyncTask(fn=lambda: ran.append(99), name="overflow"))
+
+            block.set()
+            assert _wait_until(lambda: 99 in ran)
+            # Task 0 was either evicted by the overflow or, if the worker
+            # raced the producer, ran before the queue filled.
+            assert 0 in dropped or 0 in ran
+        finally:
+            worker.shutdown(timeout=3.0)
+
+
+class TestSyncWorkerIntegration:
+    def test_breaker_open_skips_task(self):
+        breaker = CircuitBreaker(failure_threshold=1)
+        breaker.record_failure()
+        assert breaker.state == breaker.STATE_OPEN
+
+        worker = SyncWorker(breaker=breaker)
+        try:
+            failures: List[BaseException] = []
+            ran: List[str] = []
+            worker.enqueue(
+                SyncTask(
+                    fn=lambda: ran.append("should_not_run"),
+                    name="blocked",
+                    on_failure=failures.append,
+                )
+            )
+            # Give the worker a moment — but the task should never run.
+            time.sleep(0.2)
+            assert ran == []
+            assert len(failures) == 1
+            assert "circuit breaker open" in str(failures[0])
+        finally:
+            worker.shutdown(timeout=2.0)
+
+    def test_successful_task_feeds_latency_tracker_and_resets_breaker(self):
+        tracker = HonchoLatencyTracker(warmup_samples=1)
+        breaker = CircuitBreaker(failure_threshold=2)
+        breaker.record_failure()
+        assert breaker._consecutive_failures == 1
+
+        worker = SyncWorker(latency_tracker=tracker, breaker=breaker)
+        try:
+            worker.enqueue(SyncTask(fn=lambda: time.sleep(0.05), name="t"))
+            assert _wait_until(lambda: len(tracker._samples) >= 1)
+            assert breaker.state == breaker.STATE_CLOSED
+            assert breaker._consecutive_failures == 0
+        finally:
+            worker.shutdown(timeout=2.0)
+
+    def test_failed_task_increments_breaker(self):
+        breaker = CircuitBreaker(failure_threshold=2)
+        worker = SyncWorker(breaker=breaker)
+        try:
+            for _ in range(2):
+                worker.enqueue(
+                    SyncTask(
+                        fn=lambda: (_ for _ in ()).throw(RuntimeError("x")),
+                        name="fail",
+                    )
+                )
+            assert _wait_until(lambda: breaker.state == breaker.STATE_OPEN)
+        finally:
+            worker.shutdown(timeout=2.0)
+
+
+class TestSyncWorkerShutdown:
+    def test_shutdown_is_idempotent(self):
+        worker = SyncWorker()
+        worker.enqueue(SyncTask(fn=lambda: None, name="t"))
+        worker.shutdown(timeout=2.0)
+        worker.shutdown(timeout=2.0)  # Must not raise
+        assert not worker.is_running()
+
+    def test_enqueue_after_shutdown_calls_on_failure(self):
+        worker = SyncWorker()
+        worker.shutdown(timeout=2.0)
+        failures: List[BaseException] = []
+        ok = worker.enqueue(
+            SyncTask(fn=lambda: None, name="late", on_failure=failures.append)
+        )
+        assert ok is False
+        assert len(failures) == 1
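+
+
+# ---------------------------------------------------------------------------
+# Composition
+# ---------------------------------------------------------------------------
+
+
+class TestComposition:
+    """Minimal sketch of the Layer 1+2+3 wiring described in the module
+    docstring: the worker consults the breaker before each task and feeds
+    outcomes back. Exercises only the public surface above; no Honcho
+    calls involved."""
+
+    def test_three_layers_wire_together(self):
+        tracker = HonchoLatencyTracker(warmup_samples=1)
+        breaker = CircuitBreaker(failure_threshold=1)
+        worker = SyncWorker(latency_tracker=tracker, breaker=breaker)
+        try:
+            # One failing task trips the breaker (threshold=1)...
+            worker.enqueue(
+                SyncTask(
+                    fn=lambda: (_ for _ in ()).throw(RuntimeError("down")),
+                    name="fail",
+                )
+            )
+            assert _wait_until(lambda: breaker.state == breaker.STATE_OPEN)
+            # ...and while open, new submissions are refused synchronously.
+            assert worker.enqueue(SyncTask(fn=lambda: None, name="skip")) is False
+        finally:
+            worker.shutdown(timeout=2.0)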