Merge pull request #50018 from NousResearch/salvage/f3a-delivery-confirm

fix(cron): make live-adapter delivery confirmation reliable (#38922, #47056, #43014)
This commit is contained in:
kshitij 2026-06-21 13:29:45 +05:30 committed by GitHub
commit 02a3288de3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 374 additions and 31 deletions

View file

@ -710,6 +710,27 @@ def _send_media_via_adapter(
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
def _confirm_adapter_delivery(send_result) -> bool:
    """Report whether ``send_result`` is an explicit delivery confirmation.

    Only an object exposing a truthy ``success`` attribute counts as
    confirmed. Everything else is treated as unconfirmed:

    * ``None`` — e.g. a swallowed exception, a busy platform, or an early
      return that never produced a ``SendResult``. Counting it as success
      makes the scheduler log ``"delivered to <chat> via live adapter"``
      while the gateway never actually sees the message (#47056).
    * an object with no ``success`` attribute (such as a bare ``dict``) —
      a contract violation that says nothing about whether the send worked.

    Args:
        send_result: Whatever the live adapter's send call returned.

    Returns:
        ``True`` only when ``send_result.success`` exists and is truthy.
    """
    # The ``False`` default folds both unconfirmed shapes (``None`` and
    # attribute-less objects) into a single lookup.
    return bool(getattr(send_result, "success", False))
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]:
"""
Deliver job output to the configured target(s) (origin chat, specific platform, etc.).
@ -723,11 +744,25 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
"""
targets = _resolve_delivery_targets(job)
if not targets:
if job.get("deliver", "local") != "local":
msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}"
logger.warning("Job '%s': %s", job["id"], msg)
return msg
return None # local-only jobs don't deliver — not a failure
deliver_value = _normalize_deliver_value(job.get("deliver", "local"))
if deliver_value == "local":
return None # local-only jobs don't deliver — not a failure
# deliver=origin with no resolvable origin and no configured home
# channels: treat as local rather than reporting an error. CLI-created
# jobs never capture a {platform, chat_id} origin, so failing here would
# make every CLI `deliver=origin` (or auto-detect) job emit a spurious
# "no delivery target resolved" error on every run (#43014). The output
# is still persisted in last_output for `cron list`/resume.
if deliver_value == "origin":
logger.info(
"Job '%s': deliver=origin but no origin or home channels — "
"skipping delivery (output saved in last_output)",
job.get("name", job.get("id", "?")),
)
return None
msg = f"no delivery target resolved for deliver={deliver_value}"
logger.warning("Job '%s': %s", job["id"], msg)
return msg
from tools.send_message_tool import _send_to_platform
from gateway.config import load_gateway_config, Platform
@ -817,6 +852,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
# Send cleaned text (MEDIA tags stripped) — not the raw content
text_to_send = cleaned_delivery_content.strip()
adapter_ok = True
timed_out = False
if text_to_send:
from agent.async_utils import safe_schedule_threadsafe
future = safe_schedule_threadsafe(
@ -827,19 +863,81 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
adapter_ok = False
target_errors.append("live adapter event loop scheduling failed")
else:
send_result = None
timeout_handled = False
try:
send_result = future.result(timeout=60)
except TimeoutError as te:
future.cancel()
target_errors.append(f"live adapter send timed out: {te}")
raise
except TimeoutError:
# #38922: a slow confirmation does NOT necessarily
# mean the send failed — but we must distinguish two
# cases via future.cancel()'s return value:
#
# cancel() == False -> the coroutine was already
# running on the gateway loop when the timeout
# fired; the request is in flight on the wire and
# cannot be un-sent. Re-sending via standalone
# would be a guaranteed DUPLICATE, so treat it as
# delivered (assume-delivered).
#
# cancel() == True -> the scheduled callback never
# started executing (loop wedged/backlogged for
# the full 60s), so nothing was sent. We MUST
# fall through to the standalone path or the
# message is silently dropped (worse than a
# duplicate).
cancelled = future.cancel()
if cancelled:
msg = (
f"live adapter send to {platform_name}:{chat_id} "
"timed out before the coroutine was dispatched"
)
logger.warning(
"Job '%s': %s, falling back to standalone",
job["id"], msg,
)
target_errors.append(msg)
adapter_ok = False # fall through to standalone path
timeout_handled = True
else:
timed_out = True
timeout_handled = True
logger.warning(
"Job '%s': live adapter send to %s:%s timed out "
"after 60s; already dispatched (in flight), "
"assuming delivered (skipping standalone fallback "
"to avoid duplicate)",
job["id"], platform_name, chat_id,
)
except Exception as ex:
# A real send error (not a slow confirmation) — fall
# through to the standalone path so the message is
# still delivered.
target_errors.append(f"live adapter send failed: {ex}")
raise
if send_result is None or not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown") if send_result else "no response from adapter"
msg = f"live adapter send to {platform_name}:{chat_id} failed: {err}"
if timeout_handled:
# The timeout branch above already decided the
# outcome (assume-delivered if in flight, or
# adapter_ok=False to fall through if never
# dispatched). send_result is None, so skip the
# confirmation/thread-fallback inspection below.
pass
elif not _confirm_adapter_delivery(send_result):
# A ``None`` return or a result object missing an
# explicit ``success`` attribute is NOT a confirmed
# delivery (#47056): the scheduler would log
# "delivered" while the gateway never saw it. Fall
# through to the standalone path.
err = (
getattr(send_result, "error", None)
if send_result is not None
else "no response from adapter"
)
shape = type(send_result).__name__ if send_result is not None else "None"
msg = (
f"live adapter send to {platform_name}:{chat_id} "
f"returned unconfirmed result ({shape}, error={err})"
)
logger.warning(
"Job '%s': %s, falling back to standalone",
job["id"], msg,
@ -860,8 +958,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
logger.warning("Job '%s': %s", job["id"], msg)
delivery_errors.append(msg)
# Send extracted media files as native attachments via the live adapter
if adapter_ok and media_files:
# Send extracted media files as native attachments via the live adapter.
# Skip on an in-flight confirmation timeout: the gateway loop is
# contended, so each media send would also block its 30s budget,
# and the text payload is already assumed delivered (#38922).
# Record the skipped attachments so the drop is visible in the
# job's delivery error rather than silently lost.
if adapter_ok and not timed_out and media_files:
_send_media_via_adapter(
runtime_adapter,
chat_id,
@ -871,6 +974,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
job,
platform=platform,
)
elif timed_out and media_files:
msg = (
f"{len(media_files)} media attachment(s) not delivered to "
f"{platform_name}:{chat_id} (live adapter confirmation timed out)"
)
logger.warning("Job '%s': %s", job["id"], msg)
delivery_errors.append(msg)
if adapter_ok:
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)

View file

@ -2706,15 +2706,20 @@ class TestParallelTick:
class TestDeliverResultTimeoutCancelsFuture:
"""When future.result(timeout=60) raises TimeoutError in the live
adapter delivery path, _deliver_result must cancel the orphan
coroutine so it cannot duplicate-send after the standalone fallback.
"""When future.result(timeout=60) raises TimeoutError in the live adapter
delivery path, the outcome depends on whether the coroutine was already
running. future.cancel() returning False means it is in flight on the wire
(cannot be un-sent) — treat as DELIVERED and skip the standalone fallback to
avoid a duplicate (#38922). future.cancel() returning True means it never
started (wedged loop) — nothing was sent, so fall through to standalone or
the message is silently dropped. Regression for #38922.
"""
def test_live_adapter_timeout_cancels_future_and_falls_back(self):
"""End-to-end: live adapter hangs past the 60s budget, _deliver_result
patches the timeout down to a fast value, confirms future.cancel() fires,
and verifies the standalone fallback path still delivers."""
def test_live_adapter_timeout_assumes_delivered_no_duplicate(self):
"""End-to-end: live adapter confirmation times out past the 60s budget.
The fix (#38922) treats the send as already-dispatched/delivered and
does NOT run the standalone fallback — otherwise the message is sent
twice."""
from gateway.config import Platform
from concurrent.futures import Future
@ -2730,18 +2735,19 @@ class TestDeliverResultTimeoutCancelsFuture:
loop = MagicMock()
loop.is_running.return_value = True
# A real concurrent.futures.Future so .cancel() has real semantics,
# but we override .result() to raise TimeoutError exactly like the
# 60s wait firing in production.
# A real concurrent.futures.Future, but we override .result() to raise
# TimeoutError exactly like the 60s wait firing in production. We make
# .cancel() return False to simulate the coroutine being ALREADY RUNNING
# on the gateway loop (in flight on the wire) — the case where the send
# cannot be un-sent and a standalone resend would be a duplicate.
captured_future = Future()
cancel_calls = []
original_cancel = captured_future.cancel
def tracking_cancel():
def in_flight_cancel():
cancel_calls.append(True)
return original_cancel()
return False # already running — cannot be cancelled
captured_future.cancel = tracking_cancel
captured_future.cancel = in_flight_cancel
captured_future.result = MagicMock(side_effect=TimeoutError("timed out"))
def fake_run_coro(coro, _loop):
@ -2767,11 +2773,121 @@ class TestDeliverResultTimeoutCancelsFuture:
loop=loop,
)
# 1. The orphan future was cancelled on timeout (the bug fix)
assert cancel_calls == [True], "future.cancel() must fire on TimeoutError"
# 2. The standalone fallback delivered — no double send, no silent drop
# 1. cancel() was attempted (returned False = in flight).
assert cancel_calls == [True], "future.cancel() should be attempted on TimeoutError"
# 2. Delivery is reported successful (no error string returned).
assert result is None, f"expected successful delivery, got error: {result!r}"
# 3. The standalone fallback must NOT run — that is the #38922 fix:
# an in-flight confirmation timeout is assume-delivered, not a resend.
standalone_send.assert_not_awaited()
def test_live_adapter_timeout_before_dispatch_falls_back_to_standalone(self):
    """Wedged-loop timeout: nothing was sent, so standalone must run.

    ``future.cancel()`` returning True means the scheduled coroutine never
    started, i.e. nothing was sent. _deliver_result MUST then fall through
    to the standalone path rather than silently dropping the message.
    This is the inverse of the assume-delivered case and guards against
    the wedged-loop silent drop.
    """
    from concurrent.futures import Future
    from gateway.config import Platform

    # Live adapter that would report success if its send were ever awaited.
    live_adapter = AsyncMock()
    live_adapter.send.return_value = MagicMock(success=True)

    # Gateway config with Telegram enabled.
    platform_cfg = MagicMock()
    platform_cfg.enabled = True
    gateway_cfg = MagicMock()
    gateway_cfg.platforms = {Platform.TELEGRAM: platform_cfg}

    gateway_loop = MagicMock()
    gateway_loop.is_running.return_value = True

    cancel_calls = []

    def never_dispatched_cancel():
        cancel_calls.append(True)
        return True  # callback never ran — successfully cancelled

    # The future always times out on .result() and reports a successful
    # cancel, which models a send that was never dispatched.
    stalled_future = Future()
    stalled_future.cancel = never_dispatched_cancel
    stalled_future.result = MagicMock(side_effect=TimeoutError("timed out"))

    def fake_run_coro(coro, _loop):
        # Close the coroutine so it is not reported as "never awaited".
        coro.close()
        return stalled_future

    cron_job = {
        "id": "timeout-undispatched-job",
        "deliver": "origin",
        "origin": {"platform": "telegram", "chat_id": "123"},
    }

    standalone_send = AsyncMock(return_value={"success": True})
    with patch("gateway.config.load_gateway_config", return_value=gateway_cfg), \
         patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
         patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \
         patch("tools.send_message_tool._send_to_platform", new=standalone_send):
        result = _deliver_result(
            cron_job,
            "Hello world",
            adapters={Platform.TELEGRAM: live_adapter},
            loop=gateway_loop,
        )

    assert cancel_calls == [True], "future.cancel() should be attempted"
    # The standalone path MUST run — the message was never sent.
    standalone_send.assert_awaited_once()
    assert result is None, f"standalone should have delivered, got: {result!r}"
def test_live_adapter_real_exception_falls_back_to_standalone(self):
    """A non-timeout send error must still reach the standalone path.

    A real failure raised while waiting on the live adapter send (as
    opposed to a slow confirmation) must fall through to the standalone
    path so the message is still delivered. Guards the
    `except Exception: raise` branch — the bug class where broadening the
    timeout handler to swallow all exceptions would silently drop messages.
    """
    from concurrent.futures import Future
    from gateway.config import Platform

    live_adapter = AsyncMock()
    live_adapter.send.return_value = MagicMock(success=True)

    # Gateway config with Telegram enabled.
    platform_cfg = MagicMock()
    platform_cfg.enabled = True
    gateway_cfg = MagicMock()
    gateway_cfg.platforms = {Platform.TELEGRAM: platform_cfg}

    gateway_loop = MagicMock()
    gateway_loop.is_running.return_value = True

    # Waiting on the future raises a genuine (non-timeout) error.
    failing_future = Future()
    failing_future.result = MagicMock(side_effect=RuntimeError("adapter exploded"))

    def fake_run_coro(coro, _loop):
        # Close the coroutine so it is not reported as "never awaited".
        coro.close()
        return failing_future

    cron_job = {
        "id": "send-error-job",
        "deliver": "origin",
        "origin": {"platform": "telegram", "chat_id": "123"},
    }

    standalone_send = AsyncMock(return_value={"success": True})
    with patch("gateway.config.load_gateway_config", return_value=gateway_cfg), \
         patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
         patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \
         patch("tools.send_message_tool._send_to_platform", new=standalone_send):
        result = _deliver_result(
            cron_job,
            "Hello world",
            adapters={Platform.TELEGRAM: live_adapter},
            loop=gateway_loop,
        )

    # A real exception must NOT be assume-delivered: standalone runs.
    standalone_send.assert_awaited_once()
    assert result is None, f"standalone should have delivered, got: {result!r}"
def test_live_adapter_thread_fallback_records_delivery_error(self):
"""A cron target with an explicit topic must not be marked clean if
@ -2833,6 +2949,123 @@ class TestDeliverResultTimeoutCancelsFuture:
)
class TestDeliverResultLiveAdapterUnconfirmed:
    """Regression for #47056.

    When a live adapter's send() returns ``None`` (swallowed exception / busy
    platform) or a result object that lacks an explicit ``success`` attribute
    (bare dict / partial object), the scheduler must NOT log "delivered via
    live adapter" and silently drop the message. Every unconfirmed shape must
    fall through to the standalone delivery path so the message actually
    arrives. The pre-fix check ``send_result is None or not getattr(...,
    "success", True)`` let a ``.success``-less object default to True, i.e.
    silent success.
    """

    def _run(self, send_value):
        """Run _deliver_result with the live send resolving to ``send_value``.

        Returns a ``(result, standalone_send)`` pair so each test can check
        both the reported outcome and whether the standalone fallback ran.
        """
        from concurrent.futures import Future
        from gateway.config import Platform

        live_adapter = AsyncMock()
        live_adapter.send.return_value = send_value

        # Gateway config with Telegram enabled.
        platform_cfg = MagicMock()
        platform_cfg.enabled = True
        gateway_cfg = MagicMock()
        gateway_cfg.platforms = {Platform.TELEGRAM: platform_cfg}

        gateway_loop = MagicMock()
        gateway_loop.is_running.return_value = True

        # Already-resolved future: the wait returns ``send_value`` at once.
        resolved_future = Future()
        resolved_future.set_result(send_value)

        def fake_run_coro(coro, _loop):
            # Close the coroutine so it is not reported as "never awaited".
            coro.close()
            return resolved_future

        cron_job = {
            "id": "unconfirmed-job",
            "deliver": "origin",
            "origin": {"platform": "telegram", "chat_id": "123"},
        }

        standalone_send = AsyncMock(return_value={"success": True})
        with patch("gateway.config.load_gateway_config", return_value=gateway_cfg), \
             patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
             patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \
             patch("tools.send_message_tool._send_to_platform", new=standalone_send):
            result = _deliver_result(
                cron_job,
                "Hello world",
                adapters={Platform.TELEGRAM: live_adapter},
                loop=gateway_loop,
            )
        return result, standalone_send

    def test_none_result_falls_through_to_standalone(self):
        """send() returning None must trigger the standalone fallback, not a
        silent "delivered" log."""
        result, fallback = self._run(None)
        assert result is None, f"standalone should have delivered, got: {result!r}"
        fallback.assert_awaited_once()

    def test_result_missing_success_attr_falls_through(self):
        """A result object with no ``success`` attribute is a contract
        violation and must NOT be counted as delivered (it defaulted to True
        before the fix)."""

        class _NoSuccess:
            """Result-like object that deliberately has no ``success``."""

        result, fallback = self._run(_NoSuccess())
        assert result is None, f"standalone should have delivered, got: {result!r}"
        fallback.assert_awaited_once()

    def test_confirmed_success_does_not_fall_through(self):
        """A genuine SendResult(success=True) is confirmed — the standalone
        path must NOT run (no duplicate)."""
        confirmed = MagicMock(success=True, raw_response=None)
        result, fallback = self._run(confirmed)
        assert result is None
        fallback.assert_not_awaited()
class TestDeliverOriginUnresolvableIsLocal:
    """Regression for #43014.

    A cron job created in a CLI session has no {platform, chat_id} origin.
    With ``deliver=origin`` (or auto-detect / deliver=None) and no configured
    platform home channel, delivery is unresolvable — but that is the
    EXPECTED state for CLI jobs, not an error. _deliver_result must return
    None (treat as local; output stays in last_output), not the "no delivery
    target resolved" error string that previously fired on every run.
    """

    def _deliver(self, job, monkeypatch):
        """Call _deliver_result for ``job`` with every home-channel lookup
        stubbed to return an empty string."""
        import cron.scheduler as scheduler_module

        def _no_home_channel(*_):
            # No home channel for any platform → origin is unresolvable.
            return ""

        monkeypatch.setattr(scheduler_module, "_get_home_target_chat_id", _no_home_channel)
        return _deliver_result(job, "CLI bulletin")

    def test_origin_with_no_home_channels_returns_none(self, monkeypatch):
        cli_job = {"id": "cli-job", "deliver": "origin", "origin": "cli-session-provenance"}
        outcome = self._deliver(cli_job, monkeypatch)
        assert outcome is None

    def test_omitted_deliver_autodetect_returns_none(self, monkeypatch):
        # deliver key present but None (auto-detect) previously errored with
        # "no delivery target resolved for deliver=None".
        cli_job = {"id": "cli-job", "deliver": None, "origin": "cli-session-provenance"}
        outcome = self._deliver(cli_job, monkeypatch)
        assert outcome is None

    def test_explicit_platform_with_no_channel_still_errors(self, monkeypatch):
        # A concrete platform target that cannot resolve is still a real error
        # (this must NOT be silently swallowed by the origin→local fallback).
        telegram_job = {"id": "tg-job", "deliver": "telegram"}
        error = self._deliver(telegram_job, monkeypatch)
        assert error is not None
        assert "no delivery target resolved" in error
class TestSendMediaTimeoutCancelsFuture:
"""Same orphan-coroutine guarantee for _send_media_via_adapter's
future.result(timeout=30) call. If this times out mid-batch, the