diff --git a/gateway/run.py b/gateway/run.py index 7ee9582321a..40cb4a8de26 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -14565,6 +14565,61 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _context_length = getattr(_agent.context_compressor, "context_length", 0) or 0 _resolved_model = getattr(_agent, "model", None) if _agent else None + # Sync session_id immediately after run_conversation(). Compression + # can rotate before a follow-up model call fails; the failure return + # below must still point the gateway at the compressed child. + agent = agent_holder[0] + _session_was_split = False + agent_session_id = getattr(agent, 'session_id', session_id) if agent else session_id + if agent and session_key and agent_session_id != session_id: + _session_was_split = True + logger.info( + "Session split detected: %s → %s (compression)", + session_id, agent_session_id, + ) + entry = self.session_store._entries.get(session_key) + if entry: + entry.session_id = agent_session_id + self.session_store._save() + + # If this is a Telegram DM and source.thread_id was lost during + # the session split (synthetic / recovered event), restore it + # from the binding so _thread_metadata_for_source produces the + # correct message_thread_id instead of routing to the General + # thread. Failure here is non-fatal — we log and continue; + # worst case the message lands in General, which is the + # pre-fix behaviour. + if ( + getattr(source, "platform", None) == Platform.TELEGRAM + and getattr(source, "chat_type", None) == "dm" + and getattr(source, "thread_id", None) is None + and self._session_db is not None + ): + try: + _binding = self._session_db.get_telegram_topic_binding_by_session( + session_id=agent_session_id, + ) + if _binding and _binding.get("thread_id"): + source.thread_id = str(_binding["thread_id"]) + logger.debug( + "Restored source.thread_id=%s from binding after session split %s → %s", + source.thread_id, + session_id, + agent_session_id, + ) + except Exception: + logger.debug( + "Failed to restore thread_id from binding after session split", + exc_info=True, + ) + if entry: + self._sync_telegram_topic_binding( + source, entry, reason="agent-run-compression", + ) + + effective_session_id = agent_session_id + _effective_history_offset = 0 if _session_was_split else len(agent_history) + if not final_response: error_msg = f"⚠️ {result['error']}" if result.get("error") else "" return { @@ -14579,7 +14634,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew "error": result.get("error"), "compression_exhausted": result.get("compression_exhausted", False), "tools": tools_holder[0] or [], - "history_offset": len(agent_history), + "history_offset": _effective_history_offset, + "session_id": effective_session_id, "last_prompt_tokens": _last_prompt_toks, "input_tokens": _input_toks, "output_tokens": _output_toks, @@ -14625,63 +14681,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew unique_tags.insert(0, "[[audio_as_voice]]") final_response = final_response + "\n" + "\n".join(unique_tags) - # Sync session_id: the agent may have created a new session during - # mid-run context compression (_compress_context splits sessions). - # If so, update the session store entry so the NEXT message loads - # the compressed transcript, not the stale pre-compression one. - agent = agent_holder[0] - _session_was_split = False - if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id: - _session_was_split = True - logger.info( - "Session split detected: %s → %s (compression)", - session_id, agent.session_id, - ) - entry = self.session_store._entries.get(session_key) - if entry: - entry.session_id = agent.session_id - self.session_store._save() - - # If this is a Telegram DM and source.thread_id was lost during - # the session split (synthetic / recovered event), restore it - # from the binding so _thread_metadata_for_source produces the - # correct message_thread_id instead of routing to the General - # thread. Failure here is non-fatal — we log and continue; - # worst case the message lands in General, which is the - # pre-fix behaviour. - if ( - getattr(source, "platform", None) == Platform.TELEGRAM - and getattr(source, "chat_type", None) == "dm" - and getattr(source, "thread_id", None) is None - and self._session_db is not None - ): - try: - _binding = self._session_db.get_telegram_topic_binding_by_session( - session_id=agent.session_id, - ) - if _binding and _binding.get("thread_id"): - source.thread_id = str(_binding["thread_id"]) - logger.debug( - "Restored source.thread_id=%s from binding after session split %s → %s", - source.thread_id, - session_id, - agent.session_id, - ) - except Exception: - logger.debug( - "Failed to restore thread_id from binding after session split", - exc_info=True, - ) - - effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id - - # When compression created a new session, the messages list was - # shortened. Using the original history offset would produce an - # empty new_messages slice, causing the gateway to write only a - # user/assistant pair — losing the compressed summary and tail. - # Reset to 0 so the gateway writes ALL compressed messages. - _effective_history_offset = 0 if _session_was_split else len(agent_history) - # Auto-generate session title after first exchange (non-blocking) if final_response and self._session_db: try: diff --git a/scripts/release.py b/scripts/release.py index 9d2e275d4b5..77b7eef9aeb 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -78,6 +78,7 @@ AUTHOR_MAP = { "290859878+synapsesx@users.noreply.github.com": "synapsesx", "157689911+itsflownium@users.noreply.github.com": "itsflownium", "dirtyren@users.noreply.github.com": "dirtyren", + "JustinBao@outlook.com": "justinbao19", "kdunn926@gmail.com": "kdunn926", "mvanhorn@MacBook-Pro.local": "mvanhorn", "470766206@qq.com": "youjunxiaji", diff --git a/tests/gateway/test_compression_failure_session_sync.py b/tests/gateway/test_compression_failure_session_sync.py new file mode 100644 index 00000000000..5e093eb0bf5 --- /dev/null +++ b/tests/gateway/test_compression_failure_session_sync.py @@ -0,0 +1,160 @@ +import asyncio +import sys +import threading +import types +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import gateway.run as gateway_run +from gateway.config import Platform +from gateway.session import SessionSource + + +SESSION_KEY = "agent:main:telegram:dm:12345" + + +class _SessionStore: + def __init__(self): + self.entry = SimpleNamespace( + session_key=SESSION_KEY, + session_id="session-before-compression", + ) + self._entries = {SESSION_KEY: self.entry} + self.save_calls = 0 + + def _save(self): + self.save_calls += 1 + + +class _CompressionThenFailureAgent: + def __init__(self, **kwargs): + self.session_id = kwargs["session_id"] + self.model = kwargs["model"] + self.tools = [] + self.context_compressor = SimpleNamespace( + last_prompt_tokens=4321, + context_length=200000, + ) + self.session_prompt_tokens = 4321 + self.session_completion_tokens = 0 + + def run_conversation(self, user_message, conversation_history=None, task_id=None, **_kwargs): + self.session_id = "session-after-compression" + return { + "failed": True, + "error": "APIConnectionError: Codex auxiliary Responses stream exceeded 120.0s total timeout", + "messages": [ + {"role": "user", "content": "[compressed summary]"}, + {"role": "user", "content": user_message}, + ], + "api_calls": 1, + } + + def interrupt(self, *_args, **_kwargs): + pass + + +class _StreamConsumer: + final_response_sent = False + + def __init__(self, *_args, **_kwargs): + pass + + async def run(self): + return None + + def finish(self): + pass + + +class _Adapter: + SUPPORTS_MESSAGE_EDITING = True + _pending_messages = {} + + def get_pending_message(self, _session_key): + return None + + async def send_typing(self, *_args, **_kwargs): + return None + + async def stop_typing(self, *_args, **_kwargs): + return None + + +def _runner(session_store): + runner = object.__new__(gateway_run.GatewayRunner) + runner.adapters = {Platform.TELEGRAM: _Adapter()} + runner.config = SimpleNamespace(streaming=None, group_sessions_per_user=True, thread_sessions_per_user=False) + runner.hooks = SimpleNamespace(loaded_hooks=False, emit=AsyncMock()) + runner.session_store = session_store + runner._session_db = MagicMock() + runner._session_db.get_telegram_topic_binding_by_session.return_value = None + runner._agent_cache = {} + runner._agent_cache_lock = threading.Lock() + runner._running_agents = {} + runner._running_agents_ts = {} + runner._session_run_generation = {} + runner._session_model_overrides = {} + runner._pending_model_notes = {} + runner._pending_skills_reload_notes = {} + runner._prefill_messages = [] + runner._ephemeral_system_prompt = "" + runner._reasoning_config = None + runner._provider_routing = {} + runner._fallback_model = None + runner._draining = False + runner._get_proxy_url = lambda: None + runner._resolve_session_agent_runtime = lambda **_kwargs: ( + "gpt-5.4", + {"provider": "openai-codex", "api_mode": "codex_responses", "base_url": "https://chatgpt.com/backend-api/codex", "api_key": "token"}, + ) + runner._resolve_session_reasoning_config = lambda **_kwargs: None + runner._resolve_turn_agent_config = lambda message, model, runtime: {"model": model, "runtime": runtime} + runner._load_service_tier = lambda: None + runner._agent_config_signature = lambda *_args, **_kwargs: ("sig",) + runner._extract_cache_busting_config = lambda _config: () + runner._thread_metadata_for_source = lambda *_args, **_kwargs: None + runner._sync_telegram_topic_binding = MagicMock() + runner._release_running_agent_state = MagicMock() + return runner + + +def test_failed_turn_still_syncs_compression_session_split(monkeypatch): + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = _CompressionThenFailureAgent + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "off") + monkeypatch.setenv("HERMES_AGENT_TIMEOUT", "0") + monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {}) + monkeypatch.setattr("gateway.stream_consumer.GatewayStreamConsumer", _StreamConsumer) + + import hermes_cli.tools_config as tools_config + + monkeypatch.setattr(tools_config, "_get_platform_tools", lambda *_args, **_kwargs: {"core"}) + + session_store = _SessionStore() + runner = _runner(session_store) + source = SessionSource(platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm", user_id="user-1") + + result = asyncio.run( + asyncio.wait_for( + runner._run_agent( + message="continue", + context_prompt="", + history=[{"role": "user", "content": "old question"}], + source=source, + session_id="session-before-compression", + session_key=SESSION_KEY, + ), + timeout=2, + ) + ) + + assert result["failed"] is True + assert result["session_id"] == "session-after-compression" + assert result["history_offset"] == 0 + assert session_store.entry.session_id == "session-after-compression" + assert session_store.save_calls == 1 + runner._sync_telegram_topic_binding.assert_called_once_with( + source, session_store.entry, reason="agent-run-compression" + )