fix(api-server): stop silently promising async delivery on stateless HTTP path (#50319)

* fix(api-server): stop silently promising async delivery on stateless HTTP path

terminal(notify_on_complete=True / watch_patterns) and delegate_task(background=True)
silently no-op'd on the API server / WebUI path (#10760): the watcher / detached
child registered, but every API-server route (OpenAI-spec /v1/chat/completions
and /v1/responses, plus the proprietary /v1/runs SSE stream) tears down its
channel when the turn ends, and APIServerAdapter.send() is a no-op stub. A
completion that fires after the response closed had nowhere to go — from the
agent side, indistinguishable from a hang.

There is no spec-compliant surface to wake the agent later on a stateless HTTP
client, so make the no-op honest instead of silent:

- Add a per-adapter capability flag supports_async_delivery (default True;
  APIServerAdapter = False), propagated into a HERMES_SESSION_ASYNC_DELIVERY
  contextvar via async_delivery_supported(). Toggle on the adapter, not a
  hardcoded platform string — a future stateless adapter is correct-by-default.
- terminal: when delivery is unsupported, skip watcher registration, force
  notify_on_complete off, and return a notify_unsupported note telling the
  agent to process(action='poll').
- delegate_task: when delivery is unsupported, fall back to SYNCHRONOUS
  execution (work runs and returns in the same response) with a note, instead
  of handing out a handle that never resolves.

CLI (in-process completion_queue) and the real gateway platforms are unchanged.

Fixes #10760

* refactor(api-server): route session binding through a single no-delivery chokepoint

Add APIServerAdapter._bind_api_server_session() and route both agent-entry
paths (_run_agent for /v1/chat/completions + /v1/responses, and the /v1/runs
_run_sync path) through it. The helper hardwires platform="api_server" and
async_delivery=False with no async_delivery parameter to pass, so a future
route added to the API server physically cannot reintroduce the silent
no-op (#10760) by forgetting to mark the channel as non-delivering.

The binding stays request-scoped (cleared per turn), so a session resumed
later on a delivering interface (CLI / gateway platform) re-binds fresh and
is NOT blocked — the no-delivery decision tracks the interface handling the
current turn, never the session.
This commit is contained in:
Teknium 2026-06-21 12:15:14 -07:00 committed by GitHub
parent 56255f83f7
commit 7a131f7f40
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 405 additions and 20 deletions

View file

@ -749,6 +749,16 @@ class APIServerAdapter(BasePlatformAdapter):
and routes them through hermes-agent's AIAgent.
"""
# Stateless request/response: every route (the OpenAI-spec
# /v1/chat/completions and /v1/responses, and the proprietary /v1/runs SSE
# stream) tears down its channel when the turn ends. There is no persistent
# outbound channel to push a background completion to a client that already
# received its response, and ``send()`` is a no-op stub. So async-delivery
# tools (terminal notify_on_complete / watch_patterns, delegate_task
# background=True) must NOT promise delivery on this path — see
# ``async_delivery_supported()``.
supports_async_delivery: bool = False
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.API_SERVER)
extra = config.extra or {}
@ -3655,6 +3665,38 @@ class APIServerAdapter(BasePlatformAdapter):
)
return None
@staticmethod
def _bind_api_server_session(
*,
chat_id: str = "",
session_key: str = "",
session_id: str = "",
) -> list:
"""Bind session contextvars for an API-server agent run.
This is the SINGLE structural chokepoint every API-server agent-entry
path must use to seed session context it hardwires
``platform="api_server"`` and ``async_delivery=False`` so a new route
physically cannot reintroduce the silent-no-op bug (#10760) by
forgetting to mark the channel as non-delivering. There is no
``async_delivery`` parameter to get wrong; the stateless HTTP path can
never wake the agent after the turn ends, on ANY route.
Returns reset tokens; pass them to ``clear_session_vars`` in a
``finally`` block (the binding is request-scoped and must not outlive
the turn a session resumed later on a delivering interface, e.g. the
CLI or a gateway platform, re-binds fresh and is NOT blocked).
"""
from gateway.session_context import set_session_vars
return set_session_vars(
platform="api_server",
chat_id=chat_id,
session_key=session_key,
session_id=session_id,
async_delivery=False,
)
async def _run_agent(
self,
user_message: str,
@ -3682,10 +3724,9 @@ class APIServerAdapter(BasePlatformAdapter):
loop = asyncio.get_running_loop()
def _run():
from gateway.session_context import clear_session_vars, set_session_vars
from gateway.session_context import clear_session_vars
tokens = set_session_vars(
platform="api_server",
tokens = self._bind_api_server_session(
chat_id=session_id or "",
session_key=gateway_session_key or session_id or "",
session_id=session_id or "",
@ -3940,7 +3981,7 @@ class APIServerAdapter(BasePlatformAdapter):
pass
def _run_sync():
from gateway.session_context import clear_session_vars, set_session_vars
from gateway.session_context import clear_session_vars
from tools.approval import (
register_gateway_notify,
reset_current_session_key,
@ -3956,8 +3997,7 @@ class APIServerAdapter(BasePlatformAdapter):
# contextvars so concurrent runs do not share process
# environment state.
approval_token = set_current_session_key(approval_session_key)
session_tokens = set_session_vars(
platform="api_server",
session_tokens = self._bind_api_server_session(
session_key=approval_session_key,
)
register_gateway_notify(approval_session_key, _approval_notify)

View file

@ -1925,6 +1925,22 @@ class BasePlatformAdapter(ABC):
# preview (see gateway/run.py progress_callback).
supports_code_blocks: bool = False
# Whether this adapter can deliver an ASYNC notification back to the agent
# AFTER a turn ends — i.e. wake a fresh turn to surface a background
# process completion (terminal notify_on_complete / watch_patterns) or a
# detached subagent result (delegate_task background=True).
#
# True for adapters that hold a persistent outbound channel (Telegram,
# Discord, Slack, ... — they have a real ``send()`` and the gateway runs
# the watcher/drain loops). False for stateless request/response adapters
# (the API server): every route closes its channel when the turn ends, so
# there is nowhere to push a later completion. The gateway propagates this
# into the ``HERMES_SESSION_ASYNC_DELIVERY`` contextvar at session-bind
# time; tools read it via ``async_delivery_supported()`` and refuse to make
# a delivery promise they can't keep. A new stateless adapter only needs to
# set this to False to stay correct-by-default.
supports_async_delivery: bool = True
# The command prefix users can always TYPE on this platform to reach
# Hermes commands. Default "/" (most platforms deliver "/approve" etc.
# as plain message text). Platforms where typing a leading "/" is

View file

@ -12683,6 +12683,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
in a ``finally`` block.
"""
from gateway.session_context import set_session_vars
# Propagate the adapter's async-delivery capability so async tools
# (terminal notify_on_complete / watch_patterns, delegate_task
# background=True) know whether this channel can wake a later turn.
# Default True keeps CLI / unknown paths working; stateless adapters
# (api_server) declare supports_async_delivery=False. Use getattr so
# bare runners built via object.__new__ (tests) without self.adapters
# don't blow up — they simply default to supported.
_adapters = getattr(self, "adapters", None) or {}
_adapter = _adapters.get(context.source.platform)
_async_delivery = getattr(_adapter, "supports_async_delivery", True)
return set_session_vars(
platform=context.source.platform.value,
chat_id=context.source.chat_id,
@ -12692,6 +12702,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
user_name=str(context.source.user_name) if context.source.user_name else "",
session_key=context.session_key,
message_id=str(context.source.message_id) if context.source.message_id else "",
async_delivery=_async_delivery,
)
def _clear_session_env(self, tokens: list) -> None:

View file

@ -62,6 +62,27 @@ _SESSION_ID: ContextVar = ContextVar("HERMES_SESSION_ID", default=_UNSET)
# private-chat topic (those lanes route only with thread id + reply anchor).
_SESSION_MESSAGE_ID: ContextVar = ContextVar("HERMES_SESSION_MESSAGE_ID", default=_UNSET)
# Whether the current session's delivery channel can route an ASYNC completion
# back to the agent AFTER the current turn ends (i.e. wake a fresh turn).
#
# True — CLI (in-process completion_queue drain) and the real gateway
# platforms (Telegram/Discord/Slack/...), which hold a persistent
# outbound channel and run the watcher/drain loops.
# False — stateless request/response adapters (the API server: every route,
# spec and proprietary, tears down its channel when the turn ends, so
# a background completion that finishes later has nowhere to go).
#
# Tools that promise async delivery (terminal notify_on_complete /
# watch_patterns, delegate_task background=True) read this via
# ``async_delivery_supported()`` and refuse to hand out a promise the channel
# can't keep — turning a silent no-op into an explicit contract.
#
# Default _UNSET => treated as supported, so CLI (which never sets a platform)
# and any contextvar-unaware path keep working. Stateless adapters opt OUT by
# setting ``supports_async_delivery = False`` on the adapter class; the gateway
# propagates that into this contextvar at session-bind time.
_SESSION_ASYNC_DELIVERY: ContextVar = ContextVar("HERMES_SESSION_ASYNC_DELIVERY", default=_UNSET)
# Cron auto-delivery vars — set per-job in run_job() so concurrent jobs
# don't clobber each other's delivery targets.
_CRON_AUTO_DELIVER_PLATFORM: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_PLATFORM", default=_UNSET)
@ -112,6 +133,7 @@ def set_session_vars(
session_id: str = "",
message_id: str = "",
cwd: str = "",
async_delivery: bool = True,
) -> list:
"""Set all session context variables and return reset tokens.
@ -122,6 +144,11 @@ def set_session_vars(
only for API compatibility.
``cwd`` pins the logical working directory for this context.
``async_delivery`` declares whether this session's channel can route a
background completion back to the agent after the turn ends (see
``_SESSION_ASYNC_DELIVERY`` / ``async_delivery_supported``). Stateless
request/response adapters (the API server) pass ``False``.
"""
tokens = [
_SESSION_PLATFORM.set(platform),
@ -134,6 +161,7 @@ def set_session_vars(
_SESSION_KEY.set(session_key),
_SESSION_ID.set(session_id),
_SESSION_MESSAGE_ID.set(message_id),
_SESSION_ASYNC_DELIVERY.set(bool(async_delivery)),
]
try:
from agent.runtime_cwd import set_session_cwd
@ -168,6 +196,11 @@ def clear_session_vars(tokens: list) -> None:
_SESSION_MESSAGE_ID,
):
var.set("")
# Reset async-delivery capability to the "never set" sentinel rather than a
# falsy value: a cleared context should fall back to the default-supported
# behavior (CLI / unaware paths), not be mistaken for an opted-out
# stateless adapter.
_SESSION_ASYNC_DELIVERY.set(_UNSET)
try:
from agent.runtime_cwd import clear_session_cwd
@ -200,3 +233,22 @@ def get_session_env(name: str, default: str = "") -> str:
return value
# Fall back to os.environ for CLI, cron, and test compatibility
return os.getenv(name, default)
def async_delivery_supported() -> bool:
"""Whether the current session can deliver a background completion later.
Returns ``False`` only when the active session was explicitly bound by a
stateless adapter (the API server) that cannot route a notification back to
the agent after the turn ends. CLI, cron, and the real gateway platforms
and any path that never bound the contextvar return ``True``.
Tools that promise async delivery (``terminal`` notify_on_complete /
watch_patterns, ``delegate_task`` background=True) consult this before
registering a watcher / dispatching a detached child, so they can refuse a
promise the channel can't keep instead of silently no-op'ing.
"""
value = _SESSION_ASYNC_DELIVERY.get()
if value is _UNSET:
return True
return bool(value)

View file

@ -0,0 +1,211 @@
"""Tests for the async-delivery capability gate (issue #10760).
Stateless request/response adapters (the API server / WebUI path) cannot route
a background completion back to the agent after a turn ends there is no
persistent channel and ``APIServerAdapter.send()`` is a no-op stub. So tools
that promise async delivery (``terminal`` notify_on_complete / watch_patterns,
``delegate_task`` background=True) must refuse the promise on that path instead
of silently registering a watcher that never fires.
This is wired through:
- ``BasePlatformAdapter.supports_async_delivery`` (default True)
- ``APIServerAdapter.supports_async_delivery = False``
- ``gateway.session_context._SESSION_ASYNC_DELIVERY`` contextvar +
``async_delivery_supported()`` helper, bound per-session.
These are behavior/invariant tests (how the capability relates to the channel),
not snapshots of a current value.
"""
import json
import pytest
from gateway.session_context import (
async_delivery_supported,
clear_session_vars,
get_session_env,
set_session_vars,
)
# ---------------------------------------------------------------------------
# Capability helper
# ---------------------------------------------------------------------------
class TestAsyncDeliverySupported:
def test_default_unbound_is_supported(self):
"""CLI / cron / unaware paths never bind the var -> supported."""
assert async_delivery_supported() is True
def test_set_true_is_supported(self):
tokens = set_session_vars(
platform="telegram",
chat_id="123",
session_key="telegram:private:123",
async_delivery=True,
)
try:
assert async_delivery_supported() is True
# Platform metadata stays readable alongside the capability.
assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram"
finally:
clear_session_vars(tokens)
def test_set_false_is_unsupported(self):
tokens = set_session_vars(
platform="api_server",
chat_id="sess1",
session_key="sess1",
async_delivery=False,
)
try:
assert async_delivery_supported() is False
# Platform must still be readable for routing/diagnostics even
# though delivery is unsupported.
assert get_session_env("HERMES_SESSION_PLATFORM") == "api_server"
finally:
clear_session_vars(tokens)
def test_omitted_arg_defaults_supported(self):
"""Back-compat: callers that don't pass async_delivery stay supported."""
tokens = set_session_vars(platform="discord", chat_id="9")
try:
assert async_delivery_supported() is True
finally:
clear_session_vars(tokens)
def test_clear_resets_to_default_supported(self):
"""A cleared context must fall back to default-supported, NOT be
mistaken for an opted-out stateless adapter."""
tokens = set_session_vars(
platform="api_server", session_key="s1", async_delivery=False
)
assert async_delivery_supported() is False
clear_session_vars(tokens)
assert async_delivery_supported() is True
# ---------------------------------------------------------------------------
# Adapter capability flag
# ---------------------------------------------------------------------------
class TestAdapterCapabilityFlag:
def test_base_default_true(self):
from gateway.platforms.base import BasePlatformAdapter
assert BasePlatformAdapter.supports_async_delivery is True
def test_api_server_false(self):
from gateway.platforms.api_server import APIServerAdapter
assert APIServerAdapter.supports_async_delivery is False
def test_api_server_bind_chokepoint_hardwires_no_delivery(self):
"""Every API-server agent-entry path binds through
_bind_api_server_session, which hardwires async_delivery=False a new
route physically cannot reintroduce the silent no-op (#10760)."""
from gateway.platforms.api_server import APIServerAdapter
from gateway.session_context import clear_session_vars, get_session_env
tokens = APIServerAdapter._bind_api_server_session(
chat_id="c1", session_key="sk1", session_id="sid1"
)
try:
assert async_delivery_supported() is False
assert get_session_env("HERMES_SESSION_PLATFORM") == "api_server"
finally:
clear_session_vars(tokens)
def test_api_server_binding_does_not_outlive_turn(self):
"""The no-delivery decision is request-scoped, NOT stuck to the session.
After clear, a session resumed on a delivering interface re-binds fresh
and is NOT blocked."""
from gateway.platforms.api_server import APIServerAdapter
from gateway.session_context import clear_session_vars
# Turn 1: same session over the API server -> blocked.
tokens = APIServerAdapter._bind_api_server_session(session_key="shared-key")
assert async_delivery_supported() is False
clear_session_vars(tokens)
# Turn 2: SAME session_key resumed on a delivering interface (CLI/gateway)
# -> supported. The earlier False did not follow the session.
tokens = set_session_vars(
platform="telegram",
session_key="shared-key",
async_delivery=True,
)
try:
assert async_delivery_supported() is True
finally:
clear_session_vars(tokens)
# ---------------------------------------------------------------------------
# terminal_tool: refuses to register a watcher on unsupported sessions
# ---------------------------------------------------------------------------
class TestTerminalNotifyGate:
@pytest.fixture(autouse=True)
def _clean_watchers(self):
from tools.process_registry import process_registry
process_registry.pending_watchers = []
yield
process_registry.pending_watchers = []
def _run_bg(self, command):
from tools.terminal_tool import terminal_tool
return json.loads(
terminal_tool(command=command, background=True, notify_on_complete=True)
)
def test_api_server_skips_watcher_and_notes(self):
from tools.process_registry import process_registry
tokens = set_session_vars(
platform="api_server", chat_id="s1", session_key="s1", async_delivery=False
)
try:
d = self._run_bg("sleep 30 && echo DONE")
finally:
clear_session_vars(tokens)
assert d.get("notify_on_complete") is False
assert d.get("notify_unsupported"), "must explain the limitation"
assert "poll" in d["notify_unsupported"].lower()
assert len(process_registry.pending_watchers) == 0
def test_gateway_registers_watcher(self):
from tools.process_registry import process_registry
tokens = set_session_vars(
platform="telegram",
chat_id="123",
thread_id="7",
user_id="u1",
session_key="telegram:private:123",
async_delivery=True,
)
try:
d = self._run_bg("sleep 30 && echo DONE")
finally:
clear_session_vars(tokens)
assert d.get("notify_on_complete") is True
assert not d.get("notify_unsupported")
assert len(process_registry.pending_watchers) == 1
assert process_registry.pending_watchers[0]["platform"] == "telegram"
def test_cli_stays_supported(self):
"""CLI delivers via the in-process completion_queue: notify stays on,
no false 'unsupported' note, and no pending_watcher (empty platform)."""
from tools.process_registry import process_registry
d = self._run_bg("sleep 30 && echo DONE")
assert d.get("notify_on_complete") is True
assert not d.get("notify_unsupported")
# No platform bound -> no gateway watcher, but completion_queue still fires.
assert len(process_registry.pending_watchers) == 0

View file

@ -2490,6 +2490,34 @@ def delegate_task(
from tools.async_delegation import dispatch_async_delegation_batch
from tools.approval import get_current_session_key
# Stateless request/response sessions (the API server / WebUI path)
# cannot route a detached subagent result back to the agent after the
# turn ends — there is no persistent channel and the adapter's send()
# is a no-op, so a background dispatch would silently never re-enter the
# conversation (issue #10760). Fall back to SYNCHRONOUS execution: the
# work still runs and its result returns in this same response, which is
# strictly better than a handle that never resolves. Mirrors the
# pool-at-capacity inline fallback below.
try:
from gateway.session_context import async_delivery_supported
_async_ok = async_delivery_supported()
except Exception:
_async_ok = True
if not _async_ok:
logger.info(
"delegate_task: async delivery unsupported on this session "
"(stateless HTTP API); running the batch synchronously instead."
)
_sync_result = _execute_and_aggregate()
if isinstance(_sync_result, dict):
_sync_result["note"] = (
"background=true is not available on this endpoint (stateless "
"HTTP API — no channel to deliver a detached subagent result "
"after the turn ends), so the subagent(s) ran SYNCHRONOUSLY and "
"the result is included above."
)
return json.dumps(_sync_result, ensure_ascii=False)
_session_key = get_current_session_key(default="")
_child_agents = [c for (_, _, c) in children]

View file

@ -2297,20 +2297,47 @@ def terminal_tool(
# watch-pattern and completion notifications can be
# routed back to the correct chat/thread.
if background and (notify_on_complete or watch_patterns):
from gateway.session_context import get_session_env as _gse
_gw_platform = _gse("HERMES_SESSION_PLATFORM", "")
if _gw_platform:
_gw_chat_id = _gse("HERMES_SESSION_CHAT_ID", "")
_gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "")
_gw_user_id = _gse("HERMES_SESSION_USER_ID", "")
_gw_user_name = _gse("HERMES_SESSION_USER_NAME", "")
_gw_message_id = _gse("HERMES_SESSION_MESSAGE_ID", "")
proc_session.watcher_platform = _gw_platform
proc_session.watcher_chat_id = _gw_chat_id
proc_session.watcher_user_id = _gw_user_id
proc_session.watcher_user_name = _gw_user_name
proc_session.watcher_thread_id = _gw_thread_id
proc_session.watcher_message_id = _gw_message_id
from gateway.session_context import (
async_delivery_supported as _async_ok,
get_session_env as _gse,
)
# Stateless request/response sessions (the API server /
# WebUI path) cannot route a completion back to the agent
# after the turn ends — there is no persistent channel and
# send() is a no-op. Registering a watcher there silently
# no-ops (issue #10760). Refuse the promise instead: drop
# the flags and tell the agent to poll.
if not _async_ok():
notify_on_complete = False
watch_patterns = None
result_data["notify_on_complete"] = False
result_data["notify_unsupported"] = (
"notify_on_complete / watch_patterns are not available on "
"this endpoint (stateless HTTP API — no channel to deliver "
"an async completion after the turn ends). The process is "
"running in the background; retrieve its result with "
"process(action='poll') or process(action='wait')."
)
logger.info(
"background proc %s: async delivery unsupported on this "
"session; notify_on_complete/watch_patterns disabled",
proc_session.id,
)
else:
_gw_platform = _gse("HERMES_SESSION_PLATFORM", "")
if _gw_platform:
_gw_chat_id = _gse("HERMES_SESSION_CHAT_ID", "")
_gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "")
_gw_user_id = _gse("HERMES_SESSION_USER_ID", "")
_gw_user_name = _gse("HERMES_SESSION_USER_NAME", "")
_gw_message_id = _gse("HERMES_SESSION_MESSAGE_ID", "")
proc_session.watcher_platform = _gw_platform
proc_session.watcher_chat_id = _gw_chat_id
proc_session.watcher_user_id = _gw_user_id
proc_session.watcher_user_name = _gw_user_name
proc_session.watcher_thread_id = _gw_thread_id
proc_session.watcher_message_id = _gw_message_id
# Mutual exclusion: if both notify_on_complete and watch_patterns
# are set, drop watch_patterns. The combination produces duplicate