diff --git a/plugins/memory/honcho/__init__.py b/plugins/memory/honcho/__init__.py
index 7b82a739ce..da9e2075a2 100644
--- a/plugins/memory/honcho/__init__.py
+++ b/plugins/memory/honcho/__init__.py
@@ -24,6 +24,12 @@ from typing import Any, Dict, List, Optional
 
 from agent.memory_manager import sanitize_context
 from agent.memory_provider import MemoryProvider
+from plugins.memory.honcho.sync_worker import (
+    CircuitBreaker,
+    HonchoLatencyTracker,
+    SyncTask,
+    SyncWorker,
+)
 from tools.registry import tool_error
 
 logger = logging.getLogger(__name__)
@@ -195,7 +201,22 @@ class HonchoMemoryProvider(MemoryProvider):
         self._prefetch_result = ""
         self._prefetch_lock = threading.Lock()
         self._prefetch_thread: Optional[threading.Thread] = None
-        self._sync_thread: Optional[threading.Thread] = None
+
+        # Post-response write path (sync_turn / on_memory_write). See
+        # plugins/memory/honcho/sync_worker.py. The tracker + breaker are
+        # shared with the Honcho SDK client so adaptive timeouts and
+        # degraded-mode behaviour are consistent across the plugin.
+        self._latency_tracker = HonchoLatencyTracker()
+        self._breaker = CircuitBreaker()
+        self._sync_worker = SyncWorker(
+            latency_tracker=self._latency_tracker,
+            breaker=self._breaker,
+            thread_name="honcho-sync-worker",
+        )
+        # In-memory backlog of tasks that couldn't reach Honcho (breaker
+        # open or queue overflow). Drained by _drain_backlog_if_healthy().
+        self._backlog: List[SyncTask] = []
+        self._backlog_lock = threading.Lock()
 
         # B1: recall_mode — set during initialize from config
         self._recall_mode = "hybrid"  # "context", "tools", or "hybrid"
@@ -1057,8 +1078,82 @@ class HonchoMemoryProvider(MemoryProvider):
 
         return chunks
 
+    # -- backlog management (Layer 3) ----------------------------------------
+
+    _BACKLOG_MAX = 256
+
+    def _enqueue_with_backlog(self, task: SyncTask) -> None:
+        """Submit a task to the worker, falling back to the backlog on defer.
+
+        Wraps the caller's task with a failure hook that captures the
+        task itself (not just the error) so it can be appended to the
+        in-memory backlog when the breaker is open or the queue is full.
+        """
+        original_on_failure = task.on_failure
+
+        def _on_failure(error: BaseException) -> None:
+            reason = str(error)
+            # Only backlog tasks that were deferred, not ones that crashed
+            # inside Honcho itself — those are unlikely to succeed on replay.
+            if any(marker in reason for marker in (
+                "circuit breaker open",
+                "sync queue full",
+                "sync queue overflow",
+                "shutting down",
+            )):
+                with self._backlog_lock:
+                    if len(self._backlog) < self._BACKLOG_MAX:
+                        self._backlog.append(task)
+                    else:
+                        logger.debug("Honcho backlog full; dropping %s", task.name)
+            else:
+                logger.debug(
+                    "Honcho sync task %s failed (not backlogged): %s",
+                    task.name, error,
+                )
+            if original_on_failure is not None:
+                try:
+                    original_on_failure(error)
+                except Exception:
+                    pass
+
+        task.on_failure = _on_failure
+        self._sync_worker.enqueue(task)
+
+    def _drain_backlog_if_healthy(self) -> None:
+        """Opportunistically replay backlogged tasks once the breaker closes.
+
+        Called from the happy path of ``sync_turn``; never blocks the
+        user. Walks the backlog, re-enqueueing everything on the worker
+        — if the breaker is still open, tasks will bounce back into
+        the backlog via their on_failure handler.
+
+        Also nudges the Honcho client's HTTP timeout toward the tracker's
+        observed p95 so stalled backends fail fast on subsequent calls
+        instead of burning 30s per request.
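+
+        Illustrative sequence (a sketch; assumes the worker reports
+        deferrals via ``on_failure`` synchronously on the caller's
+        thread, as the integration tests in this patch rely on)::
+
+            provider.sync_turn("hi", "there")    # breaker open -> backlogged
+            # ...Honcho recovers; a probe closes the breaker...
+            provider.sync_turn("next", "turn")   # enqueued + backlog drained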
+        """
+        if self._breaker.state != self._breaker.STATE_CLOSED:
+            return
+        # Layer 2: adaptive timeout rebuild (cheap no-op if below threshold).
+        try:
+            from plugins.memory.honcho.client import rebuild_honcho_client_with_timeout
+            rebuild_honcho_client_with_timeout(self._latency_tracker.timeout())
+        except Exception as e:
+            logger.debug("Honcho timeout rebuild skipped: %s", e)
+        with self._backlog_lock:
+            if not self._backlog:
+                return
+            pending = self._backlog
+            self._backlog = []
+        for task in pending:
+            self._sync_worker.enqueue(task)
+
     def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
-        """Record the conversation turn in Honcho (non-blocking).
+        """Record the conversation turn in Honcho (fire-and-forget).
+
+        Enqueues the sync on the persistent worker thread and returns
+        immediately. Callers never wait on Honcho — the run_conversation
+        return path is fully decoupled from post-response writes.
 
         Messages exceeding the Honcho API limit (default 25k chars) are
         split into multiple messages with continuation markers.
@@ -1071,27 +1166,32 @@ class HonchoMemoryProvider(MemoryProvider):
         msg_limit = self._config.message_max_chars if self._config else 25000
         clean_user_content = sanitize_context(user_content or "").strip()
         clean_assistant_content = sanitize_context(assistant_content or "").strip()
+        session_key = self._session_key
 
         def _sync():
-            try:
-                session = self._manager.get_or_create(self._session_key)
-                for chunk in self._chunk_message(clean_user_content, msg_limit):
-                    session.add_message("user", chunk)
-                for chunk in self._chunk_message(clean_assistant_content, msg_limit):
-                    session.add_message("assistant", chunk)
-                self._manager._flush_session(session)
-            except Exception as e:
-                logger.debug("Honcho sync_turn failed: %s", e)
+            session = self._manager.get_or_create(session_key)
+            for chunk in self._chunk_message(clean_user_content, msg_limit):
+                session.add_message("user", chunk)
+            for chunk in self._chunk_message(clean_assistant_content, msg_limit):
+                session.add_message("assistant", chunk)
+            self._manager._flush_session(session)
 
-        if self._sync_thread and self._sync_thread.is_alive():
-            self._sync_thread.join(timeout=5.0)
-        self._sync_thread = threading.Thread(
-            target=_sync, daemon=True, name="honcho-sync"
+        task = SyncTask(
+            fn=_sync,
+            name="sync_turn",
         )
-        self._sync_thread.start()
+        self._enqueue_with_backlog(task)
+        # If the breaker transitioned back to closed between turns, try to
+        # drain anything that piled up while Honcho was unreachable.
+        self._drain_backlog_if_healthy()
 
     def on_memory_write(self, action: str, target: str, content: str) -> None:
-        """Mirror built-in user profile writes as Honcho conclusions."""
+        """Mirror built-in user profile writes as Honcho conclusions.
+
+        The write is enqueued on the shared sync worker so every
+        post-response write path (turn sync + conclusion mirror)
+        observes the same breaker and backlog.
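+
+        Only ``("add", "user", <non-empty content>)`` writes are
+        mirrored. A sketch, with a hypothetical payload::
+
+            provider.on_memory_write("add", "user", "prefers short answers")
+            # -> enqueues a "memory_mirror" task on the shared worker
+            provider.on_memory_write("delete", "user", "...")  # ignored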
+        """
         if action != "add" or target != "user" or not content:
             return
         if self._cron_skipped:
@@ -1099,14 +1199,17 @@ class HonchoMemoryProvider(MemoryProvider):
         if not self._manager or not self._session_key:
             return
 
-        def _write():
-            try:
-                self._manager.create_conclusion(self._session_key, content)
-            except Exception as e:
-                logger.debug("Honcho memory mirror failed: %s", e)
+        session_key = self._session_key
+        payload = content
 
-        t = threading.Thread(target=_write, daemon=True, name="honcho-memwrite")
-        t.start()
+        def _write():
+            self._manager.create_conclusion(session_key, payload)
+
+        task = SyncTask(
+            fn=_write,
+            name="memory_mirror",
+        )
+        self._enqueue_with_backlog(task)
 
     def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
         """Flush all pending messages to Honcho on session end."""
@@ -1114,9 +1217,10 @@ class HonchoMemoryProvider(MemoryProvider):
             return
         if not self._manager:
            return
-        # Wait for pending sync
-        if self._sync_thread and self._sync_thread.is_alive():
-            self._sync_thread.join(timeout=10.0)
+        # Wait briefly for any in-flight sync tasks to drain. We can't
+        # block session-end indefinitely, but giving the worker 10s to
+        # finish a pending turn-sync matches the previous behaviour.
+        self._sync_worker.shutdown(timeout=10.0)
         try:
             self._manager.flush_all()
         except Exception as e:
@@ -1236,9 +1340,10 @@ class HonchoMemoryProvider(MemoryProvider):
             return tool_error(f"Honcho {tool_name} failed: {e}")
 
     def shutdown(self) -> None:
-        for t in (self._prefetch_thread, self._sync_thread):
-            if t and t.is_alive():
-                t.join(timeout=5.0)
+        # Drain the prefetch thread (legacy, unchanged) + the sync worker.
+        if self._prefetch_thread and self._prefetch_thread.is_alive():
+            self._prefetch_thread.join(timeout=5.0)
+        self._sync_worker.shutdown(timeout=5.0)
         # Flush any remaining messages
         if self._manager:
             try:
diff --git a/plugins/memory/honcho/client.py b/plugins/memory/honcho/client.py
index 63e45b4628..beb720326b 100644
--- a/plugins/memory/honcho/client.py
+++ b/plugins/memory/honcho/client.py
@@ -652,6 +652,8 @@ class HonchoClientConfig:
 
 
 _honcho_client: Honcho | None = None
+_honcho_client_kwargs: dict | None = None
+_honcho_client_kwargs_active_timeout: float | None = None
 
 
 def get_honcho_client(config: HonchoClientConfig | None = None) -> Honcho:
@@ -660,7 +662,7 @@ def get_honcho_client(config: HonchoClientConfig | None = None) -> Honcho:
     When no config is provided, attempts to load ~/.honcho/config.json
     first, falling back to environment variables.
     """
-    global _honcho_client
+    global _honcho_client, _honcho_client_kwargs, _honcho_client_kwargs_active_timeout
 
     if _honcho_client is not None:
         return _honcho_client
@@ -745,11 +747,56 @@ def get_honcho_client(config: HonchoClientConfig | None = None) -> Honcho:
         kwargs["timeout"] = resolved_timeout
 
     _honcho_client = Honcho(**kwargs)
+    _honcho_client_kwargs = dict(kwargs)
+    _honcho_client_kwargs_active_timeout = resolved_timeout
     return _honcho_client
 
 
+def rebuild_honcho_client_with_timeout(new_timeout: float) -> None:
+    """Rebuild the singleton Honcho client with a new HTTP timeout.
+
+    Called from the provider's backlog-drain path once enough latency
+    samples have accumulated for the :class:`HonchoLatencyTracker` to
+    recommend a timeout meaningfully different from the one the current
+    client was built with. Best-effort under concurrent calls (worst
+    case is a redundant rebuild); no-op while the delta stays within a
+    20% band so small jitter doesn't thrash the client.
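+
+    For example, with a 10.0s active timeout, a recommendation anywhere
+    in [8.0s, 12.0s] is ignored, while 15.0s (ratio 1.5) forces a rebuild.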
+
+    The previous client is discarded — the Honcho SDK uses a pooled
+    httpx.Client internally but doesn't expose it for in-place timeout
+    mutation, so a rebuild is the only portable option.
+    """
+    global _honcho_client, _honcho_client_kwargs_active_timeout
+
+    if _honcho_client is None or _honcho_client_kwargs is None:
+        return
+
+    active = _honcho_client_kwargs_active_timeout or _DEFAULT_HTTP_TIMEOUT
+    if active <= 0:
+        return
+    ratio = new_timeout / active
+    if 0.8 <= ratio <= 1.2:
+        # Not a meaningful change; skip rebuild.
+        return
+
+    try:
+        from honcho import Honcho
+    except ImportError:
+        return
+
+    new_kwargs = dict(_honcho_client_kwargs)
+    new_kwargs["timeout"] = new_timeout
+    logger.info(
+        "Adapting Honcho HTTP timeout: %.1fs -> %.1fs (tracker p95)",
+        active, new_timeout,
+    )
+    _honcho_client = Honcho(**new_kwargs)
+    _honcho_client_kwargs_active_timeout = new_timeout
+
+
 def reset_honcho_client() -> None:
     """Reset the Honcho client singleton (useful for testing)."""
-    global _honcho_client
+    global _honcho_client, _honcho_client_kwargs, _honcho_client_kwargs_active_timeout
     _honcho_client = None
+    _honcho_client_kwargs = None
+    _honcho_client_kwargs_active_timeout = None
diff --git a/tests/honcho_plugin/test_provider_sync_integration.py b/tests/honcho_plugin/test_provider_sync_integration.py
new file mode 100644
index 0000000000..3bbdb50a70
--- /dev/null
+++ b/tests/honcho_plugin/test_provider_sync_integration.py
@@ -0,0 +1,146 @@
+"""Integration tests for wiring the sync worker into HonchoMemoryProvider.
+
+Layer 1 (fire-and-forget): sync_turn must return in under 100ms even when
+the Honcho backend would block for seconds.
+
+Layer 3 (breaker + backlog): when the breaker trips open, sync_turn tasks
+land in the provider's in-memory backlog instead of running. When the
+breaker closes (via probe recovery), the backlog drains on the next
+sync_turn call.
+"""
+
+from __future__ import annotations
+
+import threading
+import time
+from unittest.mock import MagicMock
+
+from plugins.memory.honcho import HonchoMemoryProvider
+from plugins.memory.honcho.sync_worker import SyncTask
+
+
+def _wait_until(predicate, timeout: float = 2.0, interval: float = 0.01) -> bool:
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        if predicate():
+            return True
+        time.sleep(interval)
+    return False
+
+
+def _make_provider() -> HonchoMemoryProvider:
+    provider = HonchoMemoryProvider()
+    provider._manager = MagicMock()
+    session = MagicMock()
+    provider._manager.get_or_create.return_value = session
+    provider._session_key = "agent:main:test"
+    provider._cron_skipped = False
+    provider._config = MagicMock(message_max_chars=25000)
+    return provider
+
+
+class TestLayer1FireAndForget:
+    def test_sync_turn_returns_immediately_with_slow_backend(self):
+        """sync_turn must not block even if the backend flush takes seconds."""
+        provider = _make_provider()
+
+        # Make the flush block until released (up to 3s).
+        flush_started = threading.Event()
+        release_flush = threading.Event()
+
+        def slow_flush(_session):
+            flush_started.set()
+            release_flush.wait(timeout=3.0)
+
+        provider._manager._flush_session.side_effect = slow_flush
+
+        try:
+            t0 = time.monotonic()
+            provider.sync_turn("hello", "world")
+            elapsed = time.monotonic() - t0
+            assert elapsed < 0.1, f"sync_turn blocked for {elapsed:.3f}s"
+            # Confirm the worker did pick it up.
+            assert flush_started.wait(timeout=1.0)
+        finally:
+            release_flush.set()
+            provider.shutdown()
+
+    def test_multiple_sync_turns_do_not_serialize_caller(self):
+        """Back-to-back sync_turns must not block on prior turn's completion."""
+        provider = _make_provider()
+
+        gate = threading.Event()
+        provider._manager._flush_session.side_effect = lambda _s: gate.wait(timeout=3.0)
+
+        try:
+            t0 = time.monotonic()
+            for _ in range(5):
+                provider.sync_turn("u", "a")
+            elapsed = time.monotonic() - t0
+            # Without fire-and-forget, the old code would serialize on the
+            # previous turn's join(timeout=5.0): turns 2-5 each join the
+            # prior thread, so 4 × 5s = 20s worst case. We assert << 1s.
+            assert elapsed < 0.2, f"5 sync_turns took {elapsed:.3f}s"
+        finally:
+            gate.set()
+            provider.shutdown()
+
+
+class TestLayer3BacklogAndBreaker:
+    def test_breaker_open_backlogs_task(self):
+        """While the breaker is open, sync_turn tasks must land in the backlog."""
+        provider = _make_provider()
+
+        # Trip the breaker manually.
+        provider._breaker._state = provider._breaker.STATE_OPEN
+        provider._breaker._opened_at = float("inf")  # never recover
+
+        try:
+            provider.sync_turn("hello", "world")
+            # The task should have landed in the backlog rather than run.
+            assert len(provider._backlog) == 1
+            assert provider._backlog[0].name == "sync_turn"
+        finally:
+            provider.shutdown()
+
+    def test_backlog_drains_when_breaker_closes(self):
+        """Once the breaker closes, the next sync_turn drains the backlog."""
+        provider = _make_provider()
+
+        # Trip the breaker and build up a backlog.
+        provider._breaker._state = provider._breaker.STATE_OPEN
+        provider._breaker._opened_at = float("inf")
+        for _ in range(3):
+            provider.sync_turn("u", "a")
+        assert len(provider._backlog) == 3
+
+        # Close the breaker (simulating recovery) and trigger another sync.
+        provider._breaker.reset()
+
+        try:
+            provider.sync_turn("u", "a")
+            # One new task + 3 drained = 4 flushes eventually.
+            assert _wait_until(
+                lambda: provider._manager._flush_session.call_count >= 4,
+                timeout=2.0,
+            ), (
+                "expected >= 4 flushes after recovery, got "
+                f"{provider._manager._flush_session.call_count}"
+            )
+            assert provider._backlog == []
+        finally:
+            provider.shutdown()
+
+    def test_backlog_honors_max_size(self):
+        """Backlog must not grow unbounded during a long outage."""
+        provider = _make_provider()
+        provider._BACKLOG_MAX = 5
+        provider._breaker._state = provider._breaker.STATE_OPEN
+        provider._breaker._opened_at = float("inf")
+
+        try:
+            for _ in range(20):
+                provider.sync_turn("u", "a")
+            assert len(provider._backlog) == 5
+        finally:
+            provider.shutdown()
diff --git a/tests/honcho_plugin/test_session.py b/tests/honcho_plugin/test_session.py
index 64fcfc7ebf..ce4026f876 100644
--- a/tests/honcho_plugin/test_session.py
+++ b/tests/honcho_plugin/test_session.py
@@ -553,7 +553,7 @@ class TestConcludeToolDispatch:
                 "Visible answer"
             ),
         )
-        provider._sync_thread.join(timeout=1.0)
+        provider._sync_worker.shutdown(timeout=1.0)
 
         assert session.add_message.call_args_list[0].args == ("user", "hello")
         assert session.add_message.call_args_list[1].args == ("assistant", "Visible answer")
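Note for reviewers: plugins/memory/honcho/sync_worker.py is imported throughout this patch but is not part of the diff. The sketch below captures the minimal interface the provider code and tests above appear to assume. The class and method names come from the imports and call sites; every body, default value, and threshold here is an assumption, not the module's actual implementation.

from dataclasses import dataclass
from typing import Callable, Optional
import queue
import threading
import time


@dataclass
class SyncTask:
    # fn runs on the worker thread; on_failure receives deferrals
    # ("circuit breaker open", "sync queue full", "shutting down")
    # as well as exceptions raised by fn itself.
    fn: Callable[[], None]
    name: str = "task"
    on_failure: Optional[Callable[[BaseException], None]] = None


class CircuitBreaker:
    STATE_CLOSED = "closed"
    STATE_OPEN = "open"

    def __init__(self, threshold: int = 3, cooldown: float = 30.0) -> None:
        self._state = self.STATE_CLOSED
        self._failures = 0
        self._opened_at = 0.0  # tests pin this to float("inf")
        self._threshold = threshold
        self._cooldown = cooldown

    @property
    def state(self) -> str:
        # A real implementation would allow a half-open probe once the
        # cooldown elapses; the sketch just reports the stored state.
        return self._state

    def reset(self) -> None:
        self._state = self.STATE_CLOSED
        self._failures = 0

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._threshold:
            self._state = self.STATE_OPEN
            self._opened_at = time.monotonic()


class HonchoLatencyTracker:
    """Collects request latencies; timeout() recommends a p95-based value."""

    def record(self, seconds: float) -> None: ...

    def timeout(self) -> float:
        return 30.0  # placeholder recommendation


class SyncWorker:
    def __init__(self, latency_tracker, breaker, thread_name: str) -> None:
        self._latency_tracker = latency_tracker  # adaptive timeouts (not shown)
        self._breaker = breaker
        self._queue: "queue.Queue[Optional[SyncTask]]" = queue.Queue(maxsize=64)
        self._thread = threading.Thread(target=self._run, daemon=True, name=thread_name)
        self._thread.start()

    def enqueue(self, task: SyncTask) -> None:
        # Deferrals fire on_failure on the caller's thread, which is what
        # lets the provider backlog tasks synchronously in the tests.
        if self._breaker.state == self._breaker.STATE_OPEN:
            if task.on_failure:
                task.on_failure(RuntimeError("circuit breaker open"))
            return
        try:
            self._queue.put_nowait(task)
        except queue.Full:
            if task.on_failure:
                task.on_failure(RuntimeError("sync queue full"))

    def shutdown(self, timeout: float = 5.0) -> None:
        self._queue.put(None)  # sentinel: finish queued work, then exit
        self._thread.join(timeout=timeout)

    def _run(self) -> None:
        while True:
            task = self._queue.get()
            if task is None:
                return
            try:
                task.fn()
            except Exception as exc:  # route failures to the task's hook
                self._breaker.record_failure()
                if task.on_failure:
                    task.on_failure(exc)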