mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): use persisted session origin for shutdown notifications
Prefer session_store origin over _parse_session_key() for shutdown notifications. Fixes misrouting when chat identifiers contain colons (e.g. Matrix room IDs like !room123:example.org). Falls back to session-key parsing when no persisted origin exists. Co-authored-by: Ruzzgar <ruzzgarcn@gmail.com> Ref: #12766
This commit is contained in:
parent
9725b452a1
commit
0613f10def
3 changed files with 57 additions and 8 deletions
|
|
@ -1667,12 +1667,32 @@ class GatewayRunner:
|
||||||
|
|
||||||
notified: set = set()
|
notified: set = set()
|
||||||
for session_key in active:
|
for session_key in active:
|
||||||
# Parse platform + chat_id from the session key.
|
source = None
|
||||||
_parsed = _parse_session_key(session_key)
|
try:
|
||||||
if not _parsed:
|
if getattr(self, "session_store", None) is not None:
|
||||||
continue
|
self.session_store._ensure_loaded()
|
||||||
platform_str = _parsed["platform"]
|
entry = self.session_store._entries.get(session_key)
|
||||||
chat_id = _parsed["chat_id"]
|
source = getattr(entry, "origin", None) if entry else None
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(
|
||||||
|
"Failed to load session origin for shutdown notification %s: %s",
|
||||||
|
session_key,
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
|
||||||
|
if source is not None:
|
||||||
|
platform_str = source.platform.value
|
||||||
|
chat_id = source.chat_id
|
||||||
|
thread_id = source.thread_id
|
||||||
|
else:
|
||||||
|
# Fall back to parsing the session key when no persisted
|
||||||
|
# origin is available (legacy sessions/tests).
|
||||||
|
_parsed = _parse_session_key(session_key)
|
||||||
|
if not _parsed:
|
||||||
|
continue
|
||||||
|
platform_str = _parsed["platform"]
|
||||||
|
chat_id = _parsed["chat_id"]
|
||||||
|
thread_id = _parsed.get("thread_id")
|
||||||
|
|
||||||
# Deduplicate: one notification per chat, even if multiple
|
# Deduplicate: one notification per chat, even if multiple
|
||||||
# sessions (different users/threads) share the same chat.
|
# sessions (different users/threads) share the same chat.
|
||||||
|
|
@ -1688,7 +1708,6 @@ class GatewayRunner:
|
||||||
|
|
||||||
# Include thread_id if present so the message lands in the
|
# Include thread_id if present so the message lands in the
|
||||||
# correct forum topic / thread.
|
# correct forum topic / thread.
|
||||||
thread_id = _parsed.get("thread_id")
|
|
||||||
metadata = {"thread_id": thread_id} if thread_id else None
|
metadata = {"thread_id": thread_id} if thread_id else None
|
||||||
|
|
||||||
await adapter.send(chat_id, msg, metadata=metadata)
|
await adapter.send(chat_id, msg, metadata=metadata)
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,7 @@ def make_restart_runner(
|
||||||
runner.hooks.emit = AsyncMock()
|
runner.hooks.emit = AsyncMock()
|
||||||
runner.pairing_store = MagicMock()
|
runner.pairing_store = MagicMock()
|
||||||
runner.session_store = MagicMock()
|
runner.session_store = MagicMock()
|
||||||
|
runner.session_store._entries = {}
|
||||||
runner.delivery_router = MagicMock()
|
runner.delivery_router = MagicMock()
|
||||||
|
|
||||||
platform_adapter = adapter or RestartTestAdapter()
|
platform_adapter = adapter or RestartTestAdapter()
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
|
from datetime import datetime
|
||||||
from unittest.mock import AsyncMock, MagicMock
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
@ -8,7 +9,7 @@ import pytest
|
||||||
import gateway.run as gateway_run
|
import gateway.run as gateway_run
|
||||||
from gateway.platforms.base import MessageEvent, MessageType
|
from gateway.platforms.base import MessageEvent, MessageType
|
||||||
from gateway.restart import DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
|
from gateway.restart import DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
|
||||||
from gateway.session import build_session_key
|
from gateway.session import SessionEntry, build_session_key
|
||||||
from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source
|
from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -242,3 +243,31 @@ async def test_shutdown_notification_send_failure_does_not_block():
|
||||||
|
|
||||||
# Should not raise
|
# Should not raise
|
||||||
await runner._notify_active_sessions_of_shutdown()
|
await runner._notify_active_sessions_of_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_shutdown_notification_uses_persisted_origin_for_colon_ids():
|
||||||
|
"""Shutdown notifications should route from persisted origin, not reparsed keys."""
|
||||||
|
runner, adapter = make_restart_runner()
|
||||||
|
adapter.send = AsyncMock()
|
||||||
|
source = make_restart_source(chat_id="!room123:example.org", chat_type="group")
|
||||||
|
source.platform = gateway_run.Platform.MATRIX
|
||||||
|
session_key = build_session_key(source)
|
||||||
|
runner._running_agents[session_key] = MagicMock()
|
||||||
|
runner.session_store._entries = {
|
||||||
|
session_key: SessionEntry(
|
||||||
|
session_key=session_key,
|
||||||
|
session_id="sess-1",
|
||||||
|
created_at=datetime.now(),
|
||||||
|
updated_at=datetime.now(),
|
||||||
|
origin=source,
|
||||||
|
platform=source.platform,
|
||||||
|
chat_type=source.chat_type,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
runner.adapters = {gateway_run.Platform.MATRIX: adapter}
|
||||||
|
|
||||||
|
await runner._notify_active_sessions_of_shutdown()
|
||||||
|
|
||||||
|
assert adapter.send.await_count == 1
|
||||||
|
assert adapter.send.await_args.args[0] == "!room123:example.org"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue