refactor(gateway): simplify auto-resume + extend to crash recovery

Follow-up on top of @kyan12's PR #20888 — same feature, cleaner shape,
wider coverage.

Changes:
- Drop the synthetic '[System note: ...]' in the internal MessageEvent.
  The existing _is_resume_pending branch in _handle_message_with_agent
  (run.py ~L13738) already injects a reason-aware recovery system note
  on the next turn.  With kyan's text in place the model saw two stacked
  system notes.  Now the event text is empty and the existing injection
  path owns the wording.
- Drop SessionStore.list_resume_pending() as a new public method.  The
  filter is 8 lines inline in _schedule_resume_pending_sessions() —
  one caller, no other pluggability need.
- Add 'restart_interrupted' to the auto-resume reason set.  That's the
  reason SessionStore.suspend_recently_active() stamps on sessions
  recovered from a crash/OOM/SIGKILL (no .clean_shutdown marker).
  Previously those sessions had to wait for a real user message to
  auto-resume; now they continue automatically at startup like
  drain-timeout interruptions do.
- Reasons live in a _AUTO_RESUME_REASONS frozenset at class scope so
  future reasons (e.g. 'manual_resume_request') can be opted in with
  one line.

Test coverage added:
- drain-timeout + crash-recovery both scheduled
- stale entries skipped (outside freshness window)
- suspended entries skipped (suspended > resume_pending)
- originless entries skipped (no routing target)
- disallowed reasons skipped (graceful forward-compat)

E2E verified end-to-end with a real on-disk SessionStore: 2 eligible
sessions scheduled, 2 ineligible skipped, empty-text internal events
delivered to the adapter.

Co-authored-by: Kevin Yan <kevyan1998@gmail.com>
This commit is contained in:
Teknium 2026-05-07 05:03:16 -07:00
parent 961a3535fa
commit 38b1c7dce5
4 changed files with 193 additions and 92 deletions

View file

@ -245,30 +245,6 @@ class TestSessionEntryResumeFields:
class TestMarkResumePending:
def test_list_resume_pending_returns_fresh_entries_with_origins(self, tmp_path):
store = _make_store(tmp_path)
fresh = store.get_or_create_session(_make_source(chat_id="fresh"))
stale = store.get_or_create_session(_make_source(chat_id="stale"))
missing_origin = store.get_or_create_session(_make_source(chat_id="missing-origin"))
suspended = store.get_or_create_session(_make_source(chat_id="suspended"))
store.mark_resume_pending(fresh.session_key, reason="restart_timeout")
store.mark_resume_pending(stale.session_key, reason="restart_timeout")
store.mark_resume_pending(missing_origin.session_key, reason="restart_timeout")
store.mark_resume_pending(suspended.session_key, reason="restart_timeout")
old = datetime.now() - timedelta(hours=3)
store._entries[stale.session_key].last_resume_marked_at = old
store._entries[missing_origin.session_key].origin = None
store._entries[suspended.session_key].suspended = True
pending = store.list_resume_pending(
window_secs=3600,
now=datetime.now().timestamp(),
allowed_reasons={"restart_timeout"},
)
assert [entry.session_key for entry in pending] == [fresh.session_key]
def test_marks_existing_session(self, tmp_path):
store = _make_store(tmp_path)
source = _make_source()
@ -978,24 +954,158 @@ async def test_startup_auto_resume_schedules_fresh_pending_sessions():
resume_reason="restart_timeout",
last_resume_marked_at=datetime.now(),
)
runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry])
runner.session_store._entries = {pending_entry.session_key: pending_entry}
adapter.handle_message = AsyncMock()
scheduled = runner._schedule_resume_pending_sessions()
await asyncio.sleep(0)
assert scheduled == 1
runner.session_store.list_resume_pending.assert_called_once_with(
window_secs=_auto_continue_freshness_window(),
allowed_reasons={"restart_timeout", "shutdown_timeout"},
)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert isinstance(event, MessageEvent)
assert event.internal is True
assert event.message_type == MessageType.TEXT
assert event.source == source
assert event.text.startswith("[System note: The gateway restarted")
# Text is empty — the existing _is_resume_pending branch in
# _handle_message_with_agent owns the system-note injection so we don't
# double it up.
assert event.text == ""
@pytest.mark.asyncio
async def test_startup_auto_resume_includes_crash_recovery():
"""Crash-recovered sessions (reason=restart_interrupted) are also auto-resumed.
suspend_recently_active() marks in-flight sessions with resume_reason
"restart_interrupted" when the previous gateway exit was not clean
(crash/SIGKILL/OOM). These should get the same magic continuation as
drain-timeout interruptions.
"""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="crash-chat")
pending_entry = SessionEntry(
session_key="agent:main:telegram:dm:crash-chat",
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_interrupted",
last_resume_marked_at=datetime.now(),
)
runner.session_store._entries = {pending_entry.session_key: pending_entry}
adapter.handle_message = AsyncMock()
scheduled = runner._schedule_resume_pending_sessions()
await asyncio.sleep(0)
assert scheduled == 1
adapter.handle_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_startup_auto_resume_skips_stale_entries():
"""Entries older than the freshness window must not be auto-resumed."""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="stale-chat")
stale_marker = datetime.now() - timedelta(
seconds=_auto_continue_freshness_window() + 60
)
stale_entry = SessionEntry(
session_key="agent:main:telegram:dm:stale-chat",
session_id="sid",
created_at=stale_marker,
updated_at=stale_marker,
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_timeout",
last_resume_marked_at=stale_marker,
)
runner.session_store._entries = {stale_entry.session_key: stale_entry}
adapter.handle_message = AsyncMock()
scheduled = runner._schedule_resume_pending_sessions()
assert scheduled == 0
adapter.handle_message.assert_not_called()
@pytest.mark.asyncio
async def test_startup_auto_resume_skips_suspended_and_originless():
"""suspended entries and entries with no origin are excluded."""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="ok")
suspended_entry = SessionEntry(
session_key="agent:main:telegram:dm:suspended",
session_id="sid-s",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_timeout",
suspended=True,
last_resume_marked_at=datetime.now(),
)
originless = SessionEntry(
session_key="agent:main:telegram:dm:originless",
session_id="sid-o",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=None,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_timeout",
last_resume_marked_at=datetime.now(),
)
runner.session_store._entries = {
suspended_entry.session_key: suspended_entry,
originless.session_key: originless,
}
adapter.handle_message = AsyncMock()
scheduled = runner._schedule_resume_pending_sessions()
assert scheduled == 0
adapter.handle_message.assert_not_called()
@pytest.mark.asyncio
async def test_startup_auto_resume_skips_disallowed_reasons():
"""Reasons outside the auto-resume set (e.g. a future custom reason) are skipped.
These sessions still auto-resume on the next real user message via the
existing _is_resume_pending branch we just don't synthesize a turn
for them at startup.
"""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="other")
other_entry = SessionEntry(
session_key="agent:main:telegram:dm:other",
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="manual_resume_request",
last_resume_marked_at=datetime.now(),
)
runner.session_store._entries = {other_entry.session_key: other_entry}
adapter.handle_message = AsyncMock()
scheduled = runner._schedule_resume_pending_sessions()
assert scheduled == 0
adapter.handle_message.assert_not_called()
@pytest.mark.asyncio
@ -1014,7 +1124,7 @@ async def test_startup_auto_resume_skips_when_adapter_unavailable():
resume_reason="restart_timeout",
last_resume_marked_at=datetime.now(),
)
runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry])
runner.session_store._entries = {pending_entry.session_key: pending_entry}
runner.adapters = {}
adapter.handle_message = AsyncMock()