mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway/webhook): don't pop delivery_info on send
The webhook adapter stored per-request `deliver`/`deliver_extra` config in
`_delivery_info[chat_id]` during POST handling and consumed it via `.pop()`
inside `send()`. That worked for routes whose agent run produced exactly
one outbound message — the final response — but it broke whenever the
agent emitted any interim status message before the final response.
Status messages flow through the same `send(chat_id, ...)` path as the
final response (see `gateway/run.py::_status_callback_sync` →
`adapter.send(...)`). Common triggers include:
- "🔄 Primary model failed — switching to fallback: ..."
(run_agent.py::_emit_status when `fallback_providers` activates)
- context-pressure / compression notices
- any other lifecycle event routed through `status_callback`
When any of those fired, the first `send()` call popped the entry, so the
subsequent final-response `send()` saw an empty dict and silently
downgraded `deliver_type` from `"telegram"` (or `discord`/`slack`/etc.) to
the default `"log"`. The agent's response was logged to the gateway log
instead of being delivered to the configured cross-platform target — no
warning, no error, just a missing message.
This was easy to hit in practice. Any user with `fallback_providers`
configured saw it the first time their primary provider hiccuped on a
webhook-triggered run. Routes that worked perfectly in dev (where the
primary stays healthy) silently dropped responses in prod.
Fix: read `_delivery_info` with `.get()` so multiple `send()` calls for
the same `chat_id` all see the same delivery config. To keep the dict
bounded without relying on per-send cleanup, add a parallel
`_delivery_info_created` timestamp dict and a `_prune_delivery_info()`
helper that drops entries older than `_idempotency_ttl` (1h, same window
already used by `_seen_deliveries`). Pruning runs on each POST, mirroring
the existing `_seen_deliveries` cleanup pattern.
Worst-case memory footprint is now `rate_limit * TTL = 30/min * 60min =
1800` entries, each ~1KB → under 2 MB. In practice it'll be far smaller
because most webhooks complete in seconds, not the full hour.
Test changes:
- `test_delivery_info_cleaned_after_send` is replaced with
`test_delivery_info_survives_multiple_sends`, which is now the
regression test for this bug — it asserts that two consecutive
`send()` calls both see the delivery config.
- A new `test_delivery_info_pruned_via_ttl` covers the TTL cleanup
behavior.
- The two integration tests that asserted `chat_id not in
adapter._delivery_info` after `send()` now assert the opposite, with
a comment explaining why.
All 40 tests in `tests/gateway/test_webhook_adapter.py` and
`tests/gateway/test_webhook_integration.py` pass. Verified end-to-end
locally against a dynamic `hermes webhook subscribe` route configured
with `--deliver telegram --deliver-chat-id <user>`: with `gpt-5.4` as
the primary (currently flaky) and `claude-opus-4.6` as the fallback,
the fallback notification fires, the agent finishes, and the final
response is delivered to Telegram as expected.
This commit is contained in:
parent
f3006ebef9
commit
4aef055805
3 changed files with 87 additions and 14 deletions
|
|
@ -76,8 +76,17 @@ class WebhookAdapter(BasePlatformAdapter):
|
|||
self._routes: Dict[str, dict] = dict(self._static_routes)
|
||||
self._runner = None
|
||||
|
||||
# Delivery info keyed by session chat_id — consumed by send()
|
||||
# Delivery info keyed by session chat_id.
|
||||
#
|
||||
# Read by every send() invocation for the chat_id (status messages
|
||||
# AND the final response). Cleaned up via TTL on each POST so the
|
||||
# dict stays bounded — see _prune_delivery_info(). Do NOT pop on
|
||||
# send(), or interim status messages (e.g. fallback notifications,
|
||||
# context-pressure warnings) will consume the entry before the
|
||||
# final response arrives, causing the response to silently fall
|
||||
# back to the "log" deliver type.
|
||||
self._delivery_info: Dict[str, dict] = {}
|
||||
self._delivery_info_created: Dict[str, float] = {}
|
||||
|
||||
# Reference to gateway runner for cross-platform delivery (set externally)
|
||||
self.gateway_runner = None
|
||||
|
|
@ -160,10 +169,14 @@ class WebhookAdapter(BasePlatformAdapter):
|
|||
) -> SendResult:
|
||||
"""Deliver the agent's response to the configured destination.
|
||||
|
||||
chat_id is ``webhook:{route}:{delivery_id}`` — we pop the delivery
|
||||
info stored during webhook receipt so it doesn't leak memory.
|
||||
chat_id is ``webhook:{route}:{delivery_id}``. The delivery info
|
||||
stored during webhook receipt is read with ``.get()`` (not popped)
|
||||
so that interim status messages emitted before the final response
|
||||
— fallback-model notifications, context-pressure warnings, etc. —
|
||||
do not consume the entry and silently downgrade the final response
|
||||
to the ``log`` deliver type. TTL cleanup happens on POST.
|
||||
"""
|
||||
delivery = self._delivery_info.pop(chat_id, {})
|
||||
delivery = self._delivery_info.get(chat_id, {})
|
||||
deliver_type = delivery.get("deliver", "log")
|
||||
|
||||
if deliver_type == "log":
|
||||
|
|
@ -190,6 +203,23 @@ class WebhookAdapter(BasePlatformAdapter):
|
|||
success=False, error=f"Unknown deliver type: {deliver_type}"
|
||||
)
|
||||
|
||||
def _prune_delivery_info(self, now: float) -> None:
|
||||
"""Drop delivery_info entries older than the idempotency TTL.
|
||||
|
||||
Mirrors the cleanup pattern used for ``_seen_deliveries``. Called
|
||||
on each POST so the dict size is bounded by ``rate_limit * TTL``
|
||||
even if many webhooks fire and never receive a final response.
|
||||
"""
|
||||
cutoff = now - self._idempotency_ttl
|
||||
stale = [
|
||||
k
|
||||
for k, t in self._delivery_info_created.items()
|
||||
if t < cutoff
|
||||
]
|
||||
for k in stale:
|
||||
self._delivery_info.pop(k, None)
|
||||
self._delivery_info_created.pop(k, None)
|
||||
|
||||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
    """Return minimal metadata describing a webhook chat session."""
    info: Dict[str, Any] = {"name": chat_id, "type": "webhook"}
    return info
|
||||
|
||||
|
|
@ -382,7 +412,9 @@ class WebhookAdapter(BasePlatformAdapter):
|
|||
# same route get independent agent runs (not queued/interrupted).
|
||||
session_chat_id = f"webhook:{route_name}:{delivery_id}"
|
||||
|
||||
# Store delivery info for send() — consumed (popped) on delivery
|
||||
# Store delivery info for send(). Read by every send() invocation
|
||||
# for this chat_id (interim status messages and the final response),
|
||||
# so we do NOT pop on send. TTL-based cleanup keeps the dict bounded.
|
||||
deliver_config = {
|
||||
"deliver": route_config.get("deliver", "log"),
|
||||
"deliver_extra": self._render_delivery_extra(
|
||||
|
|
@ -391,6 +423,8 @@ class WebhookAdapter(BasePlatformAdapter):
|
|||
"payload": payload,
|
||||
}
|
||||
self._delivery_info[session_chat_id] = deliver_config
|
||||
self._delivery_info_created[session_chat_id] = now
|
||||
self._prune_delivery_info(now)
|
||||
|
||||
# Build source and event
|
||||
source = self.build_source(
|
||||
|
|
|
|||
|
|
@ -590,8 +590,15 @@ class TestSessionIsolation:
|
|||
class TestDeliveryCleanup:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delivery_info_cleaned_after_send(self):
|
||||
"""send() pops delivery_info so the entry doesn't leak memory."""
|
||||
async def test_delivery_info_survives_multiple_sends(self):
|
||||
"""send() must NOT pop delivery_info.
|
||||
|
||||
Interim status messages (fallback notifications, context-pressure
|
||||
warnings, etc.) flow through the same send() path as the final
|
||||
response. If the entry were popped on the first send, the final
|
||||
response would silently downgrade to the ``log`` deliver type.
|
||||
Regression test for that bug.
|
||||
"""
|
||||
adapter = _make_adapter()
|
||||
chat_id = "webhook:test:d-xyz"
|
||||
adapter._delivery_info[chat_id] = {
|
||||
|
|
@ -599,10 +606,40 @@ class TestDeliveryCleanup:
|
|||
"deliver_extra": {},
|
||||
"payload": {"x": 1},
|
||||
}
|
||||
adapter._delivery_info_created[chat_id] = time.time()
|
||||
|
||||
result = await adapter.send(chat_id, "Agent response here")
|
||||
assert result.success is True
|
||||
assert chat_id not in adapter._delivery_info
|
||||
# First send (e.g. an interim status message)
|
||||
result1 = await adapter.send(chat_id, "Status: switching to fallback")
|
||||
assert result1.success is True
|
||||
# Entry must still be present so the final send can read it
|
||||
assert chat_id in adapter._delivery_info
|
||||
|
||||
# Second send (the final agent response)
|
||||
result2 = await adapter.send(chat_id, "Final agent response")
|
||||
assert result2.success is True
|
||||
assert chat_id in adapter._delivery_info
|
||||
|
||||
@pytest.mark.asyncio
async def test_delivery_info_pruned_via_ttl(self):
    """Stale delivery_info entries are pruned once they exceed the TTL.

    Calls ``_prune_delivery_info`` directly; in production the prune is
    triggered on each incoming POST. Verifies that an entry older than
    ``_idempotency_ttl`` is removed from both ``_delivery_info`` and
    ``_delivery_info_created``, while a fresh entry survives in both.
    """
    adapter = _make_adapter()
    adapter._idempotency_ttl = 60  # short TTL for the test
    now = time.time()

    # Stale entry — older than TTL (created 120s ago vs 60s TTL)
    adapter._delivery_info["webhook:test:old"] = {"deliver": "log"}
    adapter._delivery_info_created["webhook:test:old"] = now - 120

    # Fresh entry — should survive (created 5s ago, well within TTL)
    adapter._delivery_info["webhook:test:new"] = {"deliver": "log"}
    adapter._delivery_info_created["webhook:test:new"] = now - 5

    adapter._prune_delivery_info(now)

    # Stale entry removed from both dicts (no orphaned timestamps).
    assert "webhook:test:old" not in adapter._delivery_info
    assert "webhook:test:old" not in adapter._delivery_info_created
    # Fresh entry retained in both dicts.
    assert "webhook:test:new" in adapter._delivery_info
    assert "webhook:test:new" in adapter._delivery_info_created
|
||||
|
||||
|
||||
# ===================================================================
|
||||
|
|
|
|||
|
|
@ -259,8 +259,9 @@ class TestCrossPlatformDelivery:
|
|||
mock_tg_adapter.send.assert_awaited_once_with(
|
||||
"12345", "I've acknowledged the alert.", metadata=None
|
||||
)
|
||||
# Delivery info should be cleaned up
|
||||
assert chat_id not in adapter._delivery_info
|
||||
# Delivery info is retained after send() so interim status messages
|
||||
# don't strand the final response (TTL-based cleanup happens on POST).
|
||||
assert chat_id in adapter._delivery_info
|
||||
|
||||
|
||||
# ===================================================================
|
||||
|
|
@ -333,5 +334,6 @@ class TestGitHubCommentDelivery:
|
|||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
# Delivery info cleaned up
|
||||
assert chat_id not in adapter._delivery_info
|
||||
# Delivery info is retained after send() so interim status messages
|
||||
# don't strand the final response (TTL-based cleanup happens on POST).
|
||||
assert chat_id in adapter._delivery_info
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue