From 0613f10defe507242f17f89d81bbb2d15deb2df9 Mon Sep 17 00:00:00 2001 From: Ruzzgar Date: Mon, 20 Apr 2026 05:10:44 -0700 Subject: [PATCH] fix(gateway): use persisted session origin for shutdown notifications Prefer session_store origin over _parse_session_key() for shutdown notifications. Fixes misrouting when chat identifiers contain colons (e.g. Matrix room IDs like !room123:example.org). Falls back to session-key parsing when no persisted origin exists. Co-authored-by: Ruzzgar Ref: #12766 --- gateway/run.py | 33 +++++++++++++++++++++------ tests/gateway/restart_test_helpers.py | 1 + tests/gateway/test_restart_drain.py | 31 ++++++++++++++++++++++++- 3 files changed, 57 insertions(+), 8 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 50f33aa35..eb0dfe237 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1667,12 +1667,32 @@ class GatewayRunner: notified: set = set() for session_key in active: - # Parse platform + chat_id from the session key. - _parsed = _parse_session_key(session_key) - if not _parsed: - continue - platform_str = _parsed["platform"] - chat_id = _parsed["chat_id"] + source = None + try: + if getattr(self, "session_store", None) is not None: + self.session_store._ensure_loaded() + entry = self.session_store._entries.get(session_key) + source = getattr(entry, "origin", None) if entry else None + except Exception as e: + logger.debug( + "Failed to load session origin for shutdown notification %s: %s", + session_key, + e, + ) + + if source is not None: + platform_str = source.platform.value + chat_id = source.chat_id + thread_id = source.thread_id + else: + # Fall back to parsing the session key when no persisted + # origin is available (legacy sessions/tests). + _parsed = _parse_session_key(session_key) + if not _parsed: + continue + platform_str = _parsed["platform"] + chat_id = _parsed["chat_id"] + thread_id = _parsed.get("thread_id") # Deduplicate: one notification per chat, even if multiple # sessions (different users/threads) share the same chat. @@ -1688,7 +1708,6 @@ class GatewayRunner: # Include thread_id if present so the message lands in the # correct forum topic / thread. - thread_id = _parsed.get("thread_id") metadata = {"thread_id": thread_id} if thread_id else None await adapter.send(chat_id, msg, metadata=metadata) diff --git a/tests/gateway/restart_test_helpers.py b/tests/gateway/restart_test_helpers.py index 75665325b..6332a194f 100644 --- a/tests/gateway/restart_test_helpers.py +++ b/tests/gateway/restart_test_helpers.py @@ -108,6 +108,7 @@ def make_restart_runner( runner.hooks.emit = AsyncMock() runner.pairing_store = MagicMock() runner.session_store = MagicMock() + runner.session_store._entries = {} runner.delivery_router = MagicMock() platform_adapter = adapter or RestartTestAdapter() diff --git a/tests/gateway/test_restart_drain.py b/tests/gateway/test_restart_drain.py index 3607b1e39..d2977f757 100644 --- a/tests/gateway/test_restart_drain.py +++ b/tests/gateway/test_restart_drain.py @@ -1,6 +1,7 @@ import asyncio import shutil import subprocess +from datetime import datetime from unittest.mock import AsyncMock, MagicMock import pytest @@ -8,7 +9,7 @@ import pytest import gateway.run as gateway_run from gateway.platforms.base import MessageEvent, MessageType from gateway.restart import DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT -from gateway.session import build_session_key +from gateway.session import SessionEntry, build_session_key from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source @@ -242,3 +243,31 @@ async def test_shutdown_notification_send_failure_does_not_block(): # Should not raise await runner._notify_active_sessions_of_shutdown() + + +@pytest.mark.asyncio +async def test_shutdown_notification_uses_persisted_origin_for_colon_ids(): + """Shutdown notifications should route from persisted origin, not reparsed keys.""" + runner, adapter = make_restart_runner() + adapter.send = AsyncMock() + source = make_restart_source(chat_id="!room123:example.org", chat_type="group") + source.platform = gateway_run.Platform.MATRIX + session_key = build_session_key(source) + runner._running_agents[session_key] = MagicMock() + runner.session_store._entries = { + session_key: SessionEntry( + session_key=session_key, + session_id="sess-1", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=source.platform, + chat_type=source.chat_type, + ) + } + runner.adapters = {gateway_run.Platform.MATRIX: adapter} + + await runner._notify_active_sessions_of_shutdown() + + assert adapter.send.await_count == 1 + assert adapter.send.await_args.args[0] == "!room123:example.org"