diff --git a/cli.py b/cli.py index 0292a2b943..3806dc4a3a 100644 --- a/cli.py +++ b/cli.py @@ -940,6 +940,18 @@ def _run_state_db_auto_maintenance(session_db) -> None: except Exception as _prune_exc: logger.debug("Ghost session prune skipped: %s", _prune_exc) + # One-time finalize of orphaned compression continuations (#20001). + try: + if not session_db.get_meta("orphaned_compression_finalize_v1"): + finalized = session_db.finalize_orphaned_compression_sessions() + session_db.set_meta("orphaned_compression_finalize_v1", "1") + if finalized: + logger.info( + "Finalized %d orphaned compression sessions", finalized + ) + except Exception as _finalize_exc: + logger.debug("Orphan compression finalize skipped: %s", _finalize_exc) + cfg = (_load_full_config().get("sessions") or {}) if not cfg.get("auto_prune", False): return diff --git a/gateway/run.py b/gateway/run.py index ed3bd47b96..66c31c4382 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -939,6 +939,52 @@ import weakref as _weakref _gateway_runner_ref: _weakref.ref = lambda: None +def _normalize_empty_agent_response( + agent_result: dict, + response: str, + *, + history_len: int = 0, +) -> str: + """Normalize empty/None agent responses into user-facing messages. + + Consolidates the existing ``failed`` handler and adds a catch-all for + the case where the agent did work (api_calls > 0) but returned no text. + Fix for #18765. + """ + if response: + return response + + if agent_result.get("failed"): + error_detail = agent_result.get("error", "unknown error") + error_str = str(error_detail).lower() + is_context_failure = any( + p in error_str + for p in ("context", "token", "too large", "too long", "exceed", "payload") + ) or ("400" in error_str and history_len > 50) + if is_context_failure: + return ( + "⚠️ Session too large for the model's context window.\n" + "Use /compact to compress the conversation, or " + "/reset to start fresh." + ) + return ( + f"The request failed: {str(error_detail)[:300]}\n" + "Try again or use /reset to start a fresh session." + ) + + api_calls = int(agent_result.get("api_calls", 0) or 0) + if api_calls > 0 and not agent_result.get("interrupted"): + if agent_result.get("partial"): + err = agent_result.get("error", "processing incomplete") + return f"⚠️ Processing stopped: {str(err)[:200]}. Try again." + return ( + "⚠️ Processing completed but no response was generated. " + "This may be a transient error — try sending your message again." + ) + + return response + + class GatewayRunner: """ Main gateway controller. @@ -6439,33 +6485,11 @@ class GatewayRunner: session_key, _e, ) - # Surface error details when the agent failed silently (final_response=None) - if not response and agent_result.get("failed"): - error_detail = agent_result.get("error", "unknown error") - error_str = str(error_detail).lower() - - # Detect context-overflow failures and give specific guidance. - # Generic 400 "Error" from Anthropic with large sessions is the - # most common cause of this (#1630). - _is_ctx_fail = any(p in error_str for p in ( - "context", "token", "too large", "too long", - "exceed", "payload", - )) or ( - "400" in error_str - and len(history) > 50 - ) - - if _is_ctx_fail: - response = ( - "⚠️ Session too large for the model's context window.\n" - "Use /compact to compress the conversation, or " - "/reset to start fresh." - ) - else: - response = ( - f"The request failed: {str(error_detail)[:300]}\n" - "Try again or use /reset to start a fresh session." - ) + # Normalize empty responses: surface errors, partial failures, and + # the case where agent did work but returned no text. Fix for #18765. + response = _normalize_empty_agent_response( + agent_result, response, history_len=len(history), + ) # If the agent's session_id changed during compression, update # session_entry so transcript writes below go to the right session. diff --git a/hermes_state.py b/hermes_state.py index 98bd68bee5..444af16772 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -718,6 +718,45 @@ class SessionDB: self._remove_session_files(sessions_dir, sid) return len(removed_ids) + def finalize_orphaned_compression_sessions(self) -> int: + """Mark orphaned compression continuation sessions as ended. + + Targets child sessions that were never finalized: parent is ended + with reason='compression', child has messages but no end_reason/ended_at + and api_call_count=0. Non-destructive: preserves all messages and sets + end_reason='orphaned_compression'. Fix for #20001. + """ + cutoff = time.time() - 604800 # 7 days + + def _do(conn): + now = time.time() + result = conn.execute( + """ + UPDATE sessions + SET ended_at = ?, + end_reason = 'orphaned_compression' + WHERE api_call_count = 0 + AND end_reason IS NULL + AND ended_at IS NULL + AND started_at < ? + AND parent_session_id IS NOT NULL + AND EXISTS ( + SELECT 1 FROM sessions p + WHERE p.id = sessions.parent_session_id + AND p.end_reason = 'compression' + AND p.ended_at IS NOT NULL + ) + AND EXISTS ( + SELECT 1 FROM messages m + WHERE m.session_id = sessions.id + ) + """, + (now, cutoff), + ) + return result.rowcount + + return self._execute_write(_do) or 0 + def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Get a session by ID.""" with self._lock: diff --git a/tests/test_lazy_session_regressions.py b/tests/test_lazy_session_regressions.py new file mode 100644 index 0000000000..511554a417 --- /dev/null +++ b/tests/test_lazy_session_regressions.py @@ -0,0 +1,608 @@ +"""Reproduction tests for #18370 fallout: lazy session creation regressions. + +Tests cover: +1. Bug #20001 — _finalize_session() uses stale session_key after compression rotation +2. Bug #20001 — _sync_session_key_after_compress called post-run_conversation +3. Bug #19029 — pending_title ValueError leaves title wedged +4. Bug #18765 — gateway surfaces null response when agent did work +5. Prune — finalize_orphaned_compression_sessions catches ghost continuations +""" + +import threading +import time +import types +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# =========================================================================== +# Helpers +# =========================================================================== + +def _make_session_db(tmp_path): + """Create a real SessionDB for integration-style tests.""" + from hermes_state import SessionDB + db_path = tmp_path / "test_state.db" + return SessionDB(db_path=db_path) + + +def _tui_session(agent=None, session_key="session-key-old", **extra): + """Minimal TUI gateway session dict matching server._sessions values.""" + return { + "agent": agent if agent is not None else types.SimpleNamespace(session_id=session_key), + "session_key": session_key, + "history": [], + "history_lock": threading.Lock(), + "history_version": 0, + "running": False, + "attached_images": [], + "image_counter": 0, + "cols": 80, + "slash_worker": None, + "show_reasoning": False, + "tool_progress_mode": "all", + "pending_title": None, + **extra, + } + + +# =========================================================================== +# Bug #20001: _finalize_session uses stale session_key +# =========================================================================== + +class TestFinalizeSessionUsesAgentSessionId: + """After compression rotates agent.session_id, _finalize_session() + must call end_session() on the NEW (current) session_id, not the stale + session_key stored in the session dict.""" + + def test_finalize_targets_agent_session_id_not_stale_key(self, tmp_path): + """Reproduction: agent.session_id rotated by compression, but + session['session_key'] still holds old value. _finalize_session() + should end the agent's current session.""" + from tui_gateway import server + + db = _make_session_db(tmp_path) + + # Create two sessions: parent (already ended by compression) and continuation + db.create_session(session_id="parent-session", source="tui", model="test") + db.end_session("parent-session", "compression") + + db.create_session( + session_id="continuation-session", + source="tui", + model="test", + parent_session_id="parent-session", + ) + # Continuation is NOT ended — this is the bug state + + # Agent has rotated to continuation session + agent = types.SimpleNamespace( + session_id="continuation-session", + commit_memory_session=lambda h: None, + ) + + # Session dict still holds stale key (the bug condition) + session = _tui_session( + agent=agent, + session_key="parent-session", + history=[{"role": "user", "content": "hello"}], + ) + + # Monkeypatch _get_db to return our test DB + with patch.object(server, "_get_db", return_value=db): + with patch.object(server, "_notify_session_boundary", lambda *a: None): + server._finalize_session(session, end_reason="tui_close") + + # The continuation session should be ended + continuation = db.get_session("continuation-session") + assert continuation["ended_at"] is not None, ( + "_finalize_session should end the agent's current session (continuation), " + "not the already-ended parent" + ) + assert continuation["end_reason"] == "tui_close" + + def test_finalize_fallback_to_session_key_when_agent_is_none(self, tmp_path): + """When agent is None (e.g. session never fully initialized), + _finalize_session falls back to session_key.""" + from tui_gateway import server + + db = _make_session_db(tmp_path) + db.create_session(session_id="orphan-key", source="tui", model="test") + + session = _tui_session(agent=None, session_key="orphan-key") + + with patch.object(server, "_get_db", return_value=db): + with patch.object(server, "_notify_session_boundary", lambda *a: None): + server._finalize_session(session, end_reason="tui_close") + + row = db.get_session("orphan-key") + assert row["ended_at"] is not None + assert row["end_reason"] == "tui_close" + + +# =========================================================================== +# Bug #20001: _sync_session_key_after_compress post-run_conversation +# =========================================================================== + +class TestSyncSessionKeyAfterAutoCompress: + """When auto-compression fires inside run_conversation(), the post-turn + code in _run_prompt_submit must call _sync_session_key_after_compress + to update session_key for downstream consumers (title, goals, etc.).""" + + def test_session_key_synced_after_run_conversation_with_compression(self, monkeypatch): + """Simulate: run_conversation() internally compresses and rotates + agent.session_id. After it returns, session['session_key'] must match.""" + from tui_gateway import server + + class _CompressingAgent: + """Agent that simulates compression-driven session_id rotation.""" + def __init__(self): + self.session_id = "pre-compress-key" + self._cached_system_prompt = "" + + def run_conversation(self, prompt, conversation_history=None, stream_callback=None): + # Simulate what _compress_context does: rotate session_id + self.session_id = "post-compress-key" + return { + "final_response": "done", + "messages": [ + {"role": "user", "content": prompt}, + {"role": "assistant", "content": "done"}, + ], + } + + agent = _CompressingAgent() + session = _tui_session(agent=agent, session_key="pre-compress-key") + + # Track if _sync_session_key_after_compress was called + sync_calls = [] + original_sync = server._sync_session_key_after_compress + + def _tracking_sync(sid, sess, **kwargs): + sync_calls.append((sid, sess.get("session_key"))) + # Just update the key directly (skip approval routing etc.) + new_id = getattr(sess.get("agent"), "session_id", None) or "" + if new_id and new_id != sess.get("session_key"): + sess["session_key"] = new_id + + monkeypatch.setattr(server, "_sync_session_key_after_compress", _tracking_sync) + monkeypatch.setattr(server, "_emit", lambda *a, **kw: None) + monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None) + monkeypatch.setattr(server, "render_message", lambda raw, cols: None) + + # Use _ImmediateThread pattern to run synchronously + class _ImmediateThread: + def __init__(self, target=None, daemon=None, **kw): + self._target = target + def start(self): + self._target() + + server._sessions["test-sid"] = session + monkeypatch.setattr(server.threading, "Thread", _ImmediateThread) + + try: + server.handle_request({ + "id": "1", + "method": "prompt.submit", + "params": {"session_id": "test-sid", "text": "hello"}, + }) + + # Sync should have been called + assert len(sync_calls) > 0, ( + "_sync_session_key_after_compress must be called after run_conversation " + "to pick up compression-driven session_id rotation" + ) + + # session_key should now match agent.session_id + assert session["session_key"] == "post-compress-key", ( + "session_key must be updated to match agent.session_id after compression" + ) + finally: + server._sessions.pop("test-sid", None) + + +# =========================================================================== +# Bug #19029: pending_title ValueError wedge +# =========================================================================== + +class TestPendingTitleValueError: + """When set_session_title raises ValueError (duplicate/invalid title), + pending_title must be cleared — not left wedged forever.""" + + def test_valueerror_clears_pending_title(self, monkeypatch): + """ValueError from set_session_title should drop pending_title.""" + from tui_gateway import server + + mock_db = MagicMock() + mock_db.set_session_title.side_effect = ValueError("duplicate title") + + class _Agent: + session_id = "test-session" + _cached_system_prompt = "" + def run_conversation(self, prompt, **kw): + return { + "final_response": "ok", + "messages": [{"role": "assistant", "content": "ok"}], + } + + session = _tui_session( + agent=_Agent(), + session_key="test-session", + pending_title="My Title", + ) + + monkeypatch.setattr(server, "_get_db", lambda: mock_db) + monkeypatch.setattr(server, "_emit", lambda *a, **kw: None) + monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None) + monkeypatch.setattr(server, "render_message", lambda raw, cols: None) + monkeypatch.setattr( + server, "_sync_session_key_after_compress", lambda *a, **kw: None + ) + + class _ImmediateThread: + def __init__(self, target=None, daemon=None, **kw): + self._target = target + def start(self): + self._target() + + server._sessions["sid"] = session + monkeypatch.setattr(server.threading, "Thread", _ImmediateThread) + + try: + server.handle_request({ + "id": "1", + "method": "prompt.submit", + "params": {"session_id": "sid", "text": "hello"}, + }) + + # pending_title should be cleared on ValueError, not left wedged + assert session.get("pending_title") is None, ( + "ValueError from set_session_title must clear pending_title " + "so auto-title can take over" + ) + finally: + server._sessions.pop("sid", None) + + def test_other_exception_keeps_pending_title_for_retry(self, monkeypatch): + """Non-ValueError exceptions should keep pending_title for retry.""" + from tui_gateway import server + + mock_db = MagicMock() + mock_db.set_session_title.side_effect = RuntimeError("transient DB lock") + + class _Agent: + session_id = "test-session" + _cached_system_prompt = "" + def run_conversation(self, prompt, **kw): + return { + "final_response": "ok", + "messages": [{"role": "assistant", "content": "ok"}], + } + + session = _tui_session( + agent=_Agent(), + session_key="test-session", + pending_title="My Title", + ) + + monkeypatch.setattr(server, "_get_db", lambda: mock_db) + monkeypatch.setattr(server, "_emit", lambda *a, **kw: None) + monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None) + monkeypatch.setattr(server, "render_message", lambda raw, cols: None) + monkeypatch.setattr( + server, "_sync_session_key_after_compress", lambda *a, **kw: None + ) + + class _ImmediateThread: + def __init__(self, target=None, daemon=None, **kw): + self._target = target + def start(self): + self._target() + + server._sessions["sid"] = session + monkeypatch.setattr(server.threading, "Thread", _ImmediateThread) + + try: + server.handle_request({ + "id": "1", + "method": "prompt.submit", + "params": {"session_id": "sid", "text": "hello"}, + }) + + # Non-ValueError should keep pending_title for retry + assert session.get("pending_title") == "My Title", ( + "Non-ValueError exceptions should keep pending_title intact " + "for retry on next turn" + ) + finally: + server._sessions.pop("sid", None) + + +# =========================================================================== +# Bug #18765: Gateway surfaces null response +# =========================================================================== + +class TestGatewaySurfacesNullResponse: + """When the agent does work (api_calls > 0) but returns no final_response, + the gateway must surface an error to the user instead of silently sending + nothing. Tests exercise the production _normalize_empty_agent_response helper.""" + + def test_partial_response_surfaces_error(self): + """Agent returns partial=True with no response → user sees error.""" + from gateway.run import _normalize_empty_agent_response + + agent_result = { + "final_response": None, + "api_calls": 5, + "partial": True, + "interrupted": False, + "error": "Model generated invalid tool call: nonexistent_tool", + } + + response = agent_result.get("final_response") or "" + response = _normalize_empty_agent_response( + agent_result, response, history_len=10, + ) + + assert response != "", "Null response with api_calls>0 must be surfaced" + assert "nonexistent_tool" in response + + def test_interrupted_response_stays_empty(self): + """Interrupted agent → response stays empty (platform handles UX).""" + from gateway.run import _normalize_empty_agent_response + + agent_result = { + "final_response": None, + "api_calls": 3, + "partial": False, + "interrupted": True, + } + + response = agent_result.get("final_response") or "" + response = _normalize_empty_agent_response( + agent_result, response, history_len=10, + ) + + assert response == "", "Interrupted turns should not get synthetic responses" + + def test_failed_context_overflow(self): + """Agent failed with context overflow → specific guidance message.""" + from gateway.run import _normalize_empty_agent_response + + agent_result = { + "final_response": None, + "api_calls": 0, + "failed": True, + "error": "400 Bad Request: context length exceeded", + } + + response = agent_result.get("final_response") or "" + response = _normalize_empty_agent_response( + agent_result, response, history_len=60, + ) + + assert "context window" in response + assert "/compact" in response + + def test_failed_generic_error(self): + """Agent failed with non-context error → generic error message.""" + from gateway.run import _normalize_empty_agent_response + + agent_result = { + "final_response": None, + "api_calls": 0, + "failed": True, + "error": "500 Internal Server Error", + } + + response = agent_result.get("final_response") or "" + response = _normalize_empty_agent_response( + agent_result, response, history_len=5, + ) + + assert "500 Internal Server Error" in response + assert "/reset" in response + + def test_nonempty_response_passes_through(self): + """Non-empty response is returned unchanged.""" + from gateway.run import _normalize_empty_agent_response + + agent_result = {"final_response": "Hello!", "api_calls": 1} + response = "Hello!" + result = _normalize_empty_agent_response( + agent_result, response, history_len=5, + ) + + assert result == "Hello!" + + +# =========================================================================== +# Prune: finalize_orphaned_compression_sessions +# =========================================================================== + +class TestFinalizeOrphanedCompressionSessions: + """The prune migration marks ghost compression continuations as ended.""" + + def test_marks_ghost_continuation_with_compression_parent(self, tmp_path): + """Ghost session with compression-ended parent + messages → finalized.""" + db = _make_session_db(tmp_path) + + # Parent session (ended by compression — this is the key condition) + db.create_session(session_id="parent", source="tui", model="test") + db.end_session("parent", "compression") + + # Ghost continuation (has messages, never finalized) + db.create_session( + session_id="ghost-cont", + source="tui", + model="test", + parent_session_id="parent", + ) + db.append_message("ghost-cont", role="user", content="hello") + db.append_message("ghost-cont", role="assistant", content="hi") + + # Make it old enough (fake started_at) + db._execute_write( + lambda conn: conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (time.time() - 800000, "ghost-cont"), # ~9 days old + ) + ) + + count = db.finalize_orphaned_compression_sessions() + assert count == 1 + + session = db.get_session("ghost-cont") + assert session["ended_at"] is not None + assert session["end_reason"] == "orphaned_compression" + + def test_skips_session_without_parent(self, tmp_path): + """Ghost session without parent_session_id is NOT a compression + continuation — should not be touched by this prune.""" + db = _make_session_db(tmp_path) + + db.create_session(session_id="ghost-notitle", source="tui", model="test") + db.append_message("ghost-notitle", role="user", content="test") + + db._execute_write( + lambda conn: conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (time.time() - 800000, "ghost-notitle"), + ) + ) + + count = db.finalize_orphaned_compression_sessions() + assert count == 0 + + def test_skips_recent_sessions(self, tmp_path): + """Sessions younger than 7 days are not touched.""" + db = _make_session_db(tmp_path) + + # Create parent first to satisfy FK constraint + db.create_session(session_id="some-parent", source="tui", model="test") + db.create_session( + session_id="recent", + source="tui", + model="test", + parent_session_id="some-parent", + ) + db.append_message("recent", role="user", content="hello") + # started_at is now() — within 7 days + + count = db.finalize_orphaned_compression_sessions() + assert count == 0 + + def test_skips_sessions_with_end_reason(self, tmp_path): + """Properly finalized sessions (even without api_call_count) are skipped.""" + db = _make_session_db(tmp_path) + + # Create parent first to satisfy FK constraint + db.create_session(session_id="parent", source="tui", model="test") + db.end_session("parent", "compression") + + db.create_session( + session_id="already-ended", + source="tui", + model="test", + parent_session_id="parent", + ) + db.append_message("already-ended", role="user", content="hello") + db.end_session("already-ended", "user_exit") + + db._execute_write( + lambda conn: conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (time.time() - 800000, "already-ended"), + ) + ) + + count = db.finalize_orphaned_compression_sessions() + assert count == 0 + + def test_skips_session_with_non_compression_parent(self, tmp_path): + """Child session whose parent was NOT ended by compression should + not be touched — it's not from the compression continuation path.""" + db = _make_session_db(tmp_path) + + # Parent ended by user_exit, not compression + db.create_session(session_id="parent", source="tui", model="test") + db.end_session("parent", "user_exit") + + db.create_session( + session_id="child", + source="tui", + model="test", + parent_session_id="parent", + ) + db.append_message("child", role="user", content="hello") + + db._execute_write( + lambda conn: conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (time.time() - 800000, "child"), + ) + ) + + count = db.finalize_orphaned_compression_sessions() + assert count == 0 + + def test_skips_sessions_without_messages(self, tmp_path): + """Empty sessions (no messages) are NOT targeted by this prune — + those are handled by prune_empty_ghost_sessions().""" + db = _make_session_db(tmp_path) + + # Create parent first to satisfy FK constraint + db.create_session(session_id="parent", source="tui", model="test") + db.end_session("parent", "compression") + + db.create_session( + session_id="empty-ghost", + source="tui", + model="test", + parent_session_id="parent", + ) + # No messages appended + + db._execute_write( + lambda conn: conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (time.time() - 800000, "empty-ghost"), + ) + ) + + count = db.finalize_orphaned_compression_sessions() + assert count == 0 + + def test_titled_ghost_with_parent_is_caught(self, tmp_path): + """Ghost continuation that HAS a title (propagated from parent by + _compress_context) is still caught via parent with end_reason='compression'.""" + db = _make_session_db(tmp_path) + + # Create parent first — ended by compression + db.create_session(session_id="parent", source="tui", model="test") + db.set_session_title("parent", "Chat") + db.end_session("parent", "compression") + + db.create_session( + session_id="titled-ghost", + source="tui", + model="test", + parent_session_id="parent", + ) + db.set_session_title("titled-ghost", "Chat (2)") + db.append_message("titled-ghost", role="user", content="continued...") + + db._execute_write( + lambda conn: conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (time.time() - 800000, "titled-ghost"), + ) + ) + + count = db.finalize_orphaned_compression_sessions() + assert count == 1 + + session = db.get_session("titled-ghost") + assert session["end_reason"] == "orphaned_compression" diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py index 469b8895ea..03647f55f0 100644 --- a/tests/test_tui_gateway_server.py +++ b/tests/test_tui_gateway_server.py @@ -921,56 +921,70 @@ def test_session_title_set_errors_when_row_lookup_fails_after_noop(monkeypatch): def test_session_create_drops_pending_title_on_valueerror(monkeypatch): - unblock_agent = threading.Event() + """When set_session_title raises ValueError during post-message title flush, + pending_title should be dropped (non-retryable). Updated for post-#18370 + lazy session creation where title is applied post-first-message. + """ - class _FakeWorker: - def __init__(self, key, model): - self.key = key - - def close(self): - return None - - class _FakeAgent: + class _Agent: + session_id = "test-session" model = "x" provider = "openrouter" base_url = "" api_key = "" + _cached_system_prompt = "" + + def run_conversation(self, prompt, **kw): + return { + "final_response": "ok", + "messages": [{"role": "assistant", "content": "ok"}], + } class _FakeDB: - def create_session(self, _key, source="tui", model=None): - return None - def set_session_title(self, _key, _title): raise ValueError("Title already in use") - def _make_agent(_sid, _key): - unblock_agent.wait(timeout=2.0) - return _FakeAgent() + class _ImmediateThread: + def __init__(self, target=None, daemon=None, **kw): + self._target = target - monkeypatch.setattr(server, "_make_agent", _make_agent) - monkeypatch.setattr(server, "_SlashWorker", _FakeWorker) + def start(self): + self._target() + + agent = _Agent() + session = { + "agent": agent, + "session_key": "test-session", + "history": [], + "history_lock": threading.Lock(), + "history_version": 0, + "running": False, + "attached_images": [], + "image_counter": 0, + "cols": 80, + "slash_worker": None, + "show_reasoning": False, + "tool_progress_mode": "all", + "pending_title": "duplicate title", + } + + server._sessions["sid"] = session monkeypatch.setattr(server, "_get_db", lambda: _FakeDB()) - monkeypatch.setattr(server, "_session_info", lambda _a: {"model": "x"}) - monkeypatch.setattr(server, "_probe_credentials", lambda _a: None) - monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None) monkeypatch.setattr(server, "_emit", lambda *a, **kw: None) - - import tools.approval as _approval - - monkeypatch.setattr(_approval, "register_gateway_notify", lambda key, cb: None) - monkeypatch.setattr(_approval, "load_permanent_allowlist", lambda: None) - - resp = server.handle_request( - {"id": "1", "method": "session.create", "params": {"cols": 80}} + monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None) + monkeypatch.setattr(server, "render_message", lambda raw, cols: None) + monkeypatch.setattr( + server, "_sync_session_key_after_compress", lambda *a, **kw: None ) - sid = resp["result"]["session_id"] - session = server._sessions[sid] - session["pending_title"] = "duplicate title" - unblock_agent.set() - session["agent_ready"].wait(timeout=2.0) + monkeypatch.setattr(server.threading, "Thread", _ImmediateThread) - assert session["pending_title"] is None - server._sessions.pop(sid, None) + try: + server.handle_request( + {"id": "1", "method": "prompt.submit", "params": {"session_id": "sid", "text": "hello"}} + ) + assert session["pending_title"] is None + finally: + server._sessions.pop("sid", None) def test_config_set_yolo_toggles_session_scope(): diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 2a4377c3f1..68b03f091a 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -304,12 +304,14 @@ def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> No _notify_session_boundary("on_session_finalize", session_id) # Mark session ended in DB so it doesn't linger as a ghost row in /resume. - # Adapted from #18283 (luyao618) and #18299 (Bartok9). - if session_key: + # Use session_id (from agent.session_id) not session_key — after compression, + # session_key may be stale (the ended parent) while session_id is the live + # continuation. Fix for #20001. + if session_id: try: db = _get_db() if db is not None: - db.end_session(session_key, end_reason) + db.end_session(session_id, end_reason) except Exception: pass @@ -1175,7 +1177,13 @@ def _compress_session_history( return len(history) - len(compressed), usage -def _sync_session_key_after_compress(sid: str, session: dict) -> None: +def _sync_session_key_after_compress( + sid: str, + session: dict, + *, + clear_pending_title: bool = True, + restart_slash_worker: bool = True, +) -> None: """Re-anchor session_key when AIAgent._compress_context rotates session_id. AIAgent._compress_context ends the current SessionDB session and creates @@ -1184,7 +1192,14 @@ def _sync_session_key_after_compress(sid: str, session: dict) -> None: approval routing, slash worker init, DB title/history lookups, yolo state). Without this sync, those operations would target the ended parent session while the agent writes to the new continuation session. - Mirrors HermesCLI._manual_compress's session_id sync. + + Policy flags: + clear_pending_title: True for manual /compress (title belongs to old + session). False for post-turn auto-compression (preserve user + intent so pending_title can be applied to the continuation). + restart_slash_worker: True for manual /compress and post-turn + auto-compression (worker holds stale session key). False only + if the caller manages the worker lifecycle separately. """ agent = session.get("agent") new_session_id = getattr(agent, "session_id", None) or "" @@ -1229,11 +1244,13 @@ def _sync_session_key_after_compress(sid: str, session: dict) -> None: # don't keep targeting the ended row. session["session_key"] = new_session_id - session["pending_title"] = None - try: - _restart_slash_worker(session) - except Exception: - pass + if clear_pending_title: + session["pending_title"] = None + if restart_slash_worker: + try: + _restart_slash_worker(session) + except Exception: + pass def _get_usage(agent) -> dict: @@ -2965,6 +2982,17 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None: "History changed during this turn — the response above is visible " "but was not saved to session history." ) + + # If auto-compression fired inside run_conversation(), agent.session_id + # may have rotated. Sync session_key before downstream title/goal/finalize + # handling uses it. Preserve pending_title (user intent) so it can be + # applied to the continuation. Restart slash worker so subsequent + # worker-backed commands (/title etc.) target the live session. + # Fix for #20001. + _sync_session_key_after_compress( + sid, session, clear_pending_title=False, restart_slash_worker=True, + ) + raw = result.get("final_response", "") status = ( "interrupted" @@ -3042,11 +3070,21 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None: if _pending and status == "complete": _pdb = _get_db() if _pdb: + _session_key = session.get("session_key") or sid try: - if _pdb.set_session_title(session.get("session_key") or sid, _pending): + if _pdb.set_session_title(_session_key, _pending): session["pending_title"] = None + except ValueError as exc: + # Invalid/duplicate title — non-retryable, drop it. + # Auto-title will take over. Fix for #19029. + session["pending_title"] = None + logger.info( + "Dropping pending title for session %s: %s", + _session_key, exc, + ) except Exception: - pass # Best effort — auto-title will handle it below + # Transient DB failure — keep pending_title for retry. + pass if ( status == "complete"