diff --git a/tests/tools/test_send_message_tool.py b/tests/tools/test_send_message_tool.py index 626179de19..8fae7239c5 100644 --- a/tests/tools/test_send_message_tool.py +++ b/tests/tools/test_send_message_tool.py @@ -349,6 +349,7 @@ class TestSendToPlatformChunking: "***", "C123", "*hello* from ", + thread_id=None, ) def test_slack_bold_italic_formatted_before_send(self, monkeypatch): @@ -1550,3 +1551,353 @@ class TestForumProbeCache: assert result2["success"] is True # Only one session opened (thread creation) — no probe session this time # (verified by not raising from our side_effect exhaustion) + + +# --------------------------------------------------------------------------- +# Regression for #5472: 'platform:current' resolves to the active session +# --------------------------------------------------------------------------- + + +class TestSendMessageCurrentSessionTarget: + """`discord:current` / `telegram:current` etc. should resolve to the active + gateway session's chat/thread instead of falling back to the home channel. + + Regression coverage for https://github.com/NousResearch/hermes-agent/issues/5472 + """ + + _SESSION_ENV_KEYS = ( + "HERMES_SESSION_PLATFORM", + "HERMES_SESSION_CHAT_ID", + "HERMES_SESSION_THREAD_ID", + ) + + def setup_method(self, _method): + """Isolate each test from session context leaked by earlier tests.""" + from gateway.session_context import set_session_vars + self._tokens = set_session_vars() + + def _apply_session(self, monkeypatch, platform="discord", chat_id="999888777", thread_id=""): + """Set session env vars + contextvars for the duration of the test.""" + from gateway.session_context import set_session_vars + monkeypatch.setenv("HERMES_SESSION_PLATFORM", platform) + monkeypatch.setenv("HERMES_SESSION_CHAT_ID", chat_id) + if thread_id: + monkeypatch.setenv("HERMES_SESSION_THREAD_ID", thread_id) + else: + monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False) + set_session_vars( + platform=platform, + chat_id=chat_id, + thread_id=thread_id or "", + ) + + def _discord_config(self): + discord_cfg = SimpleNamespace(enabled=True, token="***", extra={}) + config = SimpleNamespace( + platforms={Platform.DISCORD: discord_cfg}, + get_home_channel=lambda _platform: SimpleNamespace(chat_id="1111111"), + ) + return config, discord_cfg + + def test_current_alias_routes_to_session_chat_id(self, monkeypatch): + config, discord_cfg = self._discord_config() + self._apply_session(monkeypatch, chat_id="999888777") + + with patch("gateway.config.load_gateway_config", return_value=config), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \ + patch("gateway.mirror.mirror_to_session", return_value=True): + result = json.loads( + send_message_tool( + { + "action": "send", + "target": "discord:current", + "message": "batch 1 of 3", + } + ) + ) + + assert result["success"] is True + assert "current discord session" in result.get("note", "") + assert "home channel" not in result.get("note", "") + send_mock.assert_awaited_once_with( + Platform.DISCORD, + discord_cfg, + "999888777", + "batch 1 of 3", + thread_id=None, + media_files=[], + ) + + def test_current_alias_preserves_thread_id(self, monkeypatch): + config, discord_cfg = self._discord_config() + self._apply_session(monkeypatch, chat_id="999888777", thread_id="555444333") + + with patch("gateway.config.load_gateway_config", return_value=config), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \ + patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock: + result = json.loads( + send_message_tool( + { + "action": "send", + "target": "discord:current", + "message": "hello thread", + } + ) + ) + + assert result["success"] is True + send_mock.assert_awaited_once_with( + Platform.DISCORD, + discord_cfg, + "999888777", + "hello thread", + thread_id="555444333", + media_files=[], + ) + mirror_mock.assert_called_once_with( + "discord", "999888777", "hello thread", + source_label="discord", thread_id="555444333", + ) + + def test_session_and_double_underscore_aliases_are_equivalent(self, monkeypatch): + """'session' and '__session__' are accepted as synonyms for 'current'.""" + config, discord_cfg = self._discord_config() + + for alias in ("session", "__session__", "CURRENT", "Current"): + self._apply_session(monkeypatch, chat_id="222") + with patch("gateway.config.load_gateway_config", return_value=config), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \ + patch("gateway.mirror.mirror_to_session", return_value=True): + result = json.loads( + send_message_tool( + { + "action": "send", + "target": f"discord:{alias}", + "message": "hi", + } + ) + ) + + assert result["success"] is True, f"alias {alias!r} failed" + assert send_mock.await_args.args[2] == "222" + + def test_no_active_session_returns_error_without_sending(self, monkeypatch): + """Without HERMES_SESSION_CHAT_ID, the tool must error (not fall back home).""" + config, _discord_cfg = self._discord_config() + for key in self._SESSION_ENV_KEYS: + monkeypatch.delenv(key, raising=False) + from gateway.session_context import set_session_vars + set_session_vars() + + with patch("gateway.config.load_gateway_config", return_value=config), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock()) as send_mock, \ + patch("gateway.mirror.mirror_to_session", return_value=True): + result = json.loads( + send_message_tool( + { + "action": "send", + "target": "discord:current", + "message": "hi", + } + ) + ) + + assert "error" in result + assert "active gateway session" in result["error"] + assert "HERMES_SESSION_CHAT_ID" in result["error"] + send_mock.assert_not_awaited() + + def test_platform_mismatch_returns_clear_error(self, monkeypatch): + """discord:current from a Telegram session must NOT cross platforms.""" + config, _discord_cfg = self._discord_config() + self._apply_session(monkeypatch, platform="telegram", chat_id="-1001234") + + with patch("gateway.config.load_gateway_config", return_value=config), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock()) as send_mock: + result = json.loads( + send_message_tool( + { + "action": "send", + "target": "discord:current", + "message": "hi", + } + ) + ) + + assert "error" in result + assert "does not match" in result["error"] + assert "telegram" in result["error"].lower() + send_mock.assert_not_awaited() + + def test_current_alias_does_not_use_home_channel_fallback(self, monkeypatch): + """Sentinel resolution must skip the home-channel fallback branch entirely.""" + home_calls = [] + + class _Config: + platforms = {Platform.DISCORD: SimpleNamespace(enabled=True, token="***", extra={})} + + def get_home_channel(self, _platform): + home_calls.append(_platform) + return SimpleNamespace(chat_id="1111111") + + self._apply_session(monkeypatch, chat_id="777") + with patch("gateway.config.load_gateway_config", return_value=_Config()), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \ + patch("gateway.mirror.mirror_to_session", return_value=True): + result = json.loads( + send_message_tool( + { + "action": "send", + "target": "discord:current", + "message": "hi", + } + ) + ) + + assert result["success"] is True + assert home_calls == [] + + def test_telegram_general_topic_thread_id_normalized_to_none(self, monkeypatch): + """Telegram forum General topic uses synthetic thread_id='1' which the + Bot API rejects. The sentinel resolver must strip it to None.""" + telegram_cfg = SimpleNamespace(enabled=True, token="***", extra={}) + config = SimpleNamespace( + platforms={Platform.TELEGRAM: telegram_cfg}, + get_home_channel=lambda _platform: SimpleNamespace(chat_id="9999"), + ) + from gateway.session_context import set_session_vars + set_session_vars(platform="telegram", chat_id="-1001234", thread_id="1") + monkeypatch.setenv("HERMES_SESSION_PLATFORM", "telegram") + monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "-1001234") + monkeypatch.setenv("HERMES_SESSION_THREAD_ID", "1") + + with patch("gateway.config.load_gateway_config", return_value=config), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \ + patch("gateway.mirror.mirror_to_session", return_value=True): + result = json.loads( + send_message_tool( + { + "action": "send", + "target": "telegram:current", + "message": "hi", + } + ) + ) + + assert result["success"] is True + # thread_id must be None (stripped), not "1" + send_mock.assert_awaited_once_with( + Platform.TELEGRAM, + telegram_cfg, + "-1001234", + "hi", + thread_id=None, + media_files=[], + ) + + def test_telegram_real_topic_thread_id_preserved(self, monkeypatch): + """Non-General Telegram forum topics (thread_id != '1') must be kept.""" + telegram_cfg = SimpleNamespace(enabled=True, token="***", extra={}) + config = SimpleNamespace( + platforms={Platform.TELEGRAM: telegram_cfg}, + get_home_channel=lambda _platform: SimpleNamespace(chat_id="9999"), + ) + self._apply_session(monkeypatch, platform="telegram", chat_id="-1001234", thread_id="17585") + + with patch("gateway.config.load_gateway_config", return_value=config), \ + patch("tools.interrupt.is_interrupted", return_value=False), \ + patch("model_tools._run_async", side_effect=_run_async_immediately), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \ + patch("gateway.mirror.mirror_to_session", return_value=True): + result = json.loads( + send_message_tool( + { + "action": "send", + "target": "telegram:current", + "message": "hi", + } + ) + ) + + assert result["success"] is True + send_mock.assert_awaited_once_with( + Platform.TELEGRAM, + telegram_cfg, + "-1001234", + "hi", + thread_id="17585", + media_files=[], + ) + + +class TestSendSlackThreadId: + """_send_slack passes thread_ts when thread_id is provided.""" + + def test_slack_thread_ts_included_in_payload(self): + """thread_id should be forwarded as thread_ts in chat.postMessage.""" + import aiohttp + + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.json = AsyncMock(return_value={"ok": True, "ts": "1234.5678"}) + mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) + mock_resp.__aexit__ = AsyncMock(return_value=None) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=mock_resp) + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + from tools.send_message_tool import _send_slack + + with patch("aiohttp.ClientSession", return_value=mock_session): + result = asyncio.run( + _send_slack("xoxb-test", "C123", "hello", thread_id="1712345678.1234") + ) + + assert result["success"] is True + # Verify thread_ts was in the JSON payload + call_kwargs = mock_session.post.call_args + payload = call_kwargs.kwargs.get("json") or call_kwargs[1].get("json") + assert payload["thread_ts"] == "1712345678.1234" + + def test_slack_no_thread_ts_when_thread_id_is_none(self): + """Without thread_id, thread_ts must NOT appear in the payload.""" + import aiohttp + + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.json = AsyncMock(return_value={"ok": True, "ts": "1234.5678"}) + mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) + mock_resp.__aexit__ = AsyncMock(return_value=None) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=mock_resp) + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + from tools.send_message_tool import _send_slack + + with patch("aiohttp.ClientSession", return_value=mock_session): + result = asyncio.run( + _send_slack("xoxb-test", "C123", "hello") + ) + + assert result["success"] is True + call_kwargs = mock_session.post.call_args + payload = call_kwargs.kwargs.get("json") or call_kwargs[1].get("json") + assert "thread_ts" not in payload diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 19da4f55af..dfa20c3313 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -18,6 +18,7 @@ from agent.redact import redact_sensitive_text logger = logging.getLogger(__name__) +_CURRENT_SESSION_ALIASES = frozenset({"current", "__session__", "session"}) _TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$") _FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$") _WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$") @@ -120,7 +121,7 @@ SEND_MESSAGE_SCHEMA = { }, "target": { "type": "string", - "description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org'" + "description": "Delivery target. Format: 'platform' (uses home channel), 'platform:current' (current gateway session chat/thread — only valid when invoked from a messaging session; use for multi-message batch delivery into the same conversation), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'discord:current', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org'" }, "message": { "type": "string", @@ -163,11 +164,24 @@ def _handle_send(args): target_ref = parts[1].strip() if len(parts) > 1 else None chat_id = None thread_id = None + is_explicit = False + used_current_session = False - if target_ref: + # Session-relative sentinel: resolve 'current' / '__session__' / 'session' + # to the active gateway session's chat_id/thread_id. This enables skills + # that need multi-message batch delivery into the same conversation. + if target_ref and target_ref.lower() in _CURRENT_SESSION_ALIASES: + session_err, session_chat_id, session_thread_id = _resolve_current_session_target( + platform_name + ) + if session_err: + return json.dumps({"error": session_err}) + chat_id = session_chat_id + thread_id = session_thread_id + is_explicit = True + used_current_session = True + elif target_ref: chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref) - else: - is_explicit = False # Resolve human-friendly channel names to numeric IDs if target_ref and not is_explicit: @@ -285,6 +299,11 @@ def _handle_send(args): ) if used_home_channel and isinstance(result, dict) and result.get("success"): result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})" + elif used_current_session and isinstance(result, dict) and result.get("success"): + thread_suffix = f", thread_id: {thread_id}" if thread_id else "" + result["note"] = ( + f"Sent to current {platform_name} session (chat_id: {chat_id}{thread_suffix})" + ) # Mirror the sent message into the target's gateway session if isinstance(result, dict) and result.get("success") and mirror_text: @@ -304,6 +323,53 @@ def _handle_send(args): return json.dumps(_error(f"Send failed: {e}")) +def _resolve_current_session_target(platform_name: str): + """Resolve a ``platform:current`` target to the active session's chat/thread. + + Reads the concurrency-safe session context variables (with legacy env var + fallback) set by ``gateway.session_context.set_session_vars``. + + Returns a tuple ``(error_message, chat_id, thread_id)``. On success the + first element is ``None``. Returns an error if there is no active + gateway session or if the session platform does not match + ``platform_name`` (e.g. calling ``discord:current`` from a Telegram + session). + """ + from gateway.session_context import get_session_env + + session_platform = (get_session_env("HERMES_SESSION_PLATFORM", "") or "").strip().lower() + session_chat_id = (get_session_env("HERMES_SESSION_CHAT_ID", "") or "").strip() + session_thread_id = (get_session_env("HERMES_SESSION_THREAD_ID", "") or "").strip() or None + + if not session_chat_id: + return ( + f"send_message target '{platform_name}:current' requires an active " + "gateway session but no HERMES_SESSION_CHAT_ID is set. Use an explicit " + f"channel target instead, e.g. '{platform_name}:' or '{platform_name}' " + "for the home channel.", + None, + None, + ) + + if session_platform and session_platform != platform_name: + return ( + f"send_message target '{platform_name}:current' does not match the current " + f"session platform ('{session_platform}'). Use '{session_platform}:current' " + "to reply in the active conversation, or specify an explicit channel target.", + None, + None, + ) + + # Telegram forum chats synthesize thread_id="1" for the General topic, + # but the Bot API rejects message_thread_id=1. The gateway adapter + # normalizes this away (TelegramAdapter._message_thread_id_for_send); + # mirror that here so telegram:current doesn't break in General. + if platform_name == "telegram" and session_thread_id == "1": + session_thread_id = None + + return None, session_chat_id, session_thread_id + + def _parse_target_ref(platform_name: str, target_ref: str): """Parse a tool target into chat_id/thread_id and whether it is explicit.""" if platform_name == "telegram": @@ -546,7 +612,7 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, last_result = None for chunk in chunks: if platform == Platform.SLACK: - result = await _send_slack(pconfig.token, chat_id, chunk) + result = await _send_slack(pconfig.token, chat_id, chunk, thread_id=thread_id) elif platform == Platform.WHATSAPP: result = await _send_whatsapp(pconfig.extra, chat_id, chunk) elif platform == Platform.SIGNAL: @@ -935,7 +1001,7 @@ async def _send_discord(token, chat_id, message, thread_id=None, media_files=Non return _error(f"Discord send failed: {e}") -async def _send_slack(token, chat_id, message): +async def _send_slack(token, chat_id, message, thread_id=None): """Send via Slack Web API.""" try: import aiohttp @@ -949,6 +1015,8 @@ async def _send_slack(token, chat_id, message): headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session: payload = {"channel": chat_id, "text": message, "mrkdwn": True} + if thread_id: + payload["thread_ts"] = thread_id async with session.post(url, headers=headers, json=payload, **_req_kw) as resp: data = await resp.json() if data.get("ok"):