feat(honcho): wire fire-and-forget worker + adaptive timeout + breaker into provider

Replaces the per-turn threading.Thread(target=_sync).start() pattern in
HonchoMemoryProvider with a persistent SyncWorker.  sync_turn() and
on_memory_write() both enqueue SyncTasks on the shared worker and return
immediately — run_conversation's post-response path is no longer coupled
to Honcho latency.

Three behavioural changes land here:

  Layer 1 — fire-and-forget sync
    No more join(timeout=5.0) on the prior turn's thread.  Back-to-back
    sync_turn() calls return in microseconds regardless of backend
    latency.  The worker runs tasks serially per-provider (intentional:
    session writes must be ordered) and uses a bounded queue with
    oldest-drop backpressure (see the SyncWorker sketch below).

  Layer 2 — adaptive timeout
    SyncWorker feeds successful call latencies into HonchoLatencyTracker.
    After each turn, _drain_backlog_if_healthy() invokes
    rebuild_honcho_client_with_timeout(), which rebuilds the SDK client
    iff the tracker's p95-derived timeout differs by >20% from the
    active one.  Hosted Honcho converges on ~1-3s timeouts; self-hosted
    cold starts scale naturally.  The 30s default still applies during
    warmup (see the tracker sketch below).

  Layer 3 — circuit breaker + in-memory backlog
    CircuitBreaker trips open after 3 consecutive failures; SyncWorker
    refuses tasks while the breaker is open, signalling the deferral via
    their on_failure callback.  The provider routes every task through
    _enqueue_with_backlog(), which wraps its on_failure so breaker-open
    and queue-full tasks land in a bounded backlog (256 tasks max).  On
    recovery (probe succeeds, state → closed), the next sync_turn()
    drains the backlog through the worker.  Tasks that crashed inside
    Honcho itself are NOT backlogged — replay won't help (see the
    breaker sketch below).
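
sync_worker.py is one of the four changed files, but its hunks are not
shown below.  A minimal sketch of the Layer 1 worker shape, assuming a
stdlib bounded queue.Queue: only SyncTask's fn/name/on_failure fields,
SyncWorker's constructor kwargs, and enqueue() appear in this diff; the
breaker/tracker method names (is_open, record_success, record_failure,
record), the queue size, and the omitted shutdown() handling are
assumptions.

    # Illustrative sketch only; the real SyncWorker lives in
    # plugins/memory/honcho/sync_worker.py and may differ in detail.
    import queue
    import threading
    import time
    from dataclasses import dataclass
    from typing import Callable, Optional

    @dataclass
    class SyncTask:
        fn: Callable[[], None]
        name: str = "task"
        on_failure: Optional[Callable[[BaseException], None]] = None

    class SyncWorker:
        """One daemon thread drains a bounded queue serially (ordered writes)."""

        def __init__(self, latency_tracker=None, breaker=None,
                     thread_name: str = "sync-worker", maxsize: int = 64):
            self._queue: "queue.Queue[SyncTask]" = queue.Queue(maxsize=maxsize)
            self._tracker = latency_tracker
            self._breaker = breaker
            threading.Thread(target=self._run, daemon=True, name=thread_name).start()

        def enqueue(self, task: SyncTask) -> None:
            # Refuse work while the breaker is open; the provider's
            # on_failure hook routes these into its backlog (Layer 3).
            if self._breaker is not None and self._breaker.is_open():
                if task.on_failure:
                    task.on_failure(RuntimeError("circuit breaker open"))
                return
            while True:
                try:
                    self._queue.put_nowait(task)
                    return
                except queue.Full:
                    # Oldest-drop backpressure: evict the stalest task, retry.
                    try:
                        stale = self._queue.get_nowait()
                    except queue.Empty:
                        continue  # worker drained the queue; retry the put
                    if stale.on_failure:
                        stale.on_failure(RuntimeError("sync queue full"))

        def _run(self) -> None:
            while True:
                task = self._queue.get()
                started = time.monotonic()
                try:
                    task.fn()
                except Exception as exc:
                    if self._breaker is not None:
                        self._breaker.record_failure()
                    if task.on_failure:
                        task.on_failure(exc)
                else:
                    if self._breaker is not None:
                        self._breaker.record_success()
                    if self._tracker is not None:
                        # Layer 2: only successful latencies feed the tracker.
                        self._tracker.record(time.monotonic() - started)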
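
Layer 2's contract visible in this diff is just tracker.timeout()
feeding rebuild_honcho_client_with_timeout().  A sketch of that policy
with invented numbers (window size, warmup threshold, and p95 headroom
are all assumptions; only the 30s warmup default and the >20% rebuild
band come from the commit):

    # Illustrative numbers; only timeout() and the >20% rebuild guard
    # are described by this commit, the rest is assumption.
    import statistics
    from collections import deque

    class HonchoLatencyTracker:
        DEFAULT_TIMEOUT = 30.0  # warmup default until enough samples exist
        MIN_SAMPLES = 20        # assumed warmup threshold
        HEADROOM = 2.0          # assumed multiplier over observed p95

        def __init__(self, window: int = 100):
            self._samples = deque(maxlen=window)  # rolling successful latencies

        def record(self, seconds: float) -> None:
            self._samples.append(seconds)

        def timeout(self) -> float:
            if len(self._samples) < self.MIN_SAMPLES:
                return self.DEFAULT_TIMEOUT
            p95 = statistics.quantiles(self._samples, n=20)[-1]  # ~95th pct
            return max(1.0, p95 * self.HEADROOM)

    _active_timeout = HonchoLatencyTracker.DEFAULT_TIMEOUT

    def rebuild_honcho_client_with_timeout(new_timeout: float) -> None:
        """Rebuild the SDK client only when the change is material (>20%)."""
        global _active_timeout
        if abs(new_timeout - _active_timeout) / _active_timeout <= 0.20:
            return  # within 20% of the active timeout: cheap no-op
        _active_timeout = new_timeout
        # ... reconstruct the Honcho SDK client with the new HTTP timeout ...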
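
For Layer 3, the diff below only touches breaker.state and
STATE_CLOSED; the commit text adds the 3-failure trip and
probe-to-closed recovery.  A sketch under those constraints (the
cooldown value and the method names are assumptions):

    import time

    class CircuitBreaker:
        """Trips open after 3 straight failures; one probe passes after cooldown."""

        STATE_CLOSED = "closed"
        STATE_OPEN = "open"
        STATE_HALF_OPEN = "half_open"
        FAILURE_THRESHOLD = 3
        COOLDOWN = 30.0  # assumed seconds before allowing a probe

        def __init__(self) -> None:
            self.state = self.STATE_CLOSED
            self._failures = 0
            self._opened_at = 0.0

        def is_open(self) -> bool:
            if self.state == self.STATE_OPEN:
                if time.monotonic() - self._opened_at >= self.COOLDOWN:
                    self.state = self.STATE_HALF_OPEN  # let one probe through
                    return False
                return True
            return False

        def record_success(self) -> None:
            # A successful probe (or any success) closes the breaker.
            self._failures = 0
            self.state = self.STATE_CLOSED

        def record_failure(self) -> None:
            self._failures += 1
            if self._failures >= self.FAILURE_THRESHOLD:
                self.state = self.STATE_OPEN
                self._opened_at = time.monotonic()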

Updates one existing test (test_session.py) that poked at the now-
removed _sync_thread attribute; replaced with the worker's shutdown().

5 new integration tests verify the provider-level wiring (the first is
sketched after this list):
  - sync_turn returns in < 100ms even when flush blocks 2s
  - 5 back-to-back sync_turns in < 200ms total (old code: up to 25s)
  - breaker-open enqueue lands in backlog, not on the worker
  - recovery drains backlog + new task on next sync_turn
  - backlog respects _BACKLOG_MAX and stops growing during long outages
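
The first bullet boils down to a wall-clock assertion of this shape
(the fixture name and the blocking mechanism are hypothetical):

    import time

    def test_sync_turn_returns_fast(provider_with_blocking_flush):
        # Hypothetical fixture: a HonchoMemoryProvider whose backend
        # flush blocks for 2 seconds.
        provider = provider_with_blocking_flush(block_seconds=2.0)
        started = time.monotonic()
        provider.sync_turn("user msg", "assistant msg")
        elapsed = time.monotonic() - started
        assert elapsed < 0.1  # < 100ms despite the blocked backend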

No change to run_conversation or any agent-facing API.
Erosika 2026-04-24 18:55:40 -04:00
parent 7a26fb3436
commit f512fdf697
4 changed files with 331 additions and 33 deletions


@@ -24,6 +24,12 @@ from typing import Any, Dict, List, Optional
 from agent.memory_manager import sanitize_context
 from agent.memory_provider import MemoryProvider
+from plugins.memory.honcho.sync_worker import (
+    CircuitBreaker,
+    HonchoLatencyTracker,
+    SyncTask,
+    SyncWorker,
+)
 from tools.registry import tool_error
 
 logger = logging.getLogger(__name__)
@@ -195,7 +201,22 @@ class HonchoMemoryProvider(MemoryProvider):
         self._prefetch_result = ""
         self._prefetch_lock = threading.Lock()
         self._prefetch_thread: Optional[threading.Thread] = None
-        self._sync_thread: Optional[threading.Thread] = None
+        # Post-response write path (sync_turn / on_memory_write). See
+        # plugins/memory/honcho/sync_worker.py. The tracker + breaker are
+        # shared with the Honcho SDK client so adaptive timeouts and
+        # degraded-mode behaviour are consistent across the plugin.
+        self._latency_tracker = HonchoLatencyTracker()
+        self._breaker = CircuitBreaker()
+        self._sync_worker = SyncWorker(
+            latency_tracker=self._latency_tracker,
+            breaker=self._breaker,
+            thread_name="honcho-sync-worker",
+        )
+        # Durable backlog of tasks that couldn't reach Honcho (breaker open
+        # or queue overflow). Drained on recovery — see
+        # _drain_backlog_if_healthy().
+        self._backlog: List[SyncTask] = []
+        self._backlog_lock = threading.Lock()
 
         # B1: recall_mode — set during initialize from config
         self._recall_mode = "hybrid"  # "context", "tools", or "hybrid"
@@ -1057,8 +1078,82 @@ class HonchoMemoryProvider(MemoryProvider):
         return chunks
 
+    # -- backlog management (Layer 3) ----------------------------------------
+
+    _BACKLOG_MAX = 256
+
+    def _enqueue_with_backlog(self, task: SyncTask) -> None:
+        """Submit a task to the worker with backlog fall-through on defer.
+
+        Wraps the caller's task with a failure hook that captures the
+        task itself (not just the error) so it can be appended to the
+        durable backlog when the breaker is open or the queue is full.
+        """
+        original_on_failure = task.on_failure
+
+        def _on_failure(error: BaseException) -> None:
+            reason = str(error)
+            # Only backlog tasks that were deferred, not ones that crashed
+            # inside Honcho itself — those are unlikely to succeed on replay.
+            if any(marker in reason for marker in (
+                "circuit breaker open",
+                "sync queue full",
+                "sync queue overflow",
+                "shutting down",
+            )):
+                with self._backlog_lock:
+                    if len(self._backlog) < self._BACKLOG_MAX:
+                        self._backlog.append(task)
+                    else:
+                        logger.debug("Honcho backlog full; dropping %s", task.name)
+            else:
+                logger.debug(
+                    "Honcho sync task %s failed (not backlogged): %s",
+                    task.name, error,
+                )
+            if original_on_failure is not None:
+                try:
+                    original_on_failure(error)
+                except Exception:
+                    pass
+
+        task.on_failure = _on_failure
+        self._sync_worker.enqueue(task)
+
+    def _drain_backlog_if_healthy(self) -> None:
+        """Opportunistic replay of backlogged tasks when the breaker closes.
+
+        Called from the happy path of ``sync_turn``; never blocks the
+        user.  Walks the backlog, re-enqueueing everything on the worker;
+        if the breaker is still open, tasks will bounce back into the
+        backlog via their on_failure handler.
+
+        Also nudges the Honcho client's HTTP timeout toward the tracker's
+        observed p95 so stalled backends fail fast on subsequent calls
+        instead of burning 30s per request.
+        """
+        if self._breaker.state != self._breaker.STATE_CLOSED:
+            return
+        # Layer 2: adaptive timeout rebuild (cheap no-op if below threshold).
+        try:
+            from plugins.memory.honcho.client import rebuild_honcho_client_with_timeout
+            rebuild_honcho_client_with_timeout(self._latency_tracker.timeout())
+        except Exception as e:
+            logger.debug("Honcho timeout rebuild skipped: %s", e)
+        with self._backlog_lock:
+            if not self._backlog:
+                return
+            pending = self._backlog
+            self._backlog = []
+        for task in pending:
+            self._sync_worker.enqueue(task)
+
     def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
-        """Record the conversation turn in Honcho (non-blocking).
+        """Record the conversation turn in Honcho (fire-and-forget).
+
+        Enqueues the sync on the persistent worker thread and returns
+        immediately.  Callers never wait on Honcho; the run_conversation
+        return path is fully decoupled from post-response writes.
 
         Messages exceeding the Honcho API limit (default 25k chars) are
         split into multiple messages with continuation markers.
@@ -1071,27 +1166,32 @@ class HonchoMemoryProvider(MemoryProvider):
         msg_limit = self._config.message_max_chars if self._config else 25000
         clean_user_content = sanitize_context(user_content or "").strip()
         clean_assistant_content = sanitize_context(assistant_content or "").strip()
+        session_key = self._session_key
 
         def _sync():
-            try:
-                session = self._manager.get_or_create(self._session_key)
-                for chunk in self._chunk_message(clean_user_content, msg_limit):
-                    session.add_message("user", chunk)
-                for chunk in self._chunk_message(clean_assistant_content, msg_limit):
-                    session.add_message("assistant", chunk)
-                self._manager._flush_session(session)
-            except Exception as e:
-                logger.debug("Honcho sync_turn failed: %s", e)
+            session = self._manager.get_or_create(session_key)
+            for chunk in self._chunk_message(clean_user_content, msg_limit):
+                session.add_message("user", chunk)
+            for chunk in self._chunk_message(clean_assistant_content, msg_limit):
+                session.add_message("assistant", chunk)
+            self._manager._flush_session(session)
 
-        if self._sync_thread and self._sync_thread.is_alive():
-            self._sync_thread.join(timeout=5.0)
-        self._sync_thread = threading.Thread(
-            target=_sync, daemon=True, name="honcho-sync"
+        task = SyncTask(
+            fn=_sync,
+            name="sync_turn",
         )
-        self._sync_thread.start()
+        self._enqueue_with_backlog(task)
+        # If the breaker transitioned back to closed between turns, try to
+        # drain anything that piled up while Honcho was unreachable.
+        self._drain_backlog_if_healthy()
 
     def on_memory_write(self, action: str, target: str, content: str) -> None:
-        """Mirror built-in user profile writes as Honcho conclusions."""
+        """Mirror built-in user profile writes as Honcho conclusions.
+
+        Enqueued on the shared sync worker so every post-response write
+        path (turn sync + conclusion mirror) observes the same breaker
+        and backlog.
+        """
         if action != "add" or target != "user" or not content:
             return
         if self._cron_skipped:
@@ -1099,14 +1199,17 @@ class HonchoMemoryProvider(MemoryProvider):
         if not self._manager or not self._session_key:
             return
 
-        def _write():
-            try:
-                self._manager.create_conclusion(self._session_key, content)
-            except Exception as e:
-                logger.debug("Honcho memory mirror failed: %s", e)
+        session_key = self._session_key
+        payload = content
 
-        t = threading.Thread(target=_write, daemon=True, name="honcho-memwrite")
-        t.start()
+        def _write():
+            self._manager.create_conclusion(session_key, payload)
+
+        task = SyncTask(
+            fn=_write,
+            name="memory_mirror",
+        )
+        self._enqueue_with_backlog(task)
 
     def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
         """Flush all pending messages to Honcho on session end."""
@@ -1114,9 +1217,10 @@ class HonchoMemoryProvider(MemoryProvider):
             return
         if not self._manager:
             return
-        # Wait for pending sync
-        if self._sync_thread and self._sync_thread.is_alive():
-            self._sync_thread.join(timeout=10.0)
+        # Wait briefly for any in-flight sync tasks to drain. We can't
+        # block session-end indefinitely, but giving the worker 10s to
+        # finish a pending turn-sync matches the previous behaviour.
+        self._sync_worker.shutdown(timeout=10.0)
         try:
             self._manager.flush_all()
         except Exception as e:
@@ -1236,9 +1340,10 @@ class HonchoMemoryProvider(MemoryProvider):
             return tool_error(f"Honcho {tool_name} failed: {e}")
 
     def shutdown(self) -> None:
-        for t in (self._prefetch_thread, self._sync_thread):
-            if t and t.is_alive():
-                t.join(timeout=5.0)
+        # Drain the prefetch thread (legacy, unchanged) + the sync worker.
+        if self._prefetch_thread and self._prefetch_thread.is_alive():
+            self._prefetch_thread.join(timeout=5.0)
+        self._sync_worker.shutdown(timeout=5.0)
         # Flush any remaining messages
         if self._manager:
             try: