fix(gateway): sync compression session splits before failures

Salvages PR #25747 by preserving gateway session rotation even when a post-compression model call fails before returning final content.

Co-authored-by: Hermes <127238744+teknium1@users.noreply.github.com>
This commit is contained in:
Black-Kylin 2026-06-13 03:30:36 -07:00 committed by Teknium
parent 2d474e39c7
commit 202e318cb1
3 changed files with 218 additions and 58 deletions

View file

@ -14565,6 +14565,61 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_context_length = getattr(_agent.context_compressor, "context_length", 0) or 0
_resolved_model = getattr(_agent, "model", None) if _agent else None
# Sync session_id immediately after run_conversation(). Compression
# can rotate before a follow-up model call fails; the failure return
# below must still point the gateway at the compressed child.
agent = agent_holder[0]
_session_was_split = False
agent_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
if agent and session_key and agent_session_id != session_id:
_session_was_split = True
logger.info(
"Session split detected: %s%s (compression)",
session_id, agent_session_id,
)
entry = self.session_store._entries.get(session_key)
if entry:
entry.session_id = agent_session_id
self.session_store._save()
# If this is a Telegram DM and source.thread_id was lost during
# the session split (synthetic / recovered event), restore it
# from the binding so _thread_metadata_for_source produces the
# correct message_thread_id instead of routing to the General
# thread. Failure here is non-fatal — we log and continue;
# worst case the message lands in General, which is the
# pre-fix behaviour.
if (
getattr(source, "platform", None) == Platform.TELEGRAM
and getattr(source, "chat_type", None) == "dm"
and getattr(source, "thread_id", None) is None
and self._session_db is not None
):
try:
_binding = self._session_db.get_telegram_topic_binding_by_session(
session_id=agent_session_id,
)
if _binding and _binding.get("thread_id"):
source.thread_id = str(_binding["thread_id"])
logger.debug(
"Restored source.thread_id=%s from binding after session split %s%s",
source.thread_id,
session_id,
agent_session_id,
)
except Exception:
logger.debug(
"Failed to restore thread_id from binding after session split",
exc_info=True,
)
if entry:
self._sync_telegram_topic_binding(
source, entry, reason="agent-run-compression",
)
effective_session_id = agent_session_id
_effective_history_offset = 0 if _session_was_split else len(agent_history)
if not final_response:
error_msg = f"⚠️ {result['error']}" if result.get("error") else ""
return {
@ -14579,7 +14634,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"error": result.get("error"),
"compression_exhausted": result.get("compression_exhausted", False),
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"history_offset": _effective_history_offset,
"session_id": effective_session_id,
"last_prompt_tokens": _last_prompt_toks,
"input_tokens": _input_toks,
"output_tokens": _output_toks,
@ -14625,63 +14681,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
unique_tags.insert(0, "[[audio_as_voice]]")
final_response = final_response + "\n" + "\n".join(unique_tags)
# Sync session_id: the agent may have created a new session during
# mid-run context compression (_compress_context splits sessions).
# If so, update the session store entry so the NEXT message loads
# the compressed transcript, not the stale pre-compression one.
agent = agent_holder[0]
_session_was_split = False
if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id:
_session_was_split = True
logger.info(
"Session split detected: %s%s (compression)",
session_id, agent.session_id,
)
entry = self.session_store._entries.get(session_key)
if entry:
entry.session_id = agent.session_id
self.session_store._save()
# If this is a Telegram DM and source.thread_id was lost during
# the session split (synthetic / recovered event), restore it
# from the binding so _thread_metadata_for_source produces the
# correct message_thread_id instead of routing to the General
# thread. Failure here is non-fatal — we log and continue;
# worst case the message lands in General, which is the
# pre-fix behaviour.
if (
getattr(source, "platform", None) == Platform.TELEGRAM
and getattr(source, "chat_type", None) == "dm"
and getattr(source, "thread_id", None) is None
and self._session_db is not None
):
try:
_binding = self._session_db.get_telegram_topic_binding_by_session(
session_id=agent.session_id,
)
if _binding and _binding.get("thread_id"):
source.thread_id = str(_binding["thread_id"])
logger.debug(
"Restored source.thread_id=%s from binding after session split %s%s",
source.thread_id,
session_id,
agent.session_id,
)
except Exception:
logger.debug(
"Failed to restore thread_id from binding after session split",
exc_info=True,
)
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
# When compression created a new session, the messages list was
# shortened. Using the original history offset would produce an
# empty new_messages slice, causing the gateway to write only a
# user/assistant pair — losing the compressed summary and tail.
# Reset to 0 so the gateway writes ALL compressed messages.
_effective_history_offset = 0 if _session_was_split else len(agent_history)
# Auto-generate session title after first exchange (non-blocking)
if final_response and self._session_db:
try:

View file

@ -78,6 +78,7 @@ AUTHOR_MAP = {
"290859878+synapsesx@users.noreply.github.com": "synapsesx",
"157689911+itsflownium@users.noreply.github.com": "itsflownium",
"dirtyren@users.noreply.github.com": "dirtyren",
"JustinBao@outlook.com": "justinbao19",
"kdunn926@gmail.com": "kdunn926",
"mvanhorn@MacBook-Pro.local": "mvanhorn",
"470766206@qq.com": "youjunxiaji",

View file

@ -0,0 +1,160 @@
import asyncio
import sys
import threading
import types
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import gateway.run as gateway_run
from gateway.config import Platform
from gateway.session import SessionSource
SESSION_KEY = "agent:main:telegram:dm:12345"
class _SessionStore:
def __init__(self):
self.entry = SimpleNamespace(
session_key=SESSION_KEY,
session_id="session-before-compression",
)
self._entries = {SESSION_KEY: self.entry}
self.save_calls = 0
def _save(self):
self.save_calls += 1
class _CompressionThenFailureAgent:
def __init__(self, **kwargs):
self.session_id = kwargs["session_id"]
self.model = kwargs["model"]
self.tools = []
self.context_compressor = SimpleNamespace(
last_prompt_tokens=4321,
context_length=200000,
)
self.session_prompt_tokens = 4321
self.session_completion_tokens = 0
def run_conversation(self, user_message, conversation_history=None, task_id=None, **_kwargs):
self.session_id = "session-after-compression"
return {
"failed": True,
"error": "APIConnectionError: Codex auxiliary Responses stream exceeded 120.0s total timeout",
"messages": [
{"role": "user", "content": "[compressed summary]"},
{"role": "user", "content": user_message},
],
"api_calls": 1,
}
def interrupt(self, *_args, **_kwargs):
pass
class _StreamConsumer:
final_response_sent = False
def __init__(self, *_args, **_kwargs):
pass
async def run(self):
return None
def finish(self):
pass
class _Adapter:
SUPPORTS_MESSAGE_EDITING = True
_pending_messages = {}
def get_pending_message(self, _session_key):
return None
async def send_typing(self, *_args, **_kwargs):
return None
async def stop_typing(self, *_args, **_kwargs):
return None
def _runner(session_store):
runner = object.__new__(gateway_run.GatewayRunner)
runner.adapters = {Platform.TELEGRAM: _Adapter()}
runner.config = SimpleNamespace(streaming=None, group_sessions_per_user=True, thread_sessions_per_user=False)
runner.hooks = SimpleNamespace(loaded_hooks=False, emit=AsyncMock())
runner.session_store = session_store
runner._session_db = MagicMock()
runner._session_db.get_telegram_topic_binding_by_session.return_value = None
runner._agent_cache = {}
runner._agent_cache_lock = threading.Lock()
runner._running_agents = {}
runner._running_agents_ts = {}
runner._session_run_generation = {}
runner._session_model_overrides = {}
runner._pending_model_notes = {}
runner._pending_skills_reload_notes = {}
runner._prefill_messages = []
runner._ephemeral_system_prompt = ""
runner._reasoning_config = None
runner._provider_routing = {}
runner._fallback_model = None
runner._draining = False
runner._get_proxy_url = lambda: None
runner._resolve_session_agent_runtime = lambda **_kwargs: (
"gpt-5.4",
{"provider": "openai-codex", "api_mode": "codex_responses", "base_url": "https://chatgpt.com/backend-api/codex", "api_key": "token"},
)
runner._resolve_session_reasoning_config = lambda **_kwargs: None
runner._resolve_turn_agent_config = lambda message, model, runtime: {"model": model, "runtime": runtime}
runner._load_service_tier = lambda: None
runner._agent_config_signature = lambda *_args, **_kwargs: ("sig",)
runner._extract_cache_busting_config = lambda _config: ()
runner._thread_metadata_for_source = lambda *_args, **_kwargs: None
runner._sync_telegram_topic_binding = MagicMock()
runner._release_running_agent_state = MagicMock()
return runner
def test_failed_turn_still_syncs_compression_session_split(monkeypatch):
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = _CompressionThenFailureAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "off")
monkeypatch.setenv("HERMES_AGENT_TIMEOUT", "0")
monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {})
monkeypatch.setattr("gateway.stream_consumer.GatewayStreamConsumer", _StreamConsumer)
import hermes_cli.tools_config as tools_config
monkeypatch.setattr(tools_config, "_get_platform_tools", lambda *_args, **_kwargs: {"core"})
session_store = _SessionStore()
runner = _runner(session_store)
source = SessionSource(platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm", user_id="user-1")
result = asyncio.run(
asyncio.wait_for(
runner._run_agent(
message="continue",
context_prompt="",
history=[{"role": "user", "content": "old question"}],
source=source,
session_id="session-before-compression",
session_key=SESSION_KEY,
),
timeout=2,
)
)
assert result["failed"] is True
assert result["session_id"] == "session-after-compression"
assert result["history_offset"] == 0
assert session_store.entry.session_id == "session-after-compression"
assert session_store.save_calls == 1
runner._sync_telegram_topic_binding.assert_called_once_with(
source, session_store.entry, reason="agent-run-compression"
)