@staticmethod
def _bind_api_server_session(
    *,
    chat_id: str = "",
    session_key: str = "",
    session_id: str = "",
) -> list:
    """Seed the session contextvars for one API-server agent run.

    Every API-server entry path is meant to bind its session through this
    helper. The platform name (``"api_server"``) and ``async_delivery=False``
    are fixed here and deliberately not exposed as parameters, so a caller
    cannot mark the stateless HTTP channel as able to deliver a background
    completion after the turn ends (the silent no-op of issue #10760).

    Args:
        chat_id: Chat identifier to bind; empty string when not applicable.
        session_key: Session key to bind; empty string when not applicable.
        session_id: Session id to bind; empty string when not applicable.

    Returns:
        Whatever ``set_session_vars`` returns (its reset tokens). Hand them
        to ``clear_session_vars`` in a ``finally`` block so the binding stays
        request-scoped and does not outlive the turn.
    """
    from gateway.session_context import set_session_vars

    # Caller-supplied identifiers plus the two values that are hardwired for
    # the API server; kept in one mapping so the fixed entries stand out.
    binding = {
        "platform": "api_server",
        "chat_id": chat_id,
        "session_key": session_key,
        "session_id": session_id,
        "async_delivery": False,
    }
    reset_tokens = set_session_vars(**binding)
    return reset_tokens
wake a fresh turn to surface a background + # process completion (terminal notify_on_complete / watch_patterns) or a + # detached subagent result (delegate_task background=True). + # + # True for adapters that hold a persistent outbound channel (Telegram, + # Discord, Slack, ... — they have a real ``send()`` and the gateway runs + # the watcher/drain loops). False for stateless request/response adapters + # (the API server): every route closes its channel when the turn ends, so + # there is nowhere to push a later completion. The gateway propagates this + # into the ``HERMES_SESSION_ASYNC_DELIVERY`` contextvar at session-bind + # time; tools read it via ``async_delivery_supported()`` and refuse to make + # a delivery promise they can't keep. A new stateless adapter only needs to + # set this to False to stay correct-by-default. + supports_async_delivery: bool = True + # The command prefix users can always TYPE on this platform to reach # Hermes commands. Default "/" (most platforms deliver "/approve" etc. # as plain message text). Platforms where typing a leading "/" is diff --git a/gateway/run.py b/gateway/run.py index 0145089b940..b107a58f1a7 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -12683,6 +12683,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew in a ``finally`` block. """ from gateway.session_context import set_session_vars + # Propagate the adapter's async-delivery capability so async tools + # (terminal notify_on_complete / watch_patterns, delegate_task + # background=True) know whether this channel can wake a later turn. + # Default True keeps CLI / unknown paths working; stateless adapters + # (api_server) declare supports_async_delivery=False. Use getattr so + # bare runners built via object.__new__ (tests) without self.adapters + # don't blow up — they simply default to supported. 
+ _adapters = getattr(self, "adapters", None) or {} + _adapter = _adapters.get(context.source.platform) + _async_delivery = getattr(_adapter, "supports_async_delivery", True) return set_session_vars( platform=context.source.platform.value, chat_id=context.source.chat_id, @@ -12692,6 +12702,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew user_name=str(context.source.user_name) if context.source.user_name else "", session_key=context.session_key, message_id=str(context.source.message_id) if context.source.message_id else "", + async_delivery=_async_delivery, ) def _clear_session_env(self, tokens: list) -> None: diff --git a/gateway/session_context.py b/gateway/session_context.py index f6e6ab6dce4..55f269df54d 100644 --- a/gateway/session_context.py +++ b/gateway/session_context.py @@ -62,6 +62,27 @@ _SESSION_ID: ContextVar = ContextVar("HERMES_SESSION_ID", default=_UNSET) # private-chat topic (those lanes route only with thread id + reply anchor). _SESSION_MESSAGE_ID: ContextVar = ContextVar("HERMES_SESSION_MESSAGE_ID", default=_UNSET) +# Whether the current session's delivery channel can route an ASYNC completion +# back to the agent AFTER the current turn ends (i.e. wake a fresh turn). +# +# True — CLI (in-process completion_queue drain) and the real gateway +# platforms (Telegram/Discord/Slack/...), which hold a persistent +# outbound channel and run the watcher/drain loops. +# False — stateless request/response adapters (the API server: every route, +# spec and proprietary, tears down its channel when the turn ends, so +# a background completion that finishes later has nowhere to go). +# +# Tools that promise async delivery (terminal notify_on_complete / +# watch_patterns, delegate_task background=True) read this via +# ``async_delivery_supported()`` and refuse to hand out a promise the channel +# can't keep — turning a silent no-op into an explicit contract. 
+# +# Default _UNSET => treated as supported, so CLI (which never sets a platform) +# and any contextvar-unaware path keep working. Stateless adapters opt OUT by +# setting ``supports_async_delivery = False`` on the adapter class; the gateway +# propagates that into this contextvar at session-bind time. +_SESSION_ASYNC_DELIVERY: ContextVar = ContextVar("HERMES_SESSION_ASYNC_DELIVERY", default=_UNSET) + # Cron auto-delivery vars — set per-job in run_job() so concurrent jobs # don't clobber each other's delivery targets. _CRON_AUTO_DELIVER_PLATFORM: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_PLATFORM", default=_UNSET) @@ -112,6 +133,7 @@ def set_session_vars( session_id: str = "", message_id: str = "", cwd: str = "", + async_delivery: bool = True, ) -> list: """Set all session context variables and return reset tokens. @@ -122,6 +144,11 @@ def set_session_vars( only for API compatibility. ``cwd`` pins the logical working directory for this context. + + ``async_delivery`` declares whether this session's channel can route a + background completion back to the agent after the turn ends (see + ``_SESSION_ASYNC_DELIVERY`` / ``async_delivery_supported``). Stateless + request/response adapters (the API server) pass ``False``. """ tokens = [ _SESSION_PLATFORM.set(platform), @@ -134,6 +161,7 @@ def set_session_vars( _SESSION_KEY.set(session_key), _SESSION_ID.set(session_id), _SESSION_MESSAGE_ID.set(message_id), + _SESSION_ASYNC_DELIVERY.set(bool(async_delivery)), ] try: from agent.runtime_cwd import set_session_cwd @@ -168,6 +196,11 @@ def clear_session_vars(tokens: list) -> None: _SESSION_MESSAGE_ID, ): var.set("") + # Reset async-delivery capability to the "never set" sentinel rather than a + # falsy value: a cleared context should fall back to the default-supported + # behavior (CLI / unaware paths), not be mistaken for an opted-out + # stateless adapter. 
def async_delivery_supported() -> bool:
    """Report whether the bound session can receive a completion later.

    Reads the ``_SESSION_ASYNC_DELIVERY`` contextvar. When the variable still
    holds the ``_UNSET`` sentinel (never bound in this context, or reset by
    ``clear_session_vars``), delivery is treated as supported. Otherwise the
    stored value is coerced to ``bool``, so the answer is ``False`` only for
    a session that was explicitly bound with ``async_delivery=False``.

    Tools that promise async delivery (``terminal`` notify_on_complete /
    watch_patterns, ``delegate_task`` background=True) call this before
    registering a watcher or dispatching a detached child.

    Returns:
        ``True`` if async delivery is supported or was never declared,
        ``False`` if the active session opted out.
    """
    capability = _SESSION_ASYNC_DELIVERY.get()
    # Sentinel means "nobody declared a capability" -> default-supported.
    return True if capability is _UNSET else bool(capability)
# Wiring exercised by these tests:
#   - ``BasePlatformAdapter.supports_async_delivery`` (default True)
#   - ``APIServerAdapter.supports_async_delivery = False``
#   - ``gateway.session_context._SESSION_ASYNC_DELIVERY`` contextvar +
#     ``async_delivery_supported()`` helper, bound per-session.
#
# These are behavior/invariant tests (how the capability relates to the
# channel), not snapshots of a current value.

import json

import pytest

from gateway.session_context import (
    async_delivery_supported,
    clear_session_vars,
    get_session_env,
    set_session_vars,
)


# ---------------------------------------------------------------------------
# Capability helper
# ---------------------------------------------------------------------------

class TestAsyncDeliverySupported:
    """Behavior of ``async_delivery_supported()`` across bind/clear cycles."""

    def test_default_unbound_is_supported(self):
        """CLI / cron / unaware paths never bind the var -> supported."""
        assert async_delivery_supported() is True

    def test_set_true_is_supported(self):
        tokens = set_session_vars(
            platform="telegram",
            chat_id="123",
            session_key="telegram:private:123",
            async_delivery=True,
        )
        try:
            assert async_delivery_supported() is True
            # Platform metadata stays readable alongside the capability.
            assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram"
        finally:
            clear_session_vars(tokens)

    def test_set_false_is_unsupported(self):
        tokens = set_session_vars(
            platform="api_server",
            chat_id="sess1",
            session_key="sess1",
            async_delivery=False,
        )
        try:
            assert async_delivery_supported() is False
            # Platform must still be readable for routing/diagnostics even
            # though delivery is unsupported.
            assert get_session_env("HERMES_SESSION_PLATFORM") == "api_server"
        finally:
            clear_session_vars(tokens)

    def test_omitted_arg_defaults_supported(self):
        """Back-compat: callers that don't pass async_delivery stay supported."""
        tokens = set_session_vars(platform="discord", chat_id="9")
        try:
            assert async_delivery_supported() is True
        finally:
            clear_session_vars(tokens)

    def test_clear_resets_to_default_supported(self):
        """A cleared context must fall back to default-supported, NOT be
        mistaken for an opted-out stateless adapter."""
        tokens = set_session_vars(
            platform="api_server", session_key="s1", async_delivery=False
        )
        # Guard the pre-clear assertion: if it fails, the context must still
        # be cleared so async_delivery=False does not leak into later tests
        # that rely on the unbound default.
        try:
            assert async_delivery_supported() is False
        finally:
            clear_session_vars(tokens)
        assert async_delivery_supported() is True


# ---------------------------------------------------------------------------
# Adapter capability flag
# ---------------------------------------------------------------------------

class TestAdapterCapabilityFlag:
    """Class-level capability flags and the API-server bind chokepoint."""

    def test_base_default_true(self):
        from gateway.platforms.base import BasePlatformAdapter

        assert BasePlatformAdapter.supports_async_delivery is True

    def test_api_server_false(self):
        from gateway.platforms.api_server import APIServerAdapter

        assert APIServerAdapter.supports_async_delivery is False

    def test_api_server_bind_chokepoint_hardwires_no_delivery(self):
        """Every API-server agent-entry path binds through
        _bind_api_server_session, which hardwires async_delivery=False — a new
        route physically cannot reintroduce the silent no-op (#10760)."""
        from gateway.platforms.api_server import APIServerAdapter

        tokens = APIServerAdapter._bind_api_server_session(
            chat_id="c1", session_key="sk1", session_id="sid1"
        )
        try:
            assert async_delivery_supported() is False
            assert get_session_env("HERMES_SESSION_PLATFORM") == "api_server"
        finally:
            clear_session_vars(tokens)

    def test_api_server_binding_does_not_outlive_turn(self):
        """The no-delivery decision is request-scoped, NOT stuck to the session.
        After clear, a session resumed on a delivering interface re-binds fresh
        and is NOT blocked."""
        from gateway.platforms.api_server import APIServerAdapter

        # Turn 1: same session over the API server -> blocked. The clear runs
        # in ``finally`` so a failed assertion cannot leave the context bound
        # to async_delivery=False for the rest of the test run.
        tokens = APIServerAdapter._bind_api_server_session(session_key="shared-key")
        try:
            assert async_delivery_supported() is False
        finally:
            clear_session_vars(tokens)

        # Turn 2: SAME session_key resumed on a delivering interface (CLI/gateway)
        # -> supported. The earlier False did not follow the session.
        tokens = set_session_vars(
            platform="telegram",
            session_key="shared-key",
            async_delivery=True,
        )
        try:
            assert async_delivery_supported() is True
        finally:
            clear_session_vars(tokens)


# ---------------------------------------------------------------------------
# terminal_tool: refuses to register a watcher on unsupported sessions
# ---------------------------------------------------------------------------

class TestTerminalNotifyGate:
    """``terminal_tool`` must gate watcher registration on the capability."""

    # NOTE(review): each test launches ``sleep 30 && echo DONE`` in the
    # background and nothing here terminates it; presumably terminal_tool
    # spawns a real process — confirm, and reap it in the fixture teardown if
    # the process registry exposes a way to do so.

    @pytest.fixture(autouse=True)
    def _clean_watchers(self):
        """Start and finish every test with an empty pending-watcher list."""
        from tools.process_registry import process_registry

        process_registry.pending_watchers = []
        yield
        process_registry.pending_watchers = []

    def _run_bg(self, command):
        """Run *command* in the background with notify_on_complete and return
        the decoded JSON result of ``terminal_tool``."""
        from tools.terminal_tool import terminal_tool

        return json.loads(
            terminal_tool(command=command, background=True, notify_on_complete=True)
        )

    def test_api_server_skips_watcher_and_notes(self):
        from tools.process_registry import process_registry

        tokens = set_session_vars(
            platform="api_server", chat_id="s1", session_key="s1", async_delivery=False
        )
        try:
            d = self._run_bg("sleep 30 && echo DONE")
        finally:
            clear_session_vars(tokens)

        assert d.get("notify_on_complete") is False
        assert d.get("notify_unsupported"), "must explain the limitation"
        assert "poll" in d["notify_unsupported"].lower()
        assert len(process_registry.pending_watchers) == 0

    def test_gateway_registers_watcher(self):
        from tools.process_registry import process_registry

        tokens = set_session_vars(
            platform="telegram",
            chat_id="123",
            thread_id="7",
            user_id="u1",
            session_key="telegram:private:123",
            async_delivery=True,
        )
        try:
            d = self._run_bg("sleep 30 && echo DONE")
        finally:
            clear_session_vars(tokens)

        assert d.get("notify_on_complete") is True
        assert not d.get("notify_unsupported")
        assert len(process_registry.pending_watchers) == 1
        assert process_registry.pending_watchers[0]["platform"] == "telegram"

    def test_cli_stays_supported(self):
        """CLI delivers via the in-process completion_queue: notify stays on,
        no false 'unsupported' note, and no pending_watcher (empty platform)."""
        from tools.process_registry import process_registry

        d = self._run_bg("sleep 30 && echo DONE")
        assert d.get("notify_on_complete") is True
        assert not d.get("notify_unsupported")
        # No platform bound -> no gateway watcher, but completion_queue still fires.
        assert len(process_registry.pending_watchers) == 0
Mirrors the + # pool-at-capacity inline fallback below. + try: + from gateway.session_context import async_delivery_supported + _async_ok = async_delivery_supported() + except Exception: + _async_ok = True + if not _async_ok: + logger.info( + "delegate_task: async delivery unsupported on this session " + "(stateless HTTP API); running the batch synchronously instead." + ) + _sync_result = _execute_and_aggregate() + if isinstance(_sync_result, dict): + _sync_result["note"] = ( + "background=true is not available on this endpoint (stateless " + "HTTP API — no channel to deliver a detached subagent result " + "after the turn ends), so the subagent(s) ran SYNCHRONOUSLY and " + "the result is included above." + ) + return json.dumps(_sync_result, ensure_ascii=False) + _session_key = get_current_session_key(default="") _child_agents = [c for (_, _, c) in children] diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 26d0f425c56..b89a5d8a959 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -2297,20 +2297,47 @@ def terminal_tool( # watch-pattern and completion notifications can be # routed back to the correct chat/thread. 
if background and (notify_on_complete or watch_patterns): - from gateway.session_context import get_session_env as _gse - _gw_platform = _gse("HERMES_SESSION_PLATFORM", "") - if _gw_platform: - _gw_chat_id = _gse("HERMES_SESSION_CHAT_ID", "") - _gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "") - _gw_user_id = _gse("HERMES_SESSION_USER_ID", "") - _gw_user_name = _gse("HERMES_SESSION_USER_NAME", "") - _gw_message_id = _gse("HERMES_SESSION_MESSAGE_ID", "") - proc_session.watcher_platform = _gw_platform - proc_session.watcher_chat_id = _gw_chat_id - proc_session.watcher_user_id = _gw_user_id - proc_session.watcher_user_name = _gw_user_name - proc_session.watcher_thread_id = _gw_thread_id - proc_session.watcher_message_id = _gw_message_id + from gateway.session_context import ( + async_delivery_supported as _async_ok, + get_session_env as _gse, + ) + + # Stateless request/response sessions (the API server / + # WebUI path) cannot route a completion back to the agent + # after the turn ends — there is no persistent channel and + # send() is a no-op. Registering a watcher there silently + # no-ops (issue #10760). Refuse the promise instead: drop + # the flags and tell the agent to poll. + if not _async_ok(): + notify_on_complete = False + watch_patterns = None + result_data["notify_on_complete"] = False + result_data["notify_unsupported"] = ( + "notify_on_complete / watch_patterns are not available on " + "this endpoint (stateless HTTP API — no channel to deliver " + "an async completion after the turn ends). The process is " + "running in the background; retrieve its result with " + "process(action='poll') or process(action='wait')." 
+ ) + logger.info( + "background proc %s: async delivery unsupported on this " + "session; notify_on_complete/watch_patterns disabled", + proc_session.id, + ) + else: + _gw_platform = _gse("HERMES_SESSION_PLATFORM", "") + if _gw_platform: + _gw_chat_id = _gse("HERMES_SESSION_CHAT_ID", "") + _gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "") + _gw_user_id = _gse("HERMES_SESSION_USER_ID", "") + _gw_user_name = _gse("HERMES_SESSION_USER_NAME", "") + _gw_message_id = _gse("HERMES_SESSION_MESSAGE_ID", "") + proc_session.watcher_platform = _gw_platform + proc_session.watcher_chat_id = _gw_chat_id + proc_session.watcher_user_id = _gw_user_id + proc_session.watcher_user_name = _gw_user_name + proc_session.watcher_thread_id = _gw_thread_id + proc_session.watcher_message_id = _gw_message_id # Mutual exclusion: if both notify_on_complete and watch_patterns # are set, drop watch_patterns. The combination produces duplicate