fix: resolve lazy session creation regressions (#18370 fallout) (#20363)

Fix five regressions introduced by PR #18370 (lazy session creation):

1. _finalize_session() uses stale session_key after compression (#20001)
2. session_key not synced after auto-compression in run_conversation (#20001)
3. pending_title ValueError leaves title wedged forever (#19029)
4. Gateway silently swallows null responses when agent did work (#18765)
5. One-time cleanup for accumulated ghost compression continuations (#20001)

Changes:
- tui_gateway/server.py: _finalize_session() now uses agent.session_id
  (falls back to session_key when agent is None). Refactor
  _sync_session_key_after_compress() with clear_pending_title and
  restart_slash_worker policy flags. Call it post-run_conversation()
  to sync session_key after auto-compression. Add ValueError handler
  to pending_title flush.
- gateway/run.py: Extract _normalize_empty_agent_response() helper that
  consolidates failed/partial/null response handling. Surfaces user-facing
  error when agent did work (api_calls > 0) but returned no text.
- hermes_state.py: Add finalize_orphaned_compression_sessions() — marks
  ghost continuation sessions as ended (non-destructive, preserves data).
- cli.py: One-time startup migration for orphaned compression sessions.

Test changes:
- tests/test_tui_gateway_server.py: Update pending_title ValueError test
  for post-#18370 architecture (title applied post-message, not at create).
- tests/test_lazy_session_regressions.py: 14 new regression tests covering
  all fixed paths.
This commit is contained in:
Siddharth Balyan 2026-05-06 01:11:49 +05:30 committed by GitHub
parent 0397be5939
commit 3b750715a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 809 additions and 74 deletions

12
cli.py
View file

@ -940,6 +940,18 @@ def _run_state_db_auto_maintenance(session_db) -> None:
except Exception as _prune_exc: except Exception as _prune_exc:
logger.debug("Ghost session prune skipped: %s", _prune_exc) logger.debug("Ghost session prune skipped: %s", _prune_exc)
# One-time finalize of orphaned compression continuations (#20001).
try:
if not session_db.get_meta("orphaned_compression_finalize_v1"):
finalized = session_db.finalize_orphaned_compression_sessions()
session_db.set_meta("orphaned_compression_finalize_v1", "1")
if finalized:
logger.info(
"Finalized %d orphaned compression sessions", finalized
)
except Exception as _finalize_exc:
logger.debug("Orphan compression finalize skipped: %s", _finalize_exc)
cfg = (_load_full_config().get("sessions") or {}) cfg = (_load_full_config().get("sessions") or {})
if not cfg.get("auto_prune", False): if not cfg.get("auto_prune", False):
return return

View file

@ -939,6 +939,52 @@ import weakref as _weakref
_gateway_runner_ref: _weakref.ref = lambda: None _gateway_runner_ref: _weakref.ref = lambda: None
def _normalize_empty_agent_response(
agent_result: dict,
response: str,
*,
history_len: int = 0,
) -> str:
"""Normalize empty/None agent responses into user-facing messages.
Consolidates the existing ``failed`` handler and adds a catch-all for
the case where the agent did work (api_calls > 0) but returned no text.
Fix for #18765.
"""
if response:
return response
if agent_result.get("failed"):
error_detail = agent_result.get("error", "unknown error")
error_str = str(error_detail).lower()
is_context_failure = any(
p in error_str
for p in ("context", "token", "too large", "too long", "exceed", "payload")
) or ("400" in error_str and history_len > 50)
if is_context_failure:
return (
"⚠️ Session too large for the model's context window.\n"
"Use /compact to compress the conversation, or "
"/reset to start fresh."
)
return (
f"The request failed: {str(error_detail)[:300]}\n"
"Try again or use /reset to start a fresh session."
)
api_calls = int(agent_result.get("api_calls", 0) or 0)
if api_calls > 0 and not agent_result.get("interrupted"):
if agent_result.get("partial"):
err = agent_result.get("error", "processing incomplete")
return f"⚠️ Processing stopped: {str(err)[:200]}. Try again."
return (
"⚠️ Processing completed but no response was generated. "
"This may be a transient error — try sending your message again."
)
return response
class GatewayRunner: class GatewayRunner:
""" """
Main gateway controller. Main gateway controller.
@ -6439,33 +6485,11 @@ class GatewayRunner:
session_key, _e, session_key, _e,
) )
# Surface error details when the agent failed silently (final_response=None) # Normalize empty responses: surface errors, partial failures, and
if not response and agent_result.get("failed"): # the case where agent did work but returned no text. Fix for #18765.
error_detail = agent_result.get("error", "unknown error") response = _normalize_empty_agent_response(
error_str = str(error_detail).lower() agent_result, response, history_len=len(history),
)
# Detect context-overflow failures and give specific guidance.
# Generic 400 "Error" from Anthropic with large sessions is the
# most common cause of this (#1630).
_is_ctx_fail = any(p in error_str for p in (
"context", "token", "too large", "too long",
"exceed", "payload",
)) or (
"400" in error_str
and len(history) > 50
)
if _is_ctx_fail:
response = (
"⚠️ Session too large for the model's context window.\n"
"Use /compact to compress the conversation, or "
"/reset to start fresh."
)
else:
response = (
f"The request failed: {str(error_detail)[:300]}\n"
"Try again or use /reset to start a fresh session."
)
# If the agent's session_id changed during compression, update # If the agent's session_id changed during compression, update
# session_entry so transcript writes below go to the right session. # session_entry so transcript writes below go to the right session.

View file

@ -718,6 +718,45 @@ class SessionDB:
self._remove_session_files(sessions_dir, sid) self._remove_session_files(sessions_dir, sid)
return len(removed_ids) return len(removed_ids)
def finalize_orphaned_compression_sessions(self) -> int:
    """Mark orphaned compression continuation sessions as ended.

    Targets child sessions that were never finalized: parent is ended
    with reason='compression', child has messages but no end_reason/ended_at
    and api_call_count=0. Non-destructive: preserves all messages and sets
    end_reason='orphaned_compression'. Fix for #20001.

    Returns:
        Number of session rows updated (0 when nothing matched or the
        write helper returned a falsy result).
    """
    # Only touch sessions older than 7 days so a live compression
    # continuation that just hasn't finished yet is never finalized.
    cutoff = time.time() - 604800  # 7 days

    def _do(conn):
        # Correlated EXISTS subqueries restrict the UPDATE to children
        # of compression-ended parents that actually contain messages
        # (empty ghosts are handled by a separate prune path).
        now = time.time()
        result = conn.execute(
            """
            UPDATE sessions
            SET ended_at = ?,
                end_reason = 'orphaned_compression'
            WHERE api_call_count = 0
            AND end_reason IS NULL
            AND ended_at IS NULL
            AND started_at < ?
            AND parent_session_id IS NOT NULL
            AND EXISTS (
                SELECT 1 FROM sessions p
                WHERE p.id = sessions.parent_session_id
                AND p.end_reason = 'compression'
                AND p.ended_at IS NOT NULL
            )
            AND EXISTS (
                SELECT 1 FROM messages m
                WHERE m.session_id = sessions.id
            )
            """,
            (now, cutoff),
        )
        # rowcount reflects how many continuation rows were finalized.
        return result.rowcount

    return self._execute_write(_do) or 0
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""Get a session by ID.""" """Get a session by ID."""
with self._lock: with self._lock:

View file

@ -0,0 +1,608 @@
"""Reproduction tests for #18370 fallout: lazy session creation regressions.
Tests cover:
1. Bug #20001 — _finalize_session() uses stale session_key after compression rotation
2. Bug #20001 — _sync_session_key_after_compress called post-run_conversation
3. Bug #19029 — pending_title ValueError leaves title wedged
4. Bug #18765 — gateway surfaces null response when agent did work
5. Prune finalize_orphaned_compression_sessions catches ghost continuations
"""
import threading
import time
import types
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ===========================================================================
# Helpers
# ===========================================================================
def _make_session_db(tmp_path):
    """Create a real SessionDB for integration-style tests.

    Uses a per-test SQLite file under pytest's ``tmp_path`` so each test
    gets an isolated, throwaway database.
    """
    # Imported lazily so collecting this module doesn't require the
    # project package at import time.
    from hermes_state import SessionDB

    db_path = tmp_path / "test_state.db"
    return SessionDB(db_path=db_path)
def _tui_session(agent=None, session_key="session-key-old", **extra):
"""Minimal TUI gateway session dict matching server._sessions values."""
return {
"agent": agent if agent is not None else types.SimpleNamespace(session_id=session_key),
"session_key": session_key,
"history": [],
"history_lock": threading.Lock(),
"history_version": 0,
"running": False,
"attached_images": [],
"image_counter": 0,
"cols": 80,
"slash_worker": None,
"show_reasoning": False,
"tool_progress_mode": "all",
"pending_title": None,
**extra,
}
# ===========================================================================
# Bug #20001: _finalize_session uses stale session_key
# ===========================================================================
class TestFinalizeSessionUsesAgentSessionId:
    """After compression rotates agent.session_id, _finalize_session()
    must call end_session() on the NEW (current) session_id, not the stale
    session_key stored in the session dict."""

    def test_finalize_targets_agent_session_id_not_stale_key(self, tmp_path):
        """Reproduction: agent.session_id rotated by compression, but
        session['session_key'] still holds old value. _finalize_session()
        should end the agent's current session."""
        from tui_gateway import server

        db = _make_session_db(tmp_path)
        # Create two sessions: parent (already ended by compression) and continuation
        db.create_session(session_id="parent-session", source="tui", model="test")
        db.end_session("parent-session", "compression")
        db.create_session(
            session_id="continuation-session",
            source="tui",
            model="test",
            parent_session_id="parent-session",
        )
        # Continuation is NOT ended — this is the bug state

        # Agent has rotated to continuation session; commit_memory_session
        # is a no-op stub so finalize's memory hook doesn't interfere.
        agent = types.SimpleNamespace(
            session_id="continuation-session",
            commit_memory_session=lambda h: None,
        )
        # Session dict still holds stale key (the bug condition)
        session = _tui_session(
            agent=agent,
            session_key="parent-session",
            history=[{"role": "user", "content": "hello"}],
        )

        # Monkeypatch _get_db to return our test DB
        with patch.object(server, "_get_db", return_value=db):
            with patch.object(server, "_notify_session_boundary", lambda *a: None):
                server._finalize_session(session, end_reason="tui_close")

        # The continuation session should be ended
        continuation = db.get_session("continuation-session")
        assert continuation["ended_at"] is not None, (
            "_finalize_session should end the agent's current session (continuation), "
            "not the already-ended parent"
        )
        assert continuation["end_reason"] == "tui_close"

    def test_finalize_fallback_to_session_key_when_agent_is_none(self, tmp_path):
        """When agent is None (e.g. session never fully initialized),
        _finalize_session falls back to session_key."""
        from tui_gateway import server

        db = _make_session_db(tmp_path)
        db.create_session(session_id="orphan-key", source="tui", model="test")

        session = _tui_session(agent=None, session_key="orphan-key")
        # NOTE: _tui_session substitutes a SimpleNamespace when agent=None,
        # so the fallback is exercised via its session_id == session_key.
        with patch.object(server, "_get_db", return_value=db):
            with patch.object(server, "_notify_session_boundary", lambda *a: None):
                server._finalize_session(session, end_reason="tui_close")

        row = db.get_session("orphan-key")
        assert row["ended_at"] is not None
        assert row["end_reason"] == "tui_close"
# ===========================================================================
# Bug #20001: _sync_session_key_after_compress post-run_conversation
# ===========================================================================
class TestSyncSessionKeyAfterAutoCompress:
    """When auto-compression fires inside run_conversation(), the post-turn
    code in _run_prompt_submit must call _sync_session_key_after_compress
    to update session_key for downstream consumers (title, goals, etc.)."""

    def test_session_key_synced_after_run_conversation_with_compression(self, monkeypatch):
        """Simulate: run_conversation() internally compresses and rotates
        agent.session_id. After it returns, session['session_key'] must match."""
        from tui_gateway import server

        class _CompressingAgent:
            """Agent that simulates compression-driven session_id rotation."""

            def __init__(self):
                self.session_id = "pre-compress-key"
                self._cached_system_prompt = ""

            def run_conversation(self, prompt, conversation_history=None, stream_callback=None):
                # Simulate what _compress_context does: rotate session_id
                self.session_id = "post-compress-key"
                return {
                    "final_response": "done",
                    "messages": [
                        {"role": "user", "content": prompt},
                        {"role": "assistant", "content": "done"},
                    ],
                }

        agent = _CompressingAgent()
        session = _tui_session(agent=agent, session_key="pre-compress-key")

        # Track if _sync_session_key_after_compress was called
        sync_calls = []
        original_sync = server._sync_session_key_after_compress

        def _tracking_sync(sid, sess, **kwargs):
            sync_calls.append((sid, sess.get("session_key")))
            # Just update the key directly (skip approval routing etc.)
            new_id = getattr(sess.get("agent"), "session_id", None) or ""
            if new_id and new_id != sess.get("session_key"):
                sess["session_key"] = new_id

        monkeypatch.setattr(server, "_sync_session_key_after_compress", _tracking_sync)
        # Stub out emit/rendering so the prompt path runs without a UI.
        monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
        monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
        monkeypatch.setattr(server, "render_message", lambda raw, cols: None)

        # Use _ImmediateThread pattern to run synchronously: the gateway
        # normally spawns a worker thread; this runs it inline instead.
        class _ImmediateThread:
            def __init__(self, target=None, daemon=None, **kw):
                self._target = target

            def start(self):
                self._target()

        server._sessions["test-sid"] = session
        monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
        try:
            server.handle_request({
                "id": "1",
                "method": "prompt.submit",
                "params": {"session_id": "test-sid", "text": "hello"},
            })
            # Sync should have been called
            assert len(sync_calls) > 0, (
                "_sync_session_key_after_compress must be called after run_conversation "
                "to pick up compression-driven session_id rotation"
            )
            # session_key should now match agent.session_id
            assert session["session_key"] == "post-compress-key", (
                "session_key must be updated to match agent.session_id after compression"
            )
        finally:
            server._sessions.pop("test-sid", None)
# ===========================================================================
# Bug #19029: pending_title ValueError wedge
# ===========================================================================
class TestPendingTitleValueError:
    """When set_session_title raises ValueError (duplicate/invalid title),
    pending_title must be cleared not left wedged forever."""

    def test_valueerror_clears_pending_title(self, monkeypatch):
        """ValueError from set_session_title should drop pending_title."""
        from tui_gateway import server

        mock_db = MagicMock()
        mock_db.set_session_title.side_effect = ValueError("duplicate title")

        class _Agent:
            session_id = "test-session"
            _cached_system_prompt = ""

            def run_conversation(self, prompt, **kw):
                return {
                    "final_response": "ok",
                    "messages": [{"role": "assistant", "content": "ok"}],
                }

        session = _tui_session(
            agent=_Agent(),
            session_key="test-session",
            pending_title="My Title",
        )

        # Stub DB, emit, and rendering so only the title-flush path matters.
        monkeypatch.setattr(server, "_get_db", lambda: mock_db)
        monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
        monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
        monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
        monkeypatch.setattr(
            server, "_sync_session_key_after_compress", lambda *a, **kw: None
        )

        # Run the worker thread inline for deterministic sequencing.
        class _ImmediateThread:
            def __init__(self, target=None, daemon=None, **kw):
                self._target = target

            def start(self):
                self._target()

        server._sessions["sid"] = session
        monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
        try:
            server.handle_request({
                "id": "1",
                "method": "prompt.submit",
                "params": {"session_id": "sid", "text": "hello"},
            })
            # pending_title should be cleared on ValueError, not left wedged
            assert session.get("pending_title") is None, (
                "ValueError from set_session_title must clear pending_title "
                "so auto-title can take over"
            )
        finally:
            server._sessions.pop("sid", None)

    def test_other_exception_keeps_pending_title_for_retry(self, monkeypatch):
        """Non-ValueError exceptions should keep pending_title for retry."""
        from tui_gateway import server

        mock_db = MagicMock()
        mock_db.set_session_title.side_effect = RuntimeError("transient DB lock")

        class _Agent:
            session_id = "test-session"
            _cached_system_prompt = ""

            def run_conversation(self, prompt, **kw):
                return {
                    "final_response": "ok",
                    "messages": [{"role": "assistant", "content": "ok"}],
                }

        session = _tui_session(
            agent=_Agent(),
            session_key="test-session",
            pending_title="My Title",
        )

        monkeypatch.setattr(server, "_get_db", lambda: mock_db)
        monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
        monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
        monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
        monkeypatch.setattr(
            server, "_sync_session_key_after_compress", lambda *a, **kw: None
        )

        class _ImmediateThread:
            def __init__(self, target=None, daemon=None, **kw):
                self._target = target

            def start(self):
                self._target()

        server._sessions["sid"] = session
        monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
        try:
            server.handle_request({
                "id": "1",
                "method": "prompt.submit",
                "params": {"session_id": "sid", "text": "hello"},
            })
            # Non-ValueError should keep pending_title for retry
            assert session.get("pending_title") == "My Title", (
                "Non-ValueError exceptions should keep pending_title intact "
                "for retry on next turn"
            )
        finally:
            server._sessions.pop("sid", None)
# ===========================================================================
# Bug #18765: Gateway surfaces null response
# ===========================================================================
class TestGatewaySurfacesNullResponse:
    """When the agent does work (api_calls > 0) but returns no final_response,
    the gateway must surface an error to the user instead of silently sending
    nothing. Tests exercise the production _normalize_empty_agent_response helper."""

    def test_partial_response_surfaces_error(self):
        """Agent returns partial=True with no response → user sees error."""
        from gateway.run import _normalize_empty_agent_response

        agent_result = {
            "final_response": None,
            "api_calls": 5,
            "partial": True,
            "interrupted": False,
            "error": "Model generated invalid tool call: nonexistent_tool",
        }
        response = agent_result.get("final_response") or ""
        response = _normalize_empty_agent_response(
            agent_result, response, history_len=10,
        )
        assert response != "", "Null response with api_calls>0 must be surfaced"
        assert "nonexistent_tool" in response

    def test_interrupted_response_stays_empty(self):
        """Interrupted agent → response stays empty (platform handles UX)."""
        from gateway.run import _normalize_empty_agent_response

        agent_result = {
            "final_response": None,
            "api_calls": 3,
            "partial": False,
            "interrupted": True,
        }
        response = agent_result.get("final_response") or ""
        response = _normalize_empty_agent_response(
            agent_result, response, history_len=10,
        )
        assert response == "", "Interrupted turns should not get synthetic responses"

    def test_failed_context_overflow(self):
        """Agent failed with context overflow → specific guidance message."""
        from gateway.run import _normalize_empty_agent_response

        agent_result = {
            "final_response": None,
            "api_calls": 0,
            "failed": True,
            "error": "400 Bad Request: context length exceeded",
        }
        response = agent_result.get("final_response") or ""
        response = _normalize_empty_agent_response(
            agent_result, response, history_len=60,
        )
        assert "context window" in response
        assert "/compact" in response

    def test_failed_generic_error(self):
        """Agent failed with non-context error → generic error message."""
        from gateway.run import _normalize_empty_agent_response

        agent_result = {
            "final_response": None,
            "api_calls": 0,
            "failed": True,
            "error": "500 Internal Server Error",
        }
        response = agent_result.get("final_response") or ""
        response = _normalize_empty_agent_response(
            agent_result, response, history_len=5,
        )
        assert "500 Internal Server Error" in response
        assert "/reset" in response

    def test_nonempty_response_passes_through(self):
        """Non-empty response is returned unchanged."""
        from gateway.run import _normalize_empty_agent_response

        agent_result = {"final_response": "Hello!", "api_calls": 1}
        response = "Hello!"
        result = _normalize_empty_agent_response(
            agent_result, response, history_len=5,
        )
        assert result == "Hello!"
# ===========================================================================
# Prune: finalize_orphaned_compression_sessions
# ===========================================================================
class TestFinalizeOrphanedCompressionSessions:
    """The prune migration marks ghost compression continuations as ended."""

    def test_marks_ghost_continuation_with_compression_parent(self, tmp_path):
        """Ghost session with compression-ended parent + messages → finalized."""
        db = _make_session_db(tmp_path)

        # Parent session (ended by compression — this is the key condition)
        db.create_session(session_id="parent", source="tui", model="test")
        db.end_session("parent", "compression")

        # Ghost continuation (has messages, never finalized)
        db.create_session(
            session_id="ghost-cont",
            source="tui",
            model="test",
            parent_session_id="parent",
        )
        db.append_message("ghost-cont", role="user", content="hello")
        db.append_message("ghost-cont", role="assistant", content="hi")
        # Make it old enough (fake started_at)
        db._execute_write(
            lambda conn: conn.execute(
                "UPDATE sessions SET started_at = ? WHERE id = ?",
                (time.time() - 800000, "ghost-cont"),  # ~9 days old
            )
        )

        count = db.finalize_orphaned_compression_sessions()
        assert count == 1

        session = db.get_session("ghost-cont")
        assert session["ended_at"] is not None
        assert session["end_reason"] == "orphaned_compression"

    def test_skips_session_without_parent(self, tmp_path):
        """Ghost session without parent_session_id is NOT a compression
        continuation and should not be touched by this prune."""
        db = _make_session_db(tmp_path)
        db.create_session(session_id="ghost-notitle", source="tui", model="test")
        db.append_message("ghost-notitle", role="user", content="test")
        db._execute_write(
            lambda conn: conn.execute(
                "UPDATE sessions SET started_at = ? WHERE id = ?",
                (time.time() - 800000, "ghost-notitle"),
            )
        )

        count = db.finalize_orphaned_compression_sessions()
        assert count == 0

    def test_skips_recent_sessions(self, tmp_path):
        """Sessions younger than 7 days are not touched."""
        db = _make_session_db(tmp_path)
        # Create parent first to satisfy FK constraint
        db.create_session(session_id="some-parent", source="tui", model="test")
        db.create_session(
            session_id="recent",
            source="tui",
            model="test",
            parent_session_id="some-parent",
        )
        db.append_message("recent", role="user", content="hello")
        # started_at is now() — within 7 days

        count = db.finalize_orphaned_compression_sessions()
        assert count == 0

    def test_skips_sessions_with_end_reason(self, tmp_path):
        """Properly finalized sessions (even without api_call_count) are skipped."""
        db = _make_session_db(tmp_path)
        # Create parent first to satisfy FK constraint
        db.create_session(session_id="parent", source="tui", model="test")
        db.end_session("parent", "compression")
        db.create_session(
            session_id="already-ended",
            source="tui",
            model="test",
            parent_session_id="parent",
        )
        db.append_message("already-ended", role="user", content="hello")
        db.end_session("already-ended", "user_exit")
        db._execute_write(
            lambda conn: conn.execute(
                "UPDATE sessions SET started_at = ? WHERE id = ?",
                (time.time() - 800000, "already-ended"),
            )
        )

        count = db.finalize_orphaned_compression_sessions()
        assert count == 0

    def test_skips_session_with_non_compression_parent(self, tmp_path):
        """Child session whose parent was NOT ended by compression should
        not be touched — it's not from the compression continuation path."""
        db = _make_session_db(tmp_path)
        # Parent ended by user_exit, not compression
        db.create_session(session_id="parent", source="tui", model="test")
        db.end_session("parent", "user_exit")
        db.create_session(
            session_id="child",
            source="tui",
            model="test",
            parent_session_id="parent",
        )
        db.append_message("child", role="user", content="hello")
        db._execute_write(
            lambda conn: conn.execute(
                "UPDATE sessions SET started_at = ? WHERE id = ?",
                (time.time() - 800000, "child"),
            )
        )

        count = db.finalize_orphaned_compression_sessions()
        assert count == 0

    def test_skips_sessions_without_messages(self, tmp_path):
        """Empty sessions (no messages) are NOT targeted by this prune —
        those are handled by prune_empty_ghost_sessions()."""
        db = _make_session_db(tmp_path)
        # Create parent first to satisfy FK constraint
        db.create_session(session_id="parent", source="tui", model="test")
        db.end_session("parent", "compression")
        db.create_session(
            session_id="empty-ghost",
            source="tui",
            model="test",
            parent_session_id="parent",
        )
        # No messages appended
        db._execute_write(
            lambda conn: conn.execute(
                "UPDATE sessions SET started_at = ? WHERE id = ?",
                (time.time() - 800000, "empty-ghost"),
            )
        )

        count = db.finalize_orphaned_compression_sessions()
        assert count == 0

    def test_titled_ghost_with_parent_is_caught(self, tmp_path):
        """Ghost continuation that HAS a title (propagated from parent by
        _compress_context) is still caught via parent with end_reason='compression'."""
        db = _make_session_db(tmp_path)
        # Create parent first — ended by compression
        db.create_session(session_id="parent", source="tui", model="test")
        db.set_session_title("parent", "Chat")
        db.end_session("parent", "compression")
        db.create_session(
            session_id="titled-ghost",
            source="tui",
            model="test",
            parent_session_id="parent",
        )
        db.set_session_title("titled-ghost", "Chat (2)")
        db.append_message("titled-ghost", role="user", content="continued...")
        db._execute_write(
            lambda conn: conn.execute(
                "UPDATE sessions SET started_at = ? WHERE id = ?",
                (time.time() - 800000, "titled-ghost"),
            )
        )

        count = db.finalize_orphaned_compression_sessions()
        assert count == 1

        session = db.get_session("titled-ghost")
        assert session["end_reason"] == "orphaned_compression"

View file

@ -921,56 +921,70 @@ def test_session_title_set_errors_when_row_lookup_fails_after_noop(monkeypatch):
def test_session_create_drops_pending_title_on_valueerror(monkeypatch): def test_session_create_drops_pending_title_on_valueerror(monkeypatch):
unblock_agent = threading.Event() """When set_session_title raises ValueError during post-message title flush,
pending_title should be dropped (non-retryable). Updated for post-#18370
lazy session creation where title is applied post-first-message.
"""
class _FakeWorker: class _Agent:
def __init__(self, key, model): session_id = "test-session"
self.key = key
def close(self):
return None
class _FakeAgent:
model = "x" model = "x"
provider = "openrouter" provider = "openrouter"
base_url = "" base_url = ""
api_key = "" api_key = ""
_cached_system_prompt = ""
def run_conversation(self, prompt, **kw):
return {
"final_response": "ok",
"messages": [{"role": "assistant", "content": "ok"}],
}
class _FakeDB: class _FakeDB:
def create_session(self, _key, source="tui", model=None):
return None
def set_session_title(self, _key, _title): def set_session_title(self, _key, _title):
raise ValueError("Title already in use") raise ValueError("Title already in use")
def _make_agent(_sid, _key): class _ImmediateThread:
unblock_agent.wait(timeout=2.0) def __init__(self, target=None, daemon=None, **kw):
return _FakeAgent() self._target = target
monkeypatch.setattr(server, "_make_agent", _make_agent) def start(self):
monkeypatch.setattr(server, "_SlashWorker", _FakeWorker) self._target()
agent = _Agent()
session = {
"agent": agent,
"session_key": "test-session",
"history": [],
"history_lock": threading.Lock(),
"history_version": 0,
"running": False,
"attached_images": [],
"image_counter": 0,
"cols": 80,
"slash_worker": None,
"show_reasoning": False,
"tool_progress_mode": "all",
"pending_title": "duplicate title",
}
server._sessions["sid"] = session
monkeypatch.setattr(server, "_get_db", lambda: _FakeDB()) monkeypatch.setattr(server, "_get_db", lambda: _FakeDB())
monkeypatch.setattr(server, "_session_info", lambda _a: {"model": "x"})
monkeypatch.setattr(server, "_probe_credentials", lambda _a: None)
monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
monkeypatch.setattr(server, "_emit", lambda *a, **kw: None) monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
import tools.approval as _approval monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
monkeypatch.setattr(
monkeypatch.setattr(_approval, "register_gateway_notify", lambda key, cb: None) server, "_sync_session_key_after_compress", lambda *a, **kw: None
monkeypatch.setattr(_approval, "load_permanent_allowlist", lambda: None)
resp = server.handle_request(
{"id": "1", "method": "session.create", "params": {"cols": 80}}
) )
sid = resp["result"]["session_id"] monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
session = server._sessions[sid]
session["pending_title"] = "duplicate title"
unblock_agent.set()
session["agent_ready"].wait(timeout=2.0)
assert session["pending_title"] is None try:
server._sessions.pop(sid, None) server.handle_request(
{"id": "1", "method": "prompt.submit", "params": {"session_id": "sid", "text": "hello"}}
)
assert session["pending_title"] is None
finally:
server._sessions.pop("sid", None)
def test_config_set_yolo_toggles_session_scope(): def test_config_set_yolo_toggles_session_scope():

View file

@ -304,12 +304,14 @@ def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> No
_notify_session_boundary("on_session_finalize", session_id) _notify_session_boundary("on_session_finalize", session_id)
# Mark session ended in DB so it doesn't linger as a ghost row in /resume. # Mark session ended in DB so it doesn't linger as a ghost row in /resume.
# Adapted from #18283 (luyao618) and #18299 (Bartok9). # Use session_id (from agent.session_id) not session_key — after compression,
if session_key: # session_key may be stale (the ended parent) while session_id is the live
# continuation. Fix for #20001.
if session_id:
try: try:
db = _get_db() db = _get_db()
if db is not None: if db is not None:
db.end_session(session_key, end_reason) db.end_session(session_id, end_reason)
except Exception: except Exception:
pass pass
@ -1175,7 +1177,13 @@ def _compress_session_history(
return len(history) - len(compressed), usage return len(history) - len(compressed), usage
def _sync_session_key_after_compress(sid: str, session: dict) -> None: def _sync_session_key_after_compress(
sid: str,
session: dict,
*,
clear_pending_title: bool = True,
restart_slash_worker: bool = True,
) -> None:
"""Re-anchor session_key when AIAgent._compress_context rotates session_id. """Re-anchor session_key when AIAgent._compress_context rotates session_id.
AIAgent._compress_context ends the current SessionDB session and creates AIAgent._compress_context ends the current SessionDB session and creates
@ -1184,7 +1192,14 @@ def _sync_session_key_after_compress(sid: str, session: dict) -> None:
approval routing, slash worker init, DB title/history lookups, yolo approval routing, slash worker init, DB title/history lookups, yolo
state). Without this sync, those operations would target the ended state). Without this sync, those operations would target the ended
parent session while the agent writes to the new continuation session. parent session while the agent writes to the new continuation session.
Mirrors HermesCLI._manual_compress's session_id sync.
Policy flags:
clear_pending_title: True for manual /compress (title belongs to old
session). False for post-turn auto-compression (preserve user
intent so pending_title can be applied to the continuation).
restart_slash_worker: True for manual /compress and post-turn
auto-compression (worker holds stale session key). False only
if the caller manages the worker lifecycle separately.
""" """
agent = session.get("agent") agent = session.get("agent")
new_session_id = getattr(agent, "session_id", None) or "" new_session_id = getattr(agent, "session_id", None) or ""
@ -1229,11 +1244,13 @@ def _sync_session_key_after_compress(sid: str, session: dict) -> None:
# don't keep targeting the ended row. # don't keep targeting the ended row.
session["session_key"] = new_session_id session["session_key"] = new_session_id
session["pending_title"] = None if clear_pending_title:
try: session["pending_title"] = None
_restart_slash_worker(session) if restart_slash_worker:
except Exception: try:
pass _restart_slash_worker(session)
except Exception:
pass
def _get_usage(agent) -> dict: def _get_usage(agent) -> dict:
@ -2965,6 +2982,17 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None:
"History changed during this turn — the response above is visible " "History changed during this turn — the response above is visible "
"but was not saved to session history." "but was not saved to session history."
) )
# If auto-compression fired inside run_conversation(), agent.session_id
# may have rotated. Sync session_key before downstream title/goal/finalize
# handling uses it. Preserve pending_title (user intent) so it can be
# applied to the continuation. Restart slash worker so subsequent
# worker-backed commands (/title etc.) target the live session.
# Fix for #20001.
_sync_session_key_after_compress(
sid, session, clear_pending_title=False, restart_slash_worker=True,
)
raw = result.get("final_response", "") raw = result.get("final_response", "")
status = ( status = (
"interrupted" "interrupted"
@ -3042,11 +3070,21 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None:
if _pending and status == "complete": if _pending and status == "complete":
_pdb = _get_db() _pdb = _get_db()
if _pdb: if _pdb:
_session_key = session.get("session_key") or sid
try: try:
if _pdb.set_session_title(session.get("session_key") or sid, _pending): if _pdb.set_session_title(_session_key, _pending):
session["pending_title"] = None session["pending_title"] = None
except ValueError as exc:
# Invalid/duplicate title — non-retryable, drop it.
# Auto-title will take over. Fix for #19029.
session["pending_title"] = None
logger.info(
"Dropping pending title for session %s: %s",
_session_key, exc,
)
except Exception: except Exception:
pass # Best effort — auto-title will handle it below # Transient DB failure — keep pending_title for retry.
pass
if ( if (
status == "complete" status == "complete"