fix(tui): persist session messages on force-quit / signal shutdown

Mirror the CLI's exit-path behaviour in the TUI gateway so that
unpersisted conversation messages are flushed to state.db and the
on_session_end plugin hook fires before the session is closed.

Root cause: _finalize_session() only called db.end_session() to
mark the session row as ended, but did NOT flush in-memory messages
via _persist_session() or fire the on_session_end hook.  When the
user force-quit (double Ctrl-C, terminal-close, SIGHUP) while the
agent was mid-turn, messages accumulated since the last persist
point were silently lost.

Changes
-------
tui_gateway/server.py - _finalize_session():
  - Persist unflushed messages via agent._persist_session() before
    db.end_session(). Prefers agent._session_messages (set by the
    last _persist_session call inside run_conversation) over
    session['history'] (stale when agent is mid-turn).
  - Fire on_session_end(interrupted=True) plugin hook so crash-
    recovery plugins can flush buffers, matching cli.py behaviour.

tui_gateway/entry.py - _log_signal():
  - Explicitly call _shutdown_sessions() before sys.exit(0) in the
    SIGHUP/SIGTERM handler as belt-and-suspenders over atexit.

tests/tui_gateway/test_finalize_session_persist.py (new):
  - 11 tests covering: history persistence, _session_messages
    priority, empty-history skip, missing-agent, double-finalize,
    persist-exception resilience, hook firing, hook-exception
    resilience, and db.end_session preservation.

Related
-------
Closes the TUI half of #5021 (CLI already handles this via its
atexit handler).  Also addresses the session-persistence gap
discussed in #18465 and #18269.
This commit is contained in:
bogerman1 2026-05-09 10:49:43 +08:00 committed by Teknium
parent e499d69e3e
commit c7e8854cb3
3 changed files with 287 additions and 1 deletions

View file

@ -0,0 +1,221 @@
"""
Integration test: verify _finalize_session persists messages on force-quit.
Tests the fix for TUI sessions losing conversation history when the
user interrupts and exits before the agent thread finishes flushing.
Scenarios:
1. Normal interrupt (single Ctrl+C) messages already in session["history"]
2. Force-quit mid-tool (double Ctrl+C) session["history"] has previous turns
3. Empty session no-op, no crash
4. Agent with _persist_session missing graceful no-op
"""
import threading
import time
from unittest.mock import MagicMock, PropertyMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_agent(history=None, session_id="test_session_001"):
"""Build a mock AIAgent with enough surface for _finalize_session."""
agent = MagicMock()
agent._persist_session = MagicMock()
agent.commit_memory_session = MagicMock()
agent.session_id = session_id
agent.model = "test-model"
agent.platform = "tui"
# _session_messages must be explicitly absent (None), otherwise
# MagicMock auto-creates it and getattr returns a truthy mock.
agent._session_messages = None
return agent
def _make_session(agent=None, history=None, session_key="test_key_001"):
return {
"agent": agent,
"history": history or [],
"history_lock": threading.Lock(),
"session_key": session_key,
"_finalized": False,
}
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestFinalizeSessionPersist:
"""Verify _finalize_session flushes messages via _persist_session."""
def test_persist_called_with_history(self):
"""History from session is passed to agent._persist_session.
When _session_messages is None (not yet set by any turn),
the session["history"] is used as the snapshot.
"""
from tui_gateway.server import _finalize_session
history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
]
agent = _make_agent()
session = _make_session(agent=agent, history=history)
_finalize_session(session, end_reason="test")
agent._persist_session.assert_called_once()
# snapshot = history (since _session_messages is None)
called_with = agent._persist_session.call_args[0][0]
assert called_with == history
# conversation_history kwarg passed for correct flush indexing
assert agent._persist_session.call_args[1].get("conversation_history") == history
def test_persist_uses_session_messages_when_available(self):
"""agent._session_messages takes priority over session['history']."""
from tui_gateway.server import _finalize_session
history = [{"role": "user", "content": "old"}]
session_msgs = [
{"role": "user", "content": "old"},
{"role": "assistant", "content": "newer"},
]
agent = _make_agent()
agent._session_messages = session_msgs
session = _make_session(agent=agent, history=history)
_finalize_session(session)
agent._persist_session.assert_called_once()
called_with = agent._persist_session.call_args[0][0]
assert called_with == session_msgs # _session_messages wins
assert agent._persist_session.call_args[1].get("conversation_history") == history
def test_commit_memory_still_called(self):
"""Existing memory commit path is preserved."""
from tui_gateway.server import _finalize_session
history = [{"role": "user", "content": "x"}]
agent = _make_agent()
session = _make_session(agent=agent, history=history)
_finalize_session(session)
agent.commit_memory_session.assert_called_once()
def test_no_agent_no_crash(self):
"""Session with agent=None exits cleanly."""
from tui_gateway.server import _finalize_session
session = _make_session(agent=None, history=[{"role": "user", "content": "x"}])
_finalize_session(session) # must not raise
def test_empty_history_skips_persist(self):
"""Empty history → _persist_session not called (guard)."""
from tui_gateway.server import _finalize_session
agent = _make_agent()
session = _make_session(agent=agent, history=[])
_finalize_session(session)
agent._persist_session.assert_not_called()
def test_no_persist_method_skips(self):
"""Agent without _persist_session attribute → graceful skip."""
from tui_gateway.server import _finalize_session
agent = _make_agent()
del agent._persist_session # simulate older agent without the method
session = _make_session(
agent=agent,
history=[{"role": "user", "content": "x"}],
)
_finalize_session(session) # must not raise
def test_already_finalized_skips(self):
"""Double-finalize is a no-op."""
from tui_gateway.server import _finalize_session
agent = _make_agent()
session = _make_session(agent=agent, history=[{"role": "user", "content": "x"}])
session["_finalized"] = True
_finalize_session(session)
agent._persist_session.assert_not_called()
def test_persist_exception_does_not_block(self):
"""If _persist_session raises, finalization continues."""
from tui_gateway.server import _finalize_session
agent = _make_agent()
agent._persist_session.side_effect = RuntimeError("db is down")
session = _make_session(
agent=agent,
history=[{"role": "user", "content": "x"}],
)
_finalize_session(session) # must not raise
# commit_memory_session should still be called
agent.commit_memory_session.assert_called_once()
@patch("tui_gateway.server._get_db")
def test_db_end_session_still_called(self, mock_get_db):
"""Existing db.end_session() path is preserved after the new code."""
from tui_gateway.server import _finalize_session
mock_db = MagicMock()
mock_get_db.return_value = mock_db
agent = _make_agent(session_id="sess_123")
session = _make_session(agent=agent, history=[{"role": "user", "content": "x"}])
_finalize_session(session, end_reason="test")
mock_db.end_session.assert_called_once_with("sess_123", "test")
class TestOnSessionEndHook:
"""Verify on_session_end plugin hook fires on finalize."""
@patch("hermes_cli.plugins.invoke_hook")
def test_hook_fired_with_interrupted_true(self, mock_invoke_hook):
"""on_session_end is called with interrupted=True when finalizing."""
from tui_gateway.server import _finalize_session
agent = _make_agent(session_id="hook_test_001")
agent.model = "claude-sonnet-4"
agent.platform = "tui"
session = _make_session(agent=agent, history=[{"role": "user", "content": "test"}])
_finalize_session(session, end_reason="tui_close")
mock_invoke_hook.assert_any_call(
"on_session_end",
session_id="hook_test_001",
completed=False,
interrupted=True,
model="claude-sonnet-4",
platform="tui",
)
@patch("hermes_cli.plugins.invoke_hook")
def test_hook_exception_does_not_block(self, mock_invoke_hook):
"""Hook failure doesn't prevent session finalization."""
from tui_gateway.server import _finalize_session
mock_invoke_hook.side_effect = RuntimeError("plugin crash")
agent = _make_agent()
session = _make_session(agent=agent, history=[{"role": "user", "content": "x"}])
_finalize_session(session) # must not raise
agent.commit_memory_session.assert_called_once()

View file

@ -130,6 +130,19 @@ def _log_signal(signum: int, frame) -> None:
timer.daemon = True
timer.start()
# ── Flush sessions before exit ───────────────────────────────────
# The atexit handler (_shutdown_sessions) is registered in
# tui_gateway/server.py, but a worker thread holding the GIL or
# _stdout_lock can block atexit from completing within the grace
# window. Explicitly finalize sessions here so that unpersisted
# messages reach state.db before the hard-exit timer fires.
try:
from tui_gateway.server import _shutdown_sessions
_shutdown_sessions()
except Exception:
pass
try:
sys.exit(0)
except SystemExit:

View file

@ -381,7 +381,14 @@ def _release_active_session_slot(session: dict | None) -> None:
def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> None:
"""Best-effort finalize hook + memory commit for a session."""
"""Best-effort finalize hook + memory commit for a session.
Fires ``on_session_end`` plugin hook and attempts to persist any
unflushed messages before closing the session. This mirrors the
CLI's exit-path behaviour and prevents data loss when the TUI is
force-quit (double CtrlC, terminalclose, SIGHUP) while the agent
is midturn.
"""
if not session or session.get("_finalized"):
return
session["_finalized"] = True
@ -397,6 +404,51 @@ def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> No
history = list(session.get("history", []))
else:
history = list(session.get("history", []))
# ── Persist unflushed messages to SQLite ──────────────────────────
# Two sources, tried in order of freshness:
# 1. agent._session_messages — set by the last _persist_session()
# call inside run_conversation(). This is the most recent
# snapshot the agent thread wrote, and may include partial
# turn data that hasn't reached session["history"] yet.
# 2. session["history"] — updated after run_conversation()
# returns. Stale when the agent is midturn, but correct
# when the turn completed before finalize.
# Besteffort — the agent thread may still be midturn, so only
# previously completed messages are guaranteed.
if agent is not None and hasattr(agent, "_persist_session"):
snapshot = (
getattr(agent, "_session_messages", None)
or history
)
if snapshot:
try:
agent._persist_session(snapshot, conversation_history=history)
except Exception:
pass
# ── Plugin hook: on_session_end ────────────────────────────────────
# Signals every plugin that the session is closing, with
# interrupted=True so crashrecovery plugins can flush buffers,
# persist state, or close connections before the gateway exits.
# Mirrors cli.py's atexit handler that fires the same hook when
# the user CtrlC's midturn.
if agent is not None:
try:
from hermes_cli.plugins import invoke_hook
invoke_hook(
"on_session_end",
session_id=getattr(agent, "session_id", None)
or session.get("session_key", ""),
completed=False,
interrupted=True,
model=getattr(agent, "model", "unknown"),
platform=getattr(agent, "platform", None) or "tui",
)
except Exception:
pass
if agent is not None and history and hasattr(agent, "commit_memory_session"):
try:
agent.commit_memory_session(history)