From c630dfcdac4a64a3d55aa8724c7ca3bdd7e64b85 Mon Sep 17 00:00:00 2001 From: Erosika Date: Sat, 18 Apr 2026 13:07:09 -0400 Subject: [PATCH] =?UTF-8?q?feat(honcho):=20dialectic=20liveness=20?= =?UTF-8?q?=E2=80=94=20stale-thread=20watchdog,=20stale-result=20discard,?= =?UTF-8?q?=20empty-streak=20backoff?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hardens the dialectic lifecycle against three failure modes that could leave the prefetch pipeline stuck or injecting stale content: - Stale-thread watchdog: _thread_is_live() treats any prefetch thread older than timeout × 2.0 as dead. A hung Honcho call can no longer block subsequent fires indefinitely. - Stale-result discard: pending _prefetch_result is tagged with its fire turn. prefetch() discards the result if more than cadence × 2 turns passed before a consumer read it (e.g. a run of trivial-prompt turns between fire and read). - Empty-streak backoff: consecutive empty dialectic returns widen the effective cadence (dialectic_cadence + streak, capped at cadence × 8). A healthy fire resets the streak. Prevents the plugin from hammering the backend every turn when the peer graph is cold. - liveness_snapshot() on the provider exposes current turn, last fire, pending fire-at, empty streak, effective cadence, and thread status for in-process diagnostics. - system_prompt_block: nudge the model that honcho_reasoning accepts reasoning_level minimal/low/medium/high/max per call. - hermes honcho status: surface base reasoning level, cap, and heuristic toggle so config drift is visible at a glance. Tests: 550 passed. - TestDialecticLiveness (8 tests): stale-thread recovery, stale-result discard, fresh-result retention, backoff widening, backoff ceiling, streak reset on success, streak increment on empty, snapshot shape. - Existing TestDialecticCadenceAdvancesOnSuccess::test_in_flight_thread_is_not_stacked updated to set _prefetch_thread_started_at so it tests the fresh-thread-blocks branch (stale path covered separately). - test_cli TestCmdStatus fake updated with the new config attrs surfaced in the status block. --- plugins/memory/honcho/__init__.py | 120 +++++++++++++++++++-- plugins/memory/honcho/cli.py | 3 + tests/honcho_plugin/test_cli.py | 3 + tests/honcho_plugin/test_session.py | 156 +++++++++++++++++++++++++++- 4 files changed, 266 insertions(+), 16 deletions(-) diff --git a/plugins/memory/honcho/__init__.py b/plugins/memory/honcho/__init__.py index 51345b8e9..68fa86885 100644 --- a/plugins/memory/honcho/__init__.py +++ b/plugins/memory/honcho/__init__.py @@ -19,6 +19,7 @@ import json import logging import re import threading +import time from typing import Any, Dict, List, Optional from agent.memory_provider import MemoryProvider @@ -214,6 +215,11 @@ class HonchoMemoryProvider(MemoryProvider): self._last_context_turn = -999 self._last_dialectic_turn = -999 + # Liveness + observability state + self._prefetch_thread_started_at: float = 0.0 # monotonic ts of current thread + self._prefetch_result_fired_at: int = -999 # turn the pending result was fired at + self._dialectic_empty_streak: int = 0 # consecutive empty returns + # Port #1957: lazy session init for tools-only mode self._session_initialized = False self._lazy_init_kwargs: Optional[dict] = None @@ -413,13 +419,19 @@ class HonchoMemoryProvider(MemoryProvider): r = self._run_dialectic_depth(_prewarm_query) except Exception as exc: logger.debug("Honcho dialectic prewarm failed: %s", exc) + self._dialectic_empty_streak += 1 return if r and r.strip(): with self._prefetch_lock: self._prefetch_result = r + self._prefetch_result_fired_at = 0 # Treat prewarm as turn 0 so cadence gating starts clean. self._last_dialectic_turn = 0 + self._dialectic_empty_streak = 0 + else: + self._dialectic_empty_streak += 1 + self._prefetch_thread_started_at = time.monotonic() self._prefetch_thread = threading.Thread( target=_prewarm_dialectic, daemon=True, name="honcho-prewarm-dialectic" ) @@ -513,7 +525,8 @@ class HonchoMemoryProvider(MemoryProvider): "# Honcho Memory\n" "Active (tools-only mode). Use honcho_profile for a quick factual snapshot, " "honcho_search for raw excerpts, honcho_context for raw peer context, " - "honcho_reasoning for synthesized answers, " + "honcho_reasoning for synthesized answers (pass reasoning_level " + "minimal/low/medium/high/max — you pick the depth per call), " "honcho_conclude to save facts about the user. " "No automatic context injection — you must use tools to access memory." ) @@ -523,7 +536,8 @@ class HonchoMemoryProvider(MemoryProvider): "Active (hybrid mode). Relevant context is auto-injected AND memory tools are available. " "Use honcho_profile for a quick factual snapshot, " "honcho_search for raw excerpts, honcho_context for raw peer context, " - "honcho_reasoning for synthesized answers, " + "honcho_reasoning for synthesized answers (pass reasoning_level " + "minimal/low/medium/high/max — you pick the depth per call), " "honcho_conclude to save facts about the user." ) @@ -611,14 +625,20 @@ class HonchoMemoryProvider(MemoryProvider): r = self._run_dialectic_depth(query) except Exception as exc: logger.debug("Honcho first-turn dialectic failed: %s", exc) + self._dialectic_empty_streak += 1 return if r and r.strip(): with self._prefetch_lock: self._prefetch_result = r + self._prefetch_result_fired_at = _fired_at # Advance cadence only on a non-empty result so the next # turn retries when the call returned nothing. self._last_dialectic_turn = _fired_at + self._dialectic_empty_streak = 0 + else: + self._dialectic_empty_streak += 1 + self._prefetch_thread_started_at = time.monotonic() self._prefetch_thread = threading.Thread( target=_run_first_turn, daemon=True, name="honcho-prefetch-first" ) @@ -635,7 +655,21 @@ class HonchoMemoryProvider(MemoryProvider): self._prefetch_thread.join(timeout=3.0) with self._prefetch_lock: dialectic_result = self._prefetch_result + fired_at = self._prefetch_result_fired_at self._prefetch_result = "" + self._prefetch_result_fired_at = -999 + + # Discard stale pending results: if the fire happened more than + # cadence × multiplier turns ago (e.g. a run of trivial-prompt turns + # passed without consumption), the content likely no longer tracks + # the current conversational pivot. + stale_limit = self._dialectic_cadence * self._STALE_RESULT_MULTIPLIER + if dialectic_result and fired_at >= 0 and (self._turn_count - fired_at) > stale_limit: + logger.debug( + "Honcho pending dialectic discarded as stale: fired_at=%d, " + "turn=%d, limit=%d", fired_at, self._turn_count, stale_limit, + ) + dialectic_result = "" if dialectic_result and dialectic_result.strip(): parts.append(dialectic_result) @@ -693,18 +727,23 @@ class HonchoMemoryProvider(MemoryProvider): logger.debug("Honcho context prefetch failed: %s", e) # ----- Dialectic prefetch (supplement layer) ----- - # Guard against thread pile-up: if a prior dialectic is still in flight, - # let it finish instead of stacking races on _prefetch_result. - if self._prefetch_thread and self._prefetch_thread.is_alive(): + # Thread-alive guard with stale-thread recovery: a hung Honcho call + # older than timeout × multiplier is treated as dead so it can't + # block subsequent fires. + if self._thread_is_live(): logger.debug("Honcho dialectic prefetch skipped: prior thread still running") return - # B5: cadence check — skip if too soon since last *successful* dialectic call. - # The gate applies uniformly (including cadence=1): "every turn" means once - # per turn, not twice on the same turn when first-turn sync already fired. - if (self._turn_count - self._last_dialectic_turn) < self._dialectic_cadence: - logger.debug("Honcho dialectic prefetch skipped: cadence %d, turns since last: %d", - self._dialectic_cadence, self._turn_count - self._last_dialectic_turn) + # Cadence gate, widened by the empty-streak backoff so a persistently + # silent backend doesn't retry every turn forever. + effective = self._effective_cadence() + if (self._turn_count - self._last_dialectic_turn) < effective: + logger.debug( + "Honcho dialectic prefetch skipped: effective cadence %d " + "(base %d, empty streak %d), turns since last: %d", + effective, self._dialectic_cadence, self._dialectic_empty_streak, + self._turn_count - self._last_dialectic_turn, + ) return # Cadence advances only on a non-empty result so empty returns @@ -716,12 +755,18 @@ class HonchoMemoryProvider(MemoryProvider): result = self._run_dialectic_depth(query) except Exception as e: logger.debug("Honcho prefetch failed: %s", e) + self._dialectic_empty_streak += 1 return if result and result.strip(): with self._prefetch_lock: self._prefetch_result = result + self._prefetch_result_fired_at = _fired_at self._last_dialectic_turn = _fired_at + self._dialectic_empty_streak = 0 + else: + self._dialectic_empty_streak += 1 + self._prefetch_thread_started_at = time.monotonic() self._prefetch_thread = threading.Thread( target=_run, daemon=True, name="honcho-prefetch" ) @@ -750,6 +795,59 @@ class HonchoMemoryProvider(MemoryProvider): _HEURISTIC_LENGTH_MEDIUM = 120 _HEURISTIC_LENGTH_HIGH = 400 + # Liveness constants. A thread older than timeout × multiplier is treated + # as dead so a hung Honcho call can't block future retries indefinitely. + _STALE_THREAD_MULTIPLIER = 2.0 + # Pending result whose fire-turn is older than cadence × multiplier is + # discarded on read so we don't inject context for a stale conversational + # pivot after a gap of trivial-prompt turns. + _STALE_RESULT_MULTIPLIER = 2 + # Cap on the empty-streak backoff so a persistently silent backend + # eventually settles on a ceiling instead of unbounded widening. + _BACKOFF_MAX = 8 + + def _thread_is_live(self) -> bool: + """Thread-alive guard that treats threads older than the stale + threshold as dead, so a hung Honcho request can't block new fires.""" + if not self._prefetch_thread or not self._prefetch_thread.is_alive(): + return False + timeout = (self._config.timeout if self._config and self._config.timeout else 8.0) + age = time.monotonic() - self._prefetch_thread_started_at + if age > timeout * self._STALE_THREAD_MULTIPLIER: + logger.debug( + "Honcho prefetch thread age %.1fs exceeds stale threshold " + "%.1fs — treating as dead", age, timeout * self._STALE_THREAD_MULTIPLIER, + ) + return False + return True + + def _effective_cadence(self) -> int: + """Cadence plus empty-streak backoff, capped at _BACKOFF_MAX × base.""" + if self._dialectic_empty_streak <= 0: + return self._dialectic_cadence + widened = self._dialectic_cadence + self._dialectic_empty_streak + ceiling = self._dialectic_cadence * self._BACKOFF_MAX + return min(widened, ceiling) + + def liveness_snapshot(self) -> dict: + """In-process snapshot of dialectic liveness state for diagnostics. + + Returns current turn, last successful dialectic turn, pending-result + fire turn, empty streak, effective cadence, and thread status. + """ + thread_age = None + if self._prefetch_thread and self._prefetch_thread.is_alive(): + thread_age = time.monotonic() - self._prefetch_thread_started_at + return { + "turn_count": self._turn_count, + "last_dialectic_turn": self._last_dialectic_turn, + "pending_result_fired_at": self._prefetch_result_fired_at, + "empty_streak": self._dialectic_empty_streak, + "effective_cadence": self._effective_cadence(), + "thread_alive": thread_age is not None, + "thread_age_seconds": thread_age, + } + def _apply_reasoning_heuristic(self, base: str, query: str) -> str: """Scale `base` up by query length, clamped at reasoning_level_cap. diff --git a/plugins/memory/honcho/cli.py b/plugins/memory/honcho/cli.py index c73dd66f3..eb21c48ea 100644 --- a/plugins/memory/honcho/cli.py +++ b/plugins/memory/honcho/cli.py @@ -638,6 +638,9 @@ def cmd_status(args) -> None: raw = getattr(hcfg, "raw", None) or {} dialectic_cadence = raw.get("dialecticCadence") or 1 print(f" Dialectic cad: every {dialectic_cadence} turn{'s' if dialectic_cadence != 1 else ''}") + reasoning_cap = raw.get("reasoningLevelCap") or hcfg.reasoning_level_cap + heuristic_on = "on" if hcfg.reasoning_heuristic else "off" + print(f" Reasoning: base={hcfg.dialectic_reasoning_level}, cap={reasoning_cap}, heuristic={heuristic_on}") print(f" Observation: user(me={hcfg.user_observe_me},others={hcfg.user_observe_others}) ai(me={hcfg.ai_observe_me},others={hcfg.ai_observe_others})") print(f" Write freq: {hcfg.write_frequency}") diff --git a/tests/honcho_plugin/test_cli.py b/tests/honcho_plugin/test_cli.py index 006d687dc..a6fc39ea7 100644 --- a/tests/honcho_plugin/test_cli.py +++ b/tests/honcho_plugin/test_cli.py @@ -26,6 +26,9 @@ class TestCmdStatus: write_frequency = "async" session_strategy = "per-session" context_tokens = 800 + dialectic_reasoning_level = "low" + reasoning_level_cap = "high" + reasoning_heuristic = True def resolve_session_name(self): return "hermes" diff --git a/tests/honcho_plugin/test_session.py b/tests/honcho_plugin/test_session.py index 83db3f24d..37f54b541 100644 --- a/tests/honcho_plugin/test_session.py +++ b/tests/honcho_plugin/test_session.py @@ -823,8 +823,11 @@ def _settle_prewarm(provider): provider._prefetch_thread.join(timeout=3.0) with provider._prefetch_lock: provider._prefetch_result = "" + provider._prefetch_result_fired_at = -999 provider._prefetch_thread = None + provider._prefetch_thread_started_at = 0.0 provider._last_dialectic_turn = -999 + provider._dialectic_empty_streak = 0 if getattr(provider, "_manager", None) is not None: try: provider._manager.dialectic_query.reset_mock() @@ -1227,26 +1230,28 @@ class TestDialecticCadenceAdvancesOnSuccess: def test_in_flight_thread_is_not_stacked(self): import threading as _threading + import time as _time provider = self._make_provider() provider._session_key = "test" provider._turn_count = 10 provider._last_dialectic_turn = 0 - # Simulate a prior thread still running + # Simulate a prior thread still running (fresh, not stale) hold = _threading.Event() def _block(): hold.wait(timeout=5.0) - stale = _threading.Thread(target=_block, daemon=True) - stale.start() - provider._prefetch_thread = stale + fresh = _threading.Thread(target=_block, daemon=True) + fresh.start() + provider._prefetch_thread = fresh + provider._prefetch_thread_started_at = _time.monotonic() # fresh start provider.queue_prefetch("hello") # Should have short-circuited — no new dialectic call assert provider._manager.dialectic_query.call_count == 0 hold.set() - stale.join(timeout=2.0) + fresh.join(timeout=2.0) class TestSessionStartDialecticPrewarm: @@ -1321,6 +1326,147 @@ class TestSessionStartDialecticPrewarm: assert p._manager.dialectic_query.call_count == 1 +class TestDialecticLiveness: + """Liveness + observability: stale-thread recovery, stale-result discard, + empty-streak backoff, and the snapshot method used for diagnostics.""" + + @staticmethod + def _make_provider(cfg_extra=None): + from unittest.mock import patch, MagicMock + from plugins.memory.honcho.client import HonchoClientConfig + + defaults = dict(api_key="test-key", enabled=True, recall_mode="hybrid", timeout=2.0) + if cfg_extra: + defaults.update(cfg_extra) + cfg = HonchoClientConfig(**defaults) + provider = HonchoMemoryProvider() + mock_manager = MagicMock() + mock_manager.get_or_create.return_value = MagicMock(messages=[]) + mock_manager.get_prefetch_context.return_value = None + mock_manager.pop_context_result.return_value = None + mock_manager.dialectic_query.return_value = "" # default: silent + + with patch("plugins.memory.honcho.client.HonchoClientConfig.from_global_config", return_value=cfg), \ + patch("plugins.memory.honcho.client.get_honcho_client", return_value=MagicMock()), \ + patch("plugins.memory.honcho.session.HonchoSessionManager", return_value=mock_manager), \ + patch("hermes_constants.get_hermes_home", return_value=MagicMock()): + provider.initialize(session_id="test-liveness") + _settle_prewarm(provider) + return provider + + def test_stale_thread_is_treated_as_dead(self): + """A thread older than timeout × multiplier no longer blocks new fires.""" + import threading as _threading + p = self._make_provider() + p._session_key = "test" + p._turn_count = 10 + p._last_dialectic_turn = 0 + p._manager.dialectic_query.return_value = "fresh synthesis" + + # Plant an alive thread with an old timestamp (stale) + hold = _threading.Event() + stuck = _threading.Thread(target=lambda: hold.wait(timeout=10.0), daemon=True) + stuck.start() + p._prefetch_thread = stuck + # timeout=2.0, multiplier=2.0, so anything older than 4s is stale + p._prefetch_thread_started_at = 0.0 # very old (1970 monotonic baseline) + + p.queue_prefetch("hello") + # New thread should have been spawned since stuck one is stale + assert p._prefetch_thread is not stuck, "stale thread must be recycled" + if p._prefetch_thread: + p._prefetch_thread.join(timeout=2.0) + assert p._manager.dialectic_query.call_count == 1 + hold.set() + stuck.join(timeout=2.0) + + def test_stale_pending_result_is_discarded_on_read(self): + """A pending dialectic result from many turns ago is discarded + instead of injected against a fresh conversational pivot.""" + p = self._make_provider(cfg_extra={"raw": {"dialecticCadence": 2}}) + p._session_key = "test" + p._base_context_cache = "base ctx" + with p._prefetch_lock: + p._prefetch_result = "ancient synthesis" + p._prefetch_result_fired_at = 1 + # cadence=2, multiplier=2 → stale after 4 turns since fire + p._turn_count = 10 + p._last_dialectic_turn = 1 # prevents sync first-turn path + + result = p.prefetch("what's new") + assert "ancient synthesis" not in result, "stale pending must be discarded" + # Cache slot cleared + with p._prefetch_lock: + assert p._prefetch_result == "" + assert p._prefetch_result_fired_at == -999 + + def test_fresh_pending_result_is_kept(self): + """A pending result within the staleness window is injected normally.""" + p = self._make_provider(cfg_extra={"raw": {"dialecticCadence": 3}}) + p._session_key = "test" + p._base_context_cache = "" + with p._prefetch_lock: + p._prefetch_result = "recent synthesis" + p._prefetch_result_fired_at = 8 + p._turn_count = 9 # 1 turn since fire, well within cadence × 2 = 6 + p._last_dialectic_turn = 8 + + result = p.prefetch("what's new") + assert "recent synthesis" in result + + def test_empty_streak_widens_effective_cadence(self): + """After N empty returns, the gate waits cadence + N turns.""" + p = self._make_provider(cfg_extra={"raw": {"dialecticCadence": 1}}) + p._dialectic_empty_streak = 3 + # cadence=1, streak=3 → effective = 4 + assert p._effective_cadence() == 4 + + def test_backoff_is_capped(self): + """Effective cadence is capped at cadence × _BACKOFF_MAX.""" + p = self._make_provider(cfg_extra={"raw": {"dialecticCadence": 2}}) + p._dialectic_empty_streak = 100 + # cadence=2, ceiling = 2 × 8 = 16 + assert p._effective_cadence() == 16 + + def test_success_resets_empty_streak(self): + """A non-empty result zeroes the streak so healthy operation restores + the base cadence immediately.""" + p = self._make_provider(cfg_extra={"raw": {"dialecticCadence": 1}}) + p._session_key = "test" + p._dialectic_empty_streak = 5 + p._turn_count = 10 + p._last_dialectic_turn = 0 + p._manager.dialectic_query.return_value = "real output" + + p.queue_prefetch("hello") + if p._prefetch_thread: + p._prefetch_thread.join(timeout=2.0) + assert p._dialectic_empty_streak == 0 + assert p._last_dialectic_turn == 10 + + def test_empty_result_increments_streak(self): + p = self._make_provider(cfg_extra={"raw": {"dialecticCadence": 1}}) + p._session_key = "test" + p._turn_count = 5 + p._last_dialectic_turn = 0 + p._manager.dialectic_query.return_value = "" # empty + + p.queue_prefetch("hello") + if p._prefetch_thread: + p._prefetch_thread.join(timeout=2.0) + assert p._dialectic_empty_streak == 1 + assert p._last_dialectic_turn == 0 # cadence not advanced + + def test_liveness_snapshot_shape(self): + p = self._make_provider() + snap = p.liveness_snapshot() + for key in ( + "turn_count", "last_dialectic_turn", "pending_result_fired_at", + "empty_streak", "effective_cadence", "thread_alive", "thread_age_seconds", + ): + assert key in snap + + class TestDialecticLifecycleSmoke: """End-to-end smoke walking a multi-turn session through prewarm, turn 1 consume, trivial skip, cadence fire, empty-result retry,