From dd7921d51443bcd6b9225e2759d7ad161c5c40db Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 16 Mar 2026 00:23:47 -0700 Subject: [PATCH] fix(honcho): isolate session routing for multi-user gateway (#1500) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Salvaged from PR #1470 by adavyas. Core fix: Honcho tool calls in a multi-session gateway could route to the wrong session because honcho_tools.py relied on process-global state. Now threads session context through the call chain: AIAgent._invoke_tool() → handle_function_call() → registry.dispatch() → handler **kw → _resolve_session_context() Changes: - Add _resolve_session_context() to prefer per-call context over globals - Plumb honcho_manager + honcho_session_key through handle_function_call - Add sync_honcho=False to run_conversation() for synthetic flush turns - Pass honcho_session_key through gateway memory flush lifecycle - Harden gateway PID detection when /proc cmdline is unreadable - Make interrupt test scripts import-safe for pytest-xdist - Wrap BibTeX examples in Jekyll raw blocks for docs build - Fix thread-order-dependent assertion in client lifecycle test - Expand Honcho docs: session isolation, lifecycle, routing internals Dropped from original PR: - Indentation change in _create_request_openai_client that would move client creation inside the lock (causes unnecessary contention) Co-authored-by: adavyas --- gateway/run.py | 31 ++- gateway/status.py | 26 +- model_tools.py | 6 + run_agent.py | 11 +- skills/research/arxiv/SKILL.md | 2 + .../references/citation-workflow.md | 2 + tests/gateway/test_honcho_lifecycle.py | 28 ++ tests/gateway/test_resume_command.py | 25 ++ tests/gateway/test_status.py | 16 ++ tests/run_interrupt_test.py | 215 ++++++++------- tests/test_interactive_interrupt.py | 261 +++++++++--------- tests/test_openai_client_lifecycle.py | 5 +- tests/test_run_agent.py | 36 ++- tests/tools/test_honcho_tools.py | 36 +++ tools/honcho_tools.py | 27 +- .../docs/developer-guide/gateway-internals.md | 28 +- website/docs/user-guide/features/honcho.md | 19 ++ 17 files changed, 522 insertions(+), 252 deletions(-) create mode 100644 tests/tools/test_honcho_tools.py diff --git a/gateway/run.py b/gateway/run.py index d27c9ba4c..7c309d484 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -478,7 +478,11 @@ class GatewayRunner: # ----------------------------------------------------------------- - def _flush_memories_for_session(self, old_session_id: str): + def _flush_memories_for_session( + self, + old_session_id: str, + honcho_session_key: Optional[str] = None, + ): """Prompt the agent to save memories/skills before context is lost. Synchronous worker — meant to be called via run_in_executor from @@ -506,6 +510,7 @@ class GatewayRunner: quiet_mode=True, enabled_toolsets=["memory", "skills"], session_id=old_session_id, + honcho_session_key=honcho_session_key, ) # Build conversation history from transcript @@ -533,6 +538,7 @@ class GatewayRunner: tmp_agent.run_conversation( user_message=flush_prompt, conversation_history=msgs, + sync_honcho=False, ) logger.info("Pre-reset memory flush completed for session %s", old_session_id) # Flush any queued Honcho writes before the session is dropped @@ -544,10 +550,19 @@ class GatewayRunner: except Exception as e: logger.debug("Pre-reset memory flush failed for session %s: %s", old_session_id, e) - async def _async_flush_memories(self, old_session_id: str): + async def _async_flush_memories( + self, + old_session_id: str, + honcho_session_key: Optional[str] = None, + ): """Run the sync memory flush in a thread pool so it won't block the event loop.""" loop = asyncio.get_event_loop() - await loop.run_in_executor(None, self._flush_memories_for_session, old_session_id) + await loop.run_in_executor( + None, + self._flush_memories_for_session, + old_session_id, + honcho_session_key, + ) @property def should_exit_cleanly(self) -> bool: @@ -923,7 +938,7 @@ class GatewayRunner: entry.session_id, key, ) try: - await self._async_flush_memories(entry.session_id) + await self._async_flush_memories(entry.session_id, key) self._shutdown_gateway_honcho(key) self.session_store._pre_flushed_sessions.add(entry.session_id) except Exception as e: @@ -1904,7 +1919,9 @@ class GatewayRunner: try: old_entry = self.session_store._entries.get(session_key) if old_entry: - asyncio.create_task(self._async_flush_memories(old_entry.session_id)) + asyncio.create_task( + self._async_flush_memories(old_entry.session_id, session_key) + ) except Exception as e: logger.debug("Gateway memory flush on reset failed: %s", e) @@ -3171,7 +3188,9 @@ class GatewayRunner: # Flush memories for current session before switching try: - asyncio.create_task(self._async_flush_memories(current_entry.session_id)) + asyncio.create_task( + self._async_flush_memories(current_entry.session_id, session_key) + ) except Exception as e: logger.debug("Memory flush on resume failed: %s", e) diff --git a/gateway/status.py b/gateway/status.py index 3362a7786..dda6e2321 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -83,8 +83,7 @@ def _looks_like_gateway_process(pid: int) -> bool: """Return True when the live PID still looks like the Hermes gateway.""" cmdline = _read_process_cmdline(pid) if not cmdline: - # If we cannot inspect the process, fall back to the liveness check. - return True + return False patterns = ( "hermes_cli.main gateway", @@ -94,6 +93,24 @@ def _looks_like_gateway_process(pid: int) -> bool: return any(pattern in cmdline for pattern in patterns) +def _record_looks_like_gateway(record: dict[str, Any]) -> bool: + """Validate gateway identity from PID-file metadata when cmdline is unavailable.""" + if record.get("kind") != _GATEWAY_KIND: + return False + + argv = record.get("argv") + if not isinstance(argv, list) or not argv: + return False + + cmdline = " ".join(str(part) for part in argv) + patterns = ( + "hermes_cli.main gateway", + "hermes gateway", + "gateway/run.py", + ) + return any(pattern in cmdline for pattern in patterns) + + def _build_pid_record() -> dict: return { "pid": os.getpid(), @@ -325,8 +342,9 @@ def get_running_pid() -> Optional[int]: return None if not _looks_like_gateway_process(pid): - remove_pid_file() - return None + if not _record_looks_like_gateway(record): + remove_pid_file() + return None return pid diff --git a/model_tools.py b/model_tools.py index 7ef2df10f..be1f5d02f 100644 --- a/model_tools.py +++ b/model_tools.py @@ -267,6 +267,8 @@ def handle_function_call( task_id: Optional[str] = None, user_task: Optional[str] = None, enabled_tools: Optional[List[str]] = None, + honcho_manager: Optional[Any] = None, + honcho_session_key: Optional[str] = None, ) -> str: """ Main function call dispatcher that routes calls to the tool registry. @@ -306,12 +308,16 @@ def handle_function_call( function_name, function_args, task_id=task_id, enabled_tools=sandbox_enabled, + honcho_manager=honcho_manager, + honcho_session_key=honcho_session_key, ) return registry.dispatch( function_name, function_args, task_id=task_id, user_task=user_task, + honcho_manager=honcho_manager, + honcho_session_key=honcho_session_key, ) except Exception as e: diff --git a/run_agent.py b/run_agent.py index 431c0e6ae..394e31adf 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3790,6 +3790,8 @@ class AIAgent: return handle_function_call( function_name, function_args, effective_task_id, enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, + honcho_manager=self._honcho, + honcho_session_key=self._honcho_session_key, ) def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None: @@ -4132,6 +4134,8 @@ class AIAgent: function_result = handle_function_call( function_name, function_args, effective_task_id, enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, + honcho_manager=self._honcho, + honcho_session_key=self._honcho_session_key, ) _spinner_result = function_result except Exception as tool_error: @@ -4146,6 +4150,8 @@ class AIAgent: function_result = handle_function_call( function_name, function_args, effective_task_id, enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, + honcho_manager=self._honcho, + honcho_session_key=self._honcho_session_key, ) except Exception as tool_error: function_result = f"Error executing tool '{function_name}': {tool_error}" @@ -4410,6 +4416,7 @@ class AIAgent: task_id: str = None, stream_callback: Optional[callable] = None, persist_user_message: Optional[str] = None, + sync_honcho: bool = True, ) -> Dict[str, Any]: """ Run a complete conversation with tool calling until completion. @@ -4425,6 +4432,8 @@ class AIAgent: persist_user_message: Optional clean user message to store in transcripts/history when user_message contains API-only synthetic prefixes. + sync_honcho: When False, skip writing the final synthetic turn back + to Honcho or queuing follow-up prefetch work. Returns: Dict: Complete conversation result with final response and message history @@ -5933,7 +5942,7 @@ class AIAgent: self._persist_session(messages, conversation_history) # Sync conversation to Honcho for user modeling - if final_response and not interrupted: + if final_response and not interrupted and sync_honcho: self._honcho_sync(original_user_message, final_response) self._queue_honcho_prefetch(original_user_message) diff --git a/skills/research/arxiv/SKILL.md b/skills/research/arxiv/SKILL.md index 248f91dc5..eb1ecb3c0 100644 --- a/skills/research/arxiv/SKILL.md +++ b/skills/research/arxiv/SKILL.md @@ -114,6 +114,7 @@ curl -s "https://export.arxiv.org/api/query?id_list=2402.03300,2401.12345,2403.0 After fetching metadata for a paper, generate a BibTeX entry: +{% raw %} ```bash curl -s "https://export.arxiv.org/api/query?id_list=1706.03762" | python3 -c " import sys, xml.etree.ElementTree as ET @@ -139,6 +140,7 @@ print(f' url = {{https://arxiv.org/abs/{raw_id}}}') print('}') " ``` +{% endraw %} ## Reading Paper Content diff --git a/skills/research/ml-paper-writing/references/citation-workflow.md b/skills/research/ml-paper-writing/references/citation-workflow.md index b7ec90b6a..b2b33bd6f 100644 --- a/skills/research/ml-paper-writing/references/citation-workflow.md +++ b/skills/research/ml-paper-writing/references/citation-workflow.md @@ -215,6 +215,7 @@ def generate_citation_key(bibtex: str) -> str: ### Complete Citation Manager Class +{% raw %} ```python """ Citation Manager - Verified citation workflow for ML papers. @@ -377,6 +378,7 @@ if __name__ == "__main__": if bibtex: print(bibtex) ``` +{% endraw %} ### Quick Functions diff --git a/tests/gateway/test_honcho_lifecycle.py b/tests/gateway/test_honcho_lifecycle.py index df8d9bc2e..01cff9182 100644 --- a/tests/gateway/test_honcho_lifecycle.py +++ b/tests/gateway/test_honcho_lifecycle.py @@ -90,6 +90,7 @@ class TestGatewayHonchoLifecycle: runner = _make_runner() event = _make_event() runner._shutdown_gateway_honcho = MagicMock() + runner._async_flush_memories = AsyncMock() runner.session_store = MagicMock() runner.session_store._generate_session_key.return_value = "gateway-key" runner.session_store._entries = { @@ -100,4 +101,31 @@ class TestGatewayHonchoLifecycle: result = await runner._handle_reset_command(event) runner._shutdown_gateway_honcho.assert_called_once_with("gateway-key") + runner._async_flush_memories.assert_called_once_with("old-session", "gateway-key") assert "Session reset" in result + + def test_flush_memories_reuses_gateway_session_key_and_skips_honcho_sync(self): + runner = _make_runner() + runner.session_store = MagicMock() + runner.session_store.load_transcript.return_value = [ + {"role": "user", "content": "a"}, + {"role": "assistant", "content": "b"}, + {"role": "user", "content": "c"}, + {"role": "assistant", "content": "d"}, + ] + tmp_agent = MagicMock() + + with ( + patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "test-key"}), + patch("gateway.run._resolve_gateway_model", return_value="model-name"), + patch("run_agent.AIAgent", return_value=tmp_agent) as mock_agent_cls, + ): + runner._flush_memories_for_session("old-session", "gateway-key") + + mock_agent_cls.assert_called_once() + _, kwargs = mock_agent_cls.call_args + assert kwargs["session_id"] == "old-session" + assert kwargs["honcho_session_key"] == "gateway-key" + tmp_agent.run_conversation.assert_called_once() + _, run_kwargs = tmp_agent.run_conversation.call_args + assert run_kwargs["sync_honcho"] is False diff --git a/tests/gateway/test_resume_command.py b/tests/gateway/test_resume_command.py index 987afbce3..739bc149b 100644 --- a/tests/gateway/test_resume_command.py +++ b/tests/gateway/test_resume_command.py @@ -199,3 +199,28 @@ class TestHandleResumeCommand: assert real_key not in runner._running_agents db.close() + + @pytest.mark.asyncio + async def test_resume_flushes_memories_with_gateway_session_key(self, tmp_path): + """Resume should preserve the gateway session key for Honcho flushes.""" + from hermes_state import SessionDB + + db = SessionDB(db_path=tmp_path / "state.db") + db.create_session("old_session", "telegram") + db.set_session_title("old_session", "Old Work") + db.create_session("current_session_001", "telegram") + + event = _make_event(text="/resume Old Work") + runner = _make_runner( + session_db=db, + current_session_id="current_session_001", + event=event, + ) + + await runner._handle_resume_command(event) + + runner._async_flush_memories.assert_called_once_with( + "current_session_001", + _session_key_for_event(event), + ) + db.close() diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index fdf1b57c5..892c4cbdd 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -26,6 +26,22 @@ class TestGatewayPidState: assert status.get_running_pid() is None assert not pid_path.exists() + def test_get_running_pid_accepts_gateway_metadata_when_cmdline_unavailable(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + pid_path = tmp_path / "gateway.pid" + pid_path.write_text(json.dumps({ + "pid": os.getpid(), + "kind": "hermes-gateway", + "argv": ["python", "-m", "hermes_cli.main", "gateway"], + "start_time": 123, + })) + + monkeypatch.setattr(status.os, "kill", lambda pid, sig: None) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123) + monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None) + + assert status.get_running_pid() == os.getpid() + class TestGatewayRuntimeStatus: def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch): diff --git a/tests/run_interrupt_test.py b/tests/run_interrupt_test.py index 19ff3009f..845060ffa 100644 --- a/tests/run_interrupt_test.py +++ b/tests/run_interrupt_test.py @@ -16,126 +16,131 @@ from run_agent import AIAgent, IterationBudget from tools.delegate_tool import _run_single_child from tools.interrupt import set_interrupt, is_interrupted -set_interrupt(False) +def main() -> int: + set_interrupt(False) -# Create parent agent (minimal) -parent = AIAgent.__new__(AIAgent) -parent._interrupt_requested = False -parent._interrupt_message = None -parent._active_children = [] -parent.quiet_mode = True -parent.model = "test/model" -parent.base_url = "http://localhost:1" -parent.api_key = "test" -parent.provider = "test" -parent.api_mode = "chat_completions" -parent.platform = "cli" -parent.enabled_toolsets = ["terminal", "file"] -parent.providers_allowed = None -parent.providers_ignored = None -parent.providers_order = None -parent.provider_sort = None -parent.max_tokens = None -parent.reasoning_config = None -parent.prefill_messages = None -parent._session_db = None -parent._delegate_depth = 0 -parent._delegate_spinner = None -parent.tool_progress_callback = None -parent.iteration_budget = IterationBudget(max_total=100) -parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} + # Create parent agent (minimal) + parent = AIAgent.__new__(AIAgent) + parent._interrupt_requested = False + parent._interrupt_message = None + parent._active_children = [] + parent.quiet_mode = True + parent.model = "test/model" + parent.base_url = "http://localhost:1" + parent.api_key = "test" + parent.provider = "test" + parent.api_mode = "chat_completions" + parent.platform = "cli" + parent.enabled_toolsets = ["terminal", "file"] + parent.providers_allowed = None + parent.providers_ignored = None + parent.providers_order = None + parent.provider_sort = None + parent.max_tokens = None + parent.reasoning_config = None + parent.prefill_messages = None + parent._session_db = None + parent._delegate_depth = 0 + parent._delegate_spinner = None + parent.tool_progress_callback = None + parent.iteration_budget = IterationBudget(max_total=100) + parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} -child_started = threading.Event() -result_holder = [None] + child_started = threading.Event() + result_holder = [None] + def run_delegate(): + with patch("run_agent.OpenAI") as MockOpenAI: + mock_client = MagicMock() -def run_delegate(): - with patch("run_agent.OpenAI") as MockOpenAI: - mock_client = MagicMock() + def slow_create(**kwargs): + time.sleep(3) + resp = MagicMock() + resp.choices = [MagicMock()] + resp.choices[0].message.content = "Done" + resp.choices[0].message.tool_calls = None + resp.choices[0].message.refusal = None + resp.choices[0].finish_reason = "stop" + resp.usage.prompt_tokens = 100 + resp.usage.completion_tokens = 10 + resp.usage.total_tokens = 110 + resp.usage.prompt_tokens_details = None + return resp - def slow_create(**kwargs): - time.sleep(3) - resp = MagicMock() - resp.choices = [MagicMock()] - resp.choices[0].message.content = "Done" - resp.choices[0].message.tool_calls = None - resp.choices[0].message.refusal = None - resp.choices[0].finish_reason = "stop" - resp.usage.prompt_tokens = 100 - resp.usage.completion_tokens = 10 - resp.usage.total_tokens = 110 - resp.usage.prompt_tokens_details = None - return resp + mock_client.chat.completions.create = slow_create + mock_client.close = MagicMock() + MockOpenAI.return_value = mock_client - mock_client.chat.completions.create = slow_create - mock_client.close = MagicMock() - MockOpenAI.return_value = mock_client + original_init = AIAgent.__init__ - original_init = AIAgent.__init__ + def patched_init(self_agent, *a, **kw): + original_init(self_agent, *a, **kw) + child_started.set() - def patched_init(self_agent, *a, **kw): - original_init(self_agent, *a, **kw) - child_started.set() + with patch.object(AIAgent, "__init__", patched_init): + try: + result = _run_single_child( + task_index=0, + goal="Test slow task", + context=None, + toolsets=["terminal"], + model="test/model", + max_iterations=5, + parent_agent=parent, + task_count=1, + override_provider="test", + override_base_url="http://localhost:1", + override_api_key="test", + override_api_mode="chat_completions", + ) + result_holder[0] = result + except Exception as e: + print(f"ERROR in delegate: {e}") + import traceback + traceback.print_exc() - with patch.object(AIAgent, "__init__", patched_init): - try: - result = _run_single_child( - task_index=0, - goal="Test slow task", - context=None, - toolsets=["terminal"], - model="test/model", - max_iterations=5, - parent_agent=parent, - task_count=1, - override_provider="test", - override_base_url="http://localhost:1", - override_api_key="test", - override_api_mode="chat_completions", - ) - result_holder[0] = result - except Exception as e: - print(f"ERROR in delegate: {e}") - import traceback - traceback.print_exc() + print("Starting agent thread...") + agent_thread = threading.Thread(target=run_delegate, daemon=True) + agent_thread.start() + started = child_started.wait(timeout=10) + if not started: + print("ERROR: Child never started") + set_interrupt(False) + return 1 -print("Starting agent thread...") -agent_thread = threading.Thread(target=run_delegate, daemon=True) -agent_thread.start() + time.sleep(0.5) -started = child_started.wait(timeout=10) -if not started: - print("ERROR: Child never started") - sys.exit(1) + print(f"Active children: {len(parent._active_children)}") + for i, c in enumerate(parent._active_children): + print(f" Child {i}: _interrupt_requested={c._interrupt_requested}") -time.sleep(0.5) + t0 = time.monotonic() + parent.interrupt("User typed a new message") + print("Called parent.interrupt()") -print(f"Active children: {len(parent._active_children)}") -for i, c in enumerate(parent._active_children): - print(f" Child {i}: _interrupt_requested={c._interrupt_requested}") + for i, c in enumerate(parent._active_children): + print(f" Child {i} after interrupt: _interrupt_requested={c._interrupt_requested}") + print(f"Global is_interrupted: {is_interrupted()}") -t0 = time.monotonic() -parent.interrupt("User typed a new message") -print(f"Called parent.interrupt()") + agent_thread.join(timeout=10) + elapsed = time.monotonic() - t0 + print(f"Agent thread finished in {elapsed:.2f}s") -for i, c in enumerate(parent._active_children): - print(f" Child {i} after interrupt: _interrupt_requested={c._interrupt_requested}") -print(f"Global is_interrupted: {is_interrupted()}") - -agent_thread.join(timeout=10) -elapsed = time.monotonic() - t0 -print(f"Agent thread finished in {elapsed:.2f}s") - -result = result_holder[0] -if result: - print(f"Status: {result['status']}") - print(f"Duration: {result['duration_seconds']}s") - if elapsed < 2.0: - print("✅ PASS: Interrupt detected quickly!") + result = result_holder[0] + if result: + print(f"Status: {result['status']}") + print(f"Duration: {result['duration_seconds']}s") + if elapsed < 2.0: + print("✅ PASS: Interrupt detected quickly!") + else: + print(f"❌ FAIL: Took {elapsed:.2f}s — interrupt was too slow or not detected") else: - print(f"❌ FAIL: Took {elapsed:.2f}s — interrupt was too slow or not detected") -else: - print("❌ FAIL: No result!") + print("❌ FAIL: No result!") -set_interrupt(False) + set_interrupt(False) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_interactive_interrupt.py b/tests/test_interactive_interrupt.py index bb90c7452..c01404e1c 100644 --- a/tests/test_interactive_interrupt.py +++ b/tests/test_interactive_interrupt.py @@ -29,51 +29,6 @@ from unittest.mock import MagicMock, patch from run_agent import AIAgent, IterationBudget from tools.interrupt import set_interrupt, is_interrupted -set_interrupt(False) - -# ─── Create parent agent ─── -parent = AIAgent.__new__(AIAgent) -parent._interrupt_requested = False -parent._interrupt_message = None -parent._active_children = [] -parent.quiet_mode = True -parent.model = "test/model" -parent.base_url = "http://localhost:1" -parent.api_key = "test" -parent.provider = "test" -parent.api_mode = "chat_completions" -parent.platform = "cli" -parent.enabled_toolsets = ["terminal", "file"] -parent.providers_allowed = None -parent.providers_ignored = None -parent.providers_order = None -parent.provider_sort = None -parent.max_tokens = None -parent.reasoning_config = None -parent.prefill_messages = None -parent._session_db = None -parent._delegate_depth = 0 -parent._delegate_spinner = None -parent.tool_progress_callback = None -parent.iteration_budget = IterationBudget(max_total=100) -parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} - -# Monkey-patch parent.interrupt to log -_original_interrupt = AIAgent.interrupt -def logged_interrupt(self, message=None): - log.info(f"🔴 parent.interrupt() called with: {message!r}") - log.info(f" _active_children count: {len(self._active_children)}") - _original_interrupt(self, message) - log.info(f" After interrupt: _interrupt_requested={self._interrupt_requested}") - for i, c in enumerate(self._active_children): - log.info(f" Child {i}._interrupt_requested={c._interrupt_requested}") -parent.interrupt = lambda msg=None: logged_interrupt(parent, msg) - -# ─── Simulate the exact CLI flow ─── -interrupt_queue = queue.Queue() -child_running = threading.Event() -agent_result = [None] - def make_slow_response(delay=2.0): """API response that takes a while.""" def create(**kwargs): @@ -94,96 +49,154 @@ def make_slow_response(delay=2.0): return create -def agent_thread_func(): - """Simulates the agent_thread in cli.py's chat() method.""" - log.info("🟢 agent_thread starting") +def main() -> int: + set_interrupt(False) - with patch("run_agent.OpenAI") as MockOpenAI: - mock_client = MagicMock() - mock_client.chat.completions.create = make_slow_response(delay=3.0) - mock_client.close = MagicMock() - MockOpenAI.return_value = mock_client + # ─── Create parent agent ─── + parent = AIAgent.__new__(AIAgent) + parent._interrupt_requested = False + parent._interrupt_message = None + parent._active_children = [] + parent.quiet_mode = True + parent.model = "test/model" + parent.base_url = "http://localhost:1" + parent.api_key = "test" + parent.provider = "test" + parent.api_mode = "chat_completions" + parent.platform = "cli" + parent.enabled_toolsets = ["terminal", "file"] + parent.providers_allowed = None + parent.providers_ignored = None + parent.providers_order = None + parent.provider_sort = None + parent.max_tokens = None + parent.reasoning_config = None + parent.prefill_messages = None + parent._session_db = None + parent._delegate_depth = 0 + parent._delegate_spinner = None + parent.tool_progress_callback = None + parent.iteration_budget = IterationBudget(max_total=100) + parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} - from tools.delegate_tool import _run_single_child + # Monkey-patch parent.interrupt to log + _original_interrupt = AIAgent.interrupt - # Signal that child is about to start - original_init = AIAgent.__init__ - def patched_init(self_agent, *a, **kw): - log.info("🟡 Child AIAgent.__init__ called") - original_init(self_agent, *a, **kw) - child_running.set() - log.info(f"🟡 Child started, parent._active_children = {len(parent._active_children)}") + def logged_interrupt(self, message=None): + log.info(f"🔴 parent.interrupt() called with: {message!r}") + log.info(f" _active_children count: {len(self._active_children)}") + _original_interrupt(self, message) + log.info(f" After interrupt: _interrupt_requested={self._interrupt_requested}") + for i, child in enumerate(self._active_children): + log.info(f" Child {i}._interrupt_requested={child._interrupt_requested}") - with patch.object(AIAgent, "__init__", patched_init): - result = _run_single_child( - task_index=0, - goal="Do a slow thing", - context=None, - toolsets=["terminal"], - model="test/model", - max_iterations=3, - parent_agent=parent, - task_count=1, - override_provider="test", - override_base_url="http://localhost:1", - override_api_key="test", - override_api_mode="chat_completions", - ) - agent_result[0] = result - log.info(f"🟢 agent_thread finished. Result status: {result.get('status')}") + parent.interrupt = lambda msg=None: logged_interrupt(parent, msg) + # ─── Simulate the exact CLI flow ─── + interrupt_queue = queue.Queue() + child_running = threading.Event() + agent_result = [None] -# ─── Start agent thread (like chat() does) ─── -agent_thread = threading.Thread(target=agent_thread_func, name="agent_thread", daemon=True) -agent_thread.start() + def agent_thread_func(): + """Simulates the agent_thread in cli.py's chat() method.""" + log.info("🟢 agent_thread starting") -# ─── Wait for child to start ─── -if not child_running.wait(timeout=10): - print("FAIL: Child never started", file=sys.stderr) - sys.exit(1) + with patch("run_agent.OpenAI") as MockOpenAI: + mock_client = MagicMock() + mock_client.chat.completions.create = make_slow_response(delay=3.0) + mock_client.close = MagicMock() + MockOpenAI.return_value = mock_client -# Give child time to enter its main loop and start API call -time.sleep(1.0) + from tools.delegate_tool import _run_single_child -# ─── Simulate user typing a message (like handle_enter does) ─── -log.info("📝 Simulating user typing 'Hey stop that'") -interrupt_queue.put("Hey stop that") + # Signal that child is about to start + original_init = AIAgent.__init__ -# ─── Simulate chat() polling loop (like the real chat() method) ─── -log.info("📡 Starting interrupt queue polling (like chat())") -interrupt_msg = None -poll_count = 0 -while agent_thread.is_alive(): - try: - interrupt_msg = interrupt_queue.get(timeout=0.1) - if interrupt_msg: - log.info(f"📨 Got interrupt message from queue: {interrupt_msg!r}") - log.info(f" Calling parent.interrupt()...") - parent.interrupt(interrupt_msg) - log.info(f" parent.interrupt() returned. Breaking poll loop.") - break - except queue.Empty: - poll_count += 1 - if poll_count % 20 == 0: # Log every 2s - log.info(f" Still polling ({poll_count} iterations)...") + def patched_init(self_agent, *a, **kw): + log.info("🟡 Child AIAgent.__init__ called") + original_init(self_agent, *a, **kw) + child_running.set() + log.info( + f"🟡 Child started, parent._active_children = {len(parent._active_children)}" + ) -# ─── Wait for agent to finish ─── -log.info("⏳ Waiting for agent_thread to join...") -t0 = time.monotonic() -agent_thread.join(timeout=10) -elapsed = time.monotonic() - t0 -log.info(f"✅ agent_thread joined after {elapsed:.2f}s") + with patch.object(AIAgent, "__init__", patched_init): + result = _run_single_child( + task_index=0, + goal="Do a slow thing", + context=None, + toolsets=["terminal"], + model="test/model", + max_iterations=3, + parent_agent=parent, + task_count=1, + override_provider="test", + override_base_url="http://localhost:1", + override_api_key="test", + override_api_mode="chat_completions", + ) + agent_result[0] = result + log.info(f"🟢 agent_thread finished. Result status: {result.get('status')}") -# ─── Check results ─── -result = agent_result[0] -if result: - log.info(f"Result status: {result['status']}") - log.info(f"Result duration: {result['duration_seconds']}s") - if result["status"] == "interrupted" and elapsed < 2.0: - print("✅ PASS: Interrupt worked correctly!", file=sys.stderr) - else: + # ─── Start agent thread (like chat() does) ─── + agent_thread = threading.Thread(target=agent_thread_func, name="agent_thread", daemon=True) + agent_thread.start() + + # ─── Wait for child to start ─── + if not child_running.wait(timeout=10): + print("FAIL: Child never started", file=sys.stderr) + set_interrupt(False) + return 1 + + # Give child time to enter its main loop and start API call + time.sleep(1.0) + + # ─── Simulate user typing a message (like handle_enter does) ─── + log.info("📝 Simulating user typing 'Hey stop that'") + interrupt_queue.put("Hey stop that") + + # ─── Simulate chat() polling loop (like the real chat() method) ─── + log.info("📡 Starting interrupt queue polling (like chat())") + interrupt_msg = None + poll_count = 0 + while agent_thread.is_alive(): + try: + interrupt_msg = interrupt_queue.get(timeout=0.1) + if interrupt_msg: + log.info(f"📨 Got interrupt message from queue: {interrupt_msg!r}") + log.info(" Calling parent.interrupt()...") + parent.interrupt(interrupt_msg) + log.info(" parent.interrupt() returned. Breaking poll loop.") + break + except queue.Empty: + poll_count += 1 + if poll_count % 20 == 0: # Log every 2s + log.info(f" Still polling ({poll_count} iterations)...") + + # ─── Wait for agent to finish ─── + log.info("⏳ Waiting for agent_thread to join...") + t0 = time.monotonic() + agent_thread.join(timeout=10) + elapsed = time.monotonic() - t0 + log.info(f"✅ agent_thread joined after {elapsed:.2f}s") + + # ─── Check results ─── + result = agent_result[0] + if result: + log.info(f"Result status: {result['status']}") + log.info(f"Result duration: {result['duration_seconds']}s") + if result["status"] == "interrupted" and elapsed < 2.0: + print("✅ PASS: Interrupt worked correctly!", file=sys.stderr) + set_interrupt(False) + return 0 print(f"❌ FAIL: status={result['status']}, elapsed={elapsed:.2f}s", file=sys.stderr) -else: - print("❌ FAIL: No result returned", file=sys.stderr) + set_interrupt(False) + return 1 -set_interrupt(False) + print("❌ FAIL: No result returned", file=sys.stderr) + set_interrupt(False) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_openai_client_lifecycle.py b/tests/test_openai_client_lifecycle.py index dc3ed7714..695737895 100644 --- a/tests/test_openai_client_lifecycle.py +++ b/tests/test_openai_client_lifecycle.py @@ -145,8 +145,9 @@ def test_concurrent_requests_do_not_break_each_other_when_one_client_closes(monk thread_one.join(timeout=5) thread_two.join(timeout=5) - assert isinstance(results["first"], APIConnectionError) - assert results["second"] == {"ok": "second"} + values = list(results.values()) + assert sum(isinstance(value, APIConnectionError) for value in values) == 1 + assert values.count({"ok": "second"}) == 1 assert len(factory.calls) == 2 diff --git a/tests/test_run_agent.py b/tests/test_run_agent.py index 1d54c9d01..2cc37fc51 100644 --- a/tests/test_run_agent.py +++ b/tests/test_run_agent.py @@ -930,8 +930,10 @@ class TestConcurrentToolExecution: mock_hfc.assert_called_once_with( "web_search", {"q": "test"}, "task-1", enabled_tools=list(agent.valid_tool_names), + honcho_manager=None, + honcho_session_key=None, ) - assert result == "result" + assert result == "result" def test_invoke_tool_handles_agent_level_tools(self, agent): """_invoke_tool should handle todo tool directly.""" @@ -1584,6 +1586,38 @@ class TestSystemPromptStability: should_prefetch = not conversation_history assert should_prefetch is True + def test_run_conversation_can_skip_honcho_sync_for_synthetic_turns(self, agent): + captured = {} + + def _fake_api_call(api_kwargs): + captured.update(api_kwargs) + return _mock_response(content="done", finish_reason="stop") + + agent._honcho = MagicMock() + agent._honcho_session_key = "session-1" + agent._honcho_config = SimpleNamespace( + ai_peer="hermes", + memory_mode="hybrid", + write_frequency="async", + recall_mode="hybrid", + ) + agent._use_prompt_caching = False + + with ( + patch.object(agent, "_honcho_sync") as mock_sync, + patch.object(agent, "_queue_honcho_prefetch") as mock_prefetch, + patch.object(agent, "_persist_session"), + patch.object(agent, "_save_trajectory"), + patch.object(agent, "_cleanup_task_resources"), + patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call), + ): + result = agent.run_conversation("synthetic flush turn", sync_honcho=False) + + assert result["completed"] is True + assert captured["messages"][-1]["content"] == "synthetic flush turn" + mock_sync.assert_not_called() + mock_prefetch.assert_not_called() + class TestHonchoActivation: def test_disabled_config_skips_honcho_init(self): diff --git a/tests/tools/test_honcho_tools.py b/tests/tools/test_honcho_tools.py new file mode 100644 index 000000000..16e144541 --- /dev/null +++ b/tests/tools/test_honcho_tools.py @@ -0,0 +1,36 @@ +"""Regression tests for per-call Honcho tool session routing.""" + +import json +from unittest.mock import MagicMock + +from tools import honcho_tools + + +class TestHonchoToolSessionContext: + def setup_method(self): + self.orig_manager = honcho_tools._session_manager + self.orig_key = honcho_tools._session_key + + def teardown_method(self): + honcho_tools._session_manager = self.orig_manager + honcho_tools._session_key = self.orig_key + + def test_explicit_call_context_wins_over_module_global_state(self): + global_manager = MagicMock() + global_manager.get_peer_card.return_value = ["global"] + explicit_manager = MagicMock() + explicit_manager.get_peer_card.return_value = ["explicit"] + + honcho_tools.set_session_context(global_manager, "global-session") + + result = json.loads( + honcho_tools._handle_honcho_profile( + {}, + honcho_manager=explicit_manager, + honcho_session_key="explicit-session", + ) + ) + + assert result == {"result": ["explicit"]} + explicit_manager.get_peer_card.assert_called_once_with("explicit-session") + global_manager.get_peer_card.assert_not_called() diff --git a/tools/honcho_tools.py b/tools/honcho_tools.py index 6ee8ad655..4aa86d57a 100644 --- a/tools/honcho_tools.py +++ b/tools/honcho_tools.py @@ -49,6 +49,13 @@ def _check_honcho_available() -> bool: return _session_manager is not None and _session_key is not None +def _resolve_session_context(**kwargs): + """Prefer the calling agent's session context over module-global fallback.""" + session_manager = kwargs.get("honcho_manager") or _session_manager + session_key = kwargs.get("honcho_session_key") or _session_key + return session_manager, session_key + + # ── honcho_profile ── _PROFILE_SCHEMA = { @@ -69,10 +76,11 @@ _PROFILE_SCHEMA = { def _handle_honcho_profile(args: dict, **kw) -> str: - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: return json.dumps({"error": "Honcho is not active for this session."}) try: - card = _session_manager.get_peer_card(_session_key) + card = session_manager.get_peer_card(session_key) if not card: return json.dumps({"result": "No profile facts available yet. The user's profile builds over time through conversations."}) return json.dumps({"result": card}) @@ -113,11 +121,12 @@ def _handle_honcho_search(args: dict, **kw) -> str: query = args.get("query", "") if not query: return json.dumps({"error": "Missing required parameter: query"}) - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: return json.dumps({"error": "Honcho is not active for this session."}) max_tokens = min(int(args.get("max_tokens", 800)), 2000) try: - result = _session_manager.search_context(_session_key, query, max_tokens=max_tokens) + result = session_manager.search_context(session_key, query, max_tokens=max_tokens) if not result: return json.dumps({"result": "No relevant context found."}) return json.dumps({"result": result}) @@ -158,11 +167,12 @@ def _handle_honcho_context(args: dict, **kw) -> str: query = args.get("query", "") if not query: return json.dumps({"error": "Missing required parameter: query"}) - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: return json.dumps({"error": "Honcho is not active for this session."}) peer_target = args.get("peer", "user") try: - result = _session_manager.dialectic_query(_session_key, query, peer=peer_target) + result = session_manager.dialectic_query(session_key, query, peer=peer_target) return json.dumps({"result": result or "No result from Honcho."}) except Exception as e: logger.error("Error querying Honcho context: %s", e) @@ -200,10 +210,11 @@ def _handle_honcho_conclude(args: dict, **kw) -> str: conclusion = args.get("conclusion", "") if not conclusion: return json.dumps({"error": "Missing required parameter: conclusion"}) - if not _session_manager or not _session_key: + session_manager, session_key = _resolve_session_context(**kw) + if not session_manager or not session_key: return json.dumps({"error": "Honcho is not active for this session."}) try: - ok = _session_manager.create_conclusion(_session_key, conclusion) + ok = session_manager.create_conclusion(session_key, conclusion) if ok: return json.dumps({"result": f"Conclusion saved: {conclusion}"}) return json.dumps({"error": "Failed to save conclusion."}) diff --git a/website/docs/developer-guide/gateway-internals.md b/website/docs/developer-guide/gateway-internals.md index 6edaf6504..8df6fd958 100644 --- a/website/docs/developer-guide/gateway-internals.md +++ b/website/docs/developer-guide/gateway-internals.md @@ -86,7 +86,33 @@ The gateway also runs maintenance tasks such as: ## Honcho interaction -When Honcho is enabled, the gateway can keep persistent Honcho managers aligned with session lifetimes and platform-specific session keys. +When Honcho is enabled, the gateway keeps persistent Honcho managers aligned with session lifetimes and platform-specific session keys. + +### Session routing + +Honcho tools (`honcho_profile`, `honcho_search`, `honcho_context`, `honcho_conclude`) need to execute against the correct user's Honcho session. In a multi-user gateway, the process-global module state in `tools/honcho_tools.py` is insufficient — multiple sessions may be active concurrently. + +The solution threads session context through the call chain: + +``` +AIAgent._invoke_tool() + → handle_function_call(honcho_manager=..., honcho_session_key=...) + → registry.dispatch(**kwargs) + → _handle_honcho_*(args, **kw) + → _resolve_session_context(**kw) # prefers explicit kwargs over module globals +``` + +`_resolve_session_context()` in `honcho_tools.py` checks for `honcho_manager` and `honcho_session_key` in the kwargs first, falling back to the module-global `_session_manager` / `_session_key` for CLI mode where there's only one session. + +### Memory flush lifecycle + +When a session is reset, resumed, or expires, the gateway flushes memories before discarding context. The flush creates a temporary `AIAgent` with: + +- `session_id` set to the old session's ID (so transcripts load correctly) +- `honcho_session_key` set to the gateway session key (so Honcho writes go to the right place) +- `sync_honcho=False` passed to `run_conversation()` (so the synthetic flush turn doesn't write back to Honcho's conversation history) + +After the flush completes, any queued Honcho writes are drained and the gateway-level Honcho manager is shut down for that session key. ## Related docs diff --git a/website/docs/user-guide/features/honcho.md b/website/docs/user-guide/features/honcho.md index 3902b530e..f9748070e 100644 --- a/website/docs/user-guide/features/honcho.md +++ b/website/docs/user-guide/features/honcho.md @@ -247,6 +247,25 @@ Dialectic queries scale reasoning effort with message complexity: The gateway creates short-lived `AIAgent` instances per request. Honcho managers are owned at the gateway session layer (`_honcho_managers` dict) so they persist across requests within the same session and flush at real session boundaries (reset, resume, expiry, server stop). +#### Session Isolation + +Each gateway session (e.g., a Telegram chat, a Discord channel) gets its own Honcho session context. The session key — derived from the platform and chat ID — is threaded through the entire tool dispatch chain so that Honcho tool calls always execute against the correct session, even when multiple users are messaging concurrently. + +This means: +- **`honcho_profile`**, **`honcho_search`**, **`honcho_context`**, and **`honcho_conclude`** all resolve the correct session at call time, not at startup +- Background memory flushes (triggered by `/reset`, `/resume`, or session expiry) preserve the original session key so they write to the correct Honcho session +- Synthetic flush turns (where the agent saves memories before context is lost) skip Honcho sync to avoid polluting conversation history with internal bookkeeping + +#### Session Lifecycle + +| Event | What happens to Honcho | +|-------|------------------------| +| New message arrives | Agent inherits the gateway's Honcho manager + session key | +| `/reset` | Memory flush fires with the old session key, then Honcho manager shuts down | +| `/resume` | Current session is flushed, then the resumed session's Honcho context loads | +| Session expiry | Automatic flush + shutdown after the configured idle timeout | +| Gateway stop | All active Honcho managers are flushed and shut down gracefully | + ## Tools When Honcho is active, four tools become available. Availability is gated dynamically — they are invisible when Honcho is disabled.