From d54890870ffd50a596b1ba0272bc05889e3e35c7 Mon Sep 17 00:00:00 2001 From: Luke The Dev Date: Sun, 21 Jun 2026 12:51:09 +0530 Subject: [PATCH] fix(cron): make live-adapter delivery confirmation reliable (#38922, #47056, #43014) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidates fixes for three cron-delivery defects in cron/scheduler.py::_deliver_result: #38922 and #47056 stem from how the live-adapter send result is interpreted, and #43014 from how an unresolvable delivery target is reported. #38922 — duplicate message on confirmation timeout. future.result(timeout=60) raising TimeoutError bubbled to the outer except handler, which left delivered=False, so `if not delivered:` re-sent the identical message via the standalone path. future.cancel() cannot un-send a request already in flight on the wire, so a slow confirmation deterministically produced a duplicate. When future.cancel() returns False on timeout, the send was already dispatched onto the gateway loop, so it is now treated as delivered (assume-delivered is safer than guaranteed-duplicate) and the standalone fallback is skipped; when future.cancel() returns True the coroutine never started, so delivery still falls through to the standalone path rather than silently dropping the message. The live-adapter media attempt is also skipped on an in-flight timeout since the contended loop would re-block each 30s media budget; the skipped attachments are recorded as a delivery error. #47056 — silent drop when the gateway has an active session. The old check `if send_result is None or not getattr(send_result, "success", True)` let a result object missing a `success` attribute default to True and be counted as a successful delivery, so the scheduler logged "delivered via live adapter" while the gateway never processed the message. Delivery is now confirmed via _confirm_adapter_delivery(): only an explicit, truthy `success` attribute counts; None or a `success`-less object falls through to the standalone path so the message actually arrives. A genuine send Exception (not a slow confirmation) still falls through to the standalone path, and is caught by run_job's outer handler — it is recorded as the job's last_error and never crashes the cron ticker. #43014 — deliver=origin fails to resolve in CLI sessions. 
A CLI-created job has no {platform, chat_id} origin, so deliver=origin (and auto-detect / deliver=None) was unresolvable and emitted "no delivery target resolved" on every run. An unresolvable origin with no configured home channel is now treated as local (output stays in last_output), matching the documented auto-deliver contract; a concrete unresolvable platform target still reports a real error. Salvaged from #41007 (timeout discriminator), folding in #47127's _confirm_adapter_delivery hardening and #38937 / #43063's origin→local fallback. Tests rewritten as behavior contracts (in-flight timeout => no duplicate; timeout before dispatch => standalone fallback; real send exception => standalone fallback; None / success-less result => standalone fallback; confirmed success => no fallback; CLI origin => local, explicit platform => still errors). Co-authored-by: Evi Nova <66773372+Tranquil-Flow@users.noreply.github.com> Co-authored-by: kyssta-exe --- cron/scheduler.py | 138 ++++++++++++++++-- tests/cron/test_scheduler.py | 267 ++++++++++++++++++++++++++++++++--- 2 files changed, 374 insertions(+), 31 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 0956528b132..d91a19dcac5 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -710,6 +710,27 @@ def _send_media_via_adapter( logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e) +def _confirm_adapter_delivery(send_result) -> bool: + """Return True only if ``send_result`` unambiguously confirms delivery. + + A live adapter that returns ``None`` (e.g. a swallowed exception, a busy + platform, or a code path that returns early without producing a + ``SendResult``) must NOT be treated as success — doing so causes the + scheduler to log ``"delivered to %s:%s via live adapter"`` while the + gateway never actually sees the message (#47056). + + Likewise, an object missing a ``success`` attribute (e.g. a bare ``dict`` + or a partial mock) is a contract violation: it does not actually tell us + whether the send succeeded. 
Require an explicit, truthy ``success`` + attribute to count as confirmed. + """ + if send_result is None: + return False + if not hasattr(send_result, "success"): + return False + return bool(getattr(send_result, "success")) + + def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]: """ Deliver job output to the configured target(s) (origin chat, specific platform, etc.). @@ -723,11 +744,25 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option """ targets = _resolve_delivery_targets(job) if not targets: - if job.get("deliver", "local") != "local": - msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}" - logger.warning("Job '%s': %s", job["id"], msg) - return msg - return None # local-only jobs don't deliver — not a failure + deliver_value = _normalize_deliver_value(job.get("deliver", "local")) + if deliver_value == "local": + return None # local-only jobs don't deliver — not a failure + # deliver=origin with no resolvable origin and no configured home + # channels: treat as local rather than reporting an error. CLI-created + # jobs never capture a {platform, chat_id} origin, so failing here would + # make every CLI `deliver=origin` (or auto-detect) job emit a spurious + # "no delivery target resolved" error on every run (#43014). The output + # is still persisted in last_output for `cron list`/resume. 
+ if deliver_value == "origin": + logger.info( + "Job '%s': deliver=origin but no origin or home channels — " + "skipping delivery (output saved in last_output)", + job.get("name", job.get("id", "?")), + ) + return None + msg = f"no delivery target resolved for deliver={deliver_value}" + logger.warning("Job '%s': %s", job["id"], msg) + return msg from tools.send_message_tool import _send_to_platform from gateway.config import load_gateway_config, Platform @@ -817,6 +852,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option # Send cleaned text (MEDIA tags stripped) — not the raw content text_to_send = cleaned_delivery_content.strip() adapter_ok = True + timed_out = False if text_to_send: from agent.async_utils import safe_schedule_threadsafe future = safe_schedule_threadsafe( @@ -827,19 +863,81 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option adapter_ok = False target_errors.append("live adapter event loop scheduling failed") else: + send_result = None + timeout_handled = False try: send_result = future.result(timeout=60) - except TimeoutError as te: - future.cancel() - target_errors.append(f"live adapter send timed out: {te}") - raise + except TimeoutError: + # #38922: a slow confirmation does NOT necessarily + # mean the send failed — but we must distinguish two + # cases via future.cancel()'s return value: + # + # cancel() == False -> the coroutine was already + # running on the gateway loop when the timeout + # fired; the request is in flight on the wire and + # cannot be un-sent. Re-sending via standalone + # would be a guaranteed DUPLICATE, so treat it as + # delivered (assume-delivered). + # + # cancel() == True -> the scheduled callback never + # started executing (loop wedged/backlogged for + # the full 60s), so nothing was sent. We MUST + # fall through to the standalone path or the + # message is silently dropped (worse than a + # duplicate). 
+ cancelled = future.cancel() + if cancelled: + msg = ( + f"live adapter send to {platform_name}:{chat_id} " + "timed out before the coroutine was dispatched" + ) + logger.warning( + "Job '%s': %s, falling back to standalone", + job["id"], msg, + ) + target_errors.append(msg) + adapter_ok = False # fall through to standalone path + timeout_handled = True + else: + timed_out = True + timeout_handled = True + logger.warning( + "Job '%s': live adapter send to %s:%s timed out " + "after 60s; already dispatched (in flight), " + "assuming delivered (skipping standalone fallback " + "to avoid duplicate)", + job["id"], platform_name, chat_id, + ) except Exception as ex: + # A real send error (not a slow confirmation) — fall + # through to the standalone path so the message is + # still delivered. target_errors.append(f"live adapter send failed: {ex}") raise - if send_result is None or not getattr(send_result, "success", True): - err = getattr(send_result, "error", "unknown") if send_result else "no response from adapter" - msg = f"live adapter send to {platform_name}:{chat_id} failed: {err}" + if timeout_handled: + # The timeout branch above already decided the + # outcome (assume-delivered if in flight, or + # adapter_ok=False to fall through if never + # dispatched). send_result is None, so skip the + # confirmation/thread-fallback inspection below. + pass + elif not _confirm_adapter_delivery(send_result): + # A ``None`` return or a result object missing an + # explicit ``success`` attribute is NOT a confirmed + # delivery (#47056): the scheduler would log + # "delivered" while the gateway never saw it. Fall + # through to the standalone path. 
+ err = ( + getattr(send_result, "error", None) + if send_result is not None + else "no response from adapter" + ) + shape = type(send_result).__name__ if send_result is not None else "None" + msg = ( + f"live adapter send to {platform_name}:{chat_id} " + f"returned unconfirmed result ({shape}, error={err})" + ) logger.warning( "Job '%s': %s, falling back to standalone", job["id"], msg, @@ -860,8 +958,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option logger.warning("Job '%s': %s", job["id"], msg) delivery_errors.append(msg) - # Send extracted media files as native attachments via the live adapter - if adapter_ok and media_files: + # Send extracted media files as native attachments via the live adapter. + # Skip on an in-flight confirmation timeout: the gateway loop is + # contended, so each media send would also block its 30s budget, + # and the text payload is already assumed delivered (#38922). + # Record the skipped attachments so the drop is visible in the + # job's delivery error rather than silently lost. 
+ if adapter_ok and not timed_out and media_files: _send_media_via_adapter( runtime_adapter, chat_id, @@ -871,6 +974,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option job, platform=platform, ) + elif timed_out and media_files: + msg = ( + f"{len(media_files)} media attachment(s) not delivered to " + f"{platform_name}:{chat_id} (live adapter confirmation timed out)" + ) + logger.warning("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) if adapter_ok: logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index a13e943ad3c..a57f0805f8b 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -2706,15 +2706,20 @@ class TestParallelTick: class TestDeliverResultTimeoutCancelsFuture: - """When future.result(timeout=60) raises TimeoutError in the live - adapter delivery path, _deliver_result must cancel the orphan - coroutine so it cannot duplicate-send after the standalone fallback. + """When future.result(timeout=60) raises TimeoutError in the live adapter + delivery path, the outcome depends on whether the coroutine was already + running. future.cancel() returning False means it is in flight on the wire + (cannot be un-sent) → treat as DELIVERED and skip the standalone fallback to + avoid a duplicate (#38922). future.cancel() returning True means it never + started (wedged loop) → nothing was sent, so fall through to standalone or + the message is silently dropped. Regression for #38922. 
""" - def test_live_adapter_timeout_cancels_future_and_falls_back(self): - """End-to-end: live adapter hangs past the 60s budget, _deliver_result - patches the timeout down to a fast value, confirms future.cancel() fires, - and verifies the standalone fallback path still delivers.""" + def test_live_adapter_timeout_assumes_delivered_no_duplicate(self): + """End-to-end: live adapter confirmation times out past the 60s budget. + The fix (#38922) treats the send as already-dispatched/delivered and + does NOT run the standalone fallback — otherwise the message is sent + twice.""" from gateway.config import Platform from concurrent.futures import Future @@ -2730,18 +2735,19 @@ class TestDeliverResultTimeoutCancelsFuture: loop = MagicMock() loop.is_running.return_value = True - # A real concurrent.futures.Future so .cancel() has real semantics, - # but we override .result() to raise TimeoutError exactly like the - # 60s wait firing in production. + # A real concurrent.futures.Future, but we override .result() to raise + # TimeoutError exactly like the 60s wait firing in production. We make + # .cancel() return False to simulate the coroutine being ALREADY RUNNING + # on the gateway loop (in flight on the wire) — the case where the send + # cannot be un-sent and a standalone resend would be a duplicate. captured_future = Future() cancel_calls = [] - original_cancel = captured_future.cancel - def tracking_cancel(): + def in_flight_cancel(): cancel_calls.append(True) - return original_cancel() + return False # already running — cannot be cancelled - captured_future.cancel = tracking_cancel + captured_future.cancel = in_flight_cancel captured_future.result = MagicMock(side_effect=TimeoutError("timed out")) def fake_run_coro(coro, _loop): @@ -2767,11 +2773,121 @@ class TestDeliverResultTimeoutCancelsFuture: loop=loop, ) - # 1. The orphan future was cancelled on timeout (the bug fix) - assert cancel_calls == [True], "future.cancel() must fire on TimeoutError" - # 2. 
The standalone fallback delivered — no double send, no silent drop + # 1. cancel() was attempted (returned False = in flight). + assert cancel_calls == [True], "future.cancel() should be attempted on TimeoutError" + # 2. Delivery is reported successful (no error string returned). assert result is None, f"expected successful delivery, got error: {result!r}" + # 3. The standalone fallback must NOT run — that is the #38922 fix: + # an in-flight confirmation timeout is assume-delivered, not a resend. + standalone_send.assert_not_awaited() + + def test_live_adapter_timeout_before_dispatch_falls_back_to_standalone(self): + """When the coroutine never started (loop wedged) — future.cancel() + returns True — nothing was sent, so _deliver_result MUST fall through + to the standalone path rather than silently dropping the message. + This is the inverse of the assume-delivered case and guards against the + wedged-loop silent drop.""" + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + captured_future = Future() + cancel_calls = [] + + def never_dispatched_cancel(): + cancel_calls.append(True) + return True # callback never ran — successfully cancelled + + captured_future.cancel = never_dispatched_cancel + captured_future.result = MagicMock(side_effect=TimeoutError("timed out")) + + def fake_run_coro(coro, _loop): + coro.close() + return captured_future + + job = { + "id": "timeout-undispatched-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + + standalone_send = AsyncMock(return_value={"success": True}) + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": 
{"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \ + patch("tools.send_message_tool._send_to_platform", new=standalone_send): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + assert cancel_calls == [True], "future.cancel() should be attempted" + # The standalone path MUST run — the message was never sent. standalone_send.assert_awaited_once() + assert result is None, f"standalone should have delivered, got: {result!r}" + + def test_live_adapter_real_exception_falls_back_to_standalone(self): + """A non-timeout send Exception (real failure, not a slow confirmation) + must fall through to the standalone path so the message is still + delivered. Guards the `except Exception: raise` branch — the bug class + where broadening the timeout handler to swallow all exceptions would + silently drop messages.""" + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + captured_future = Future() + captured_future.result = MagicMock(side_effect=RuntimeError("adapter exploded")) + + def fake_run_coro(coro, _loop): + coro.close() + return captured_future + + job = { + "id": "send-error-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + + standalone_send = AsyncMock(return_value={"success": True}) + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \ + patch("tools.send_message_tool._send_to_platform", new=standalone_send): + result = _deliver_result( + 
job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + # A real exception must NOT be assume-delivered: standalone runs. + standalone_send.assert_awaited_once() + assert result is None, f"standalone should have delivered, got: {result!r}" def test_live_adapter_thread_fallback_records_delivery_error(self): """A cron target with an explicit topic must not be marked clean if @@ -2833,6 +2949,123 @@ class TestDeliverResultTimeoutCancelsFuture: ) +class TestDeliverResultLiveAdapterUnconfirmed: + """Regression for #47056. + + When a live adapter's send() returns ``None`` (swallowed exception / busy + platform) or a result object that lacks an explicit ``success`` attribute + (bare dict / partial object), the scheduler must NOT log "delivered via + live adapter" and silently drop the message. Every unconfirmed shape must + fall through to the standalone delivery path so the message actually + arrives. The pre-fix check ``send_result is None or not getattr(..., + "success", True)`` let a ``.success``-less object default to True = silent + success. 
+ """ + + def _run(self, send_value): + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = send_value + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + completed_future = Future() + completed_future.set_result(send_value) + + def fake_run_coro(coro, _loop): + coro.close() + return completed_future + + job = { + "id": "unconfirmed-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + + standalone_send = AsyncMock(return_value={"success": True}) + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \ + patch("tools.send_message_tool._send_to_platform", new=standalone_send): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + return result, standalone_send + + def test_none_result_falls_through_to_standalone(self): + """send() returning None must trigger the standalone fallback, not a + silent "delivered" log.""" + result, standalone_send = self._run(None) + assert result is None, f"standalone should have delivered, got: {result!r}" + standalone_send.assert_awaited_once() + + def test_result_missing_success_attr_falls_through(self): + """A result object with no ``success`` attribute is a contract + violation and must NOT be counted as delivered (it defaulted to True + before the fix).""" + class _NoSuccess: + pass + + result, standalone_send = self._run(_NoSuccess()) + assert result is None, f"standalone should have delivered, got: {result!r}" + standalone_send.assert_awaited_once() + + def test_confirmed_success_does_not_fall_through(self): + """A genuine 
SendResult(success=True) is confirmed — the standalone + path must NOT run (no duplicate).""" + result, standalone_send = self._run(MagicMock(success=True, raw_response=None)) + assert result is None + standalone_send.assert_not_awaited() + + +class TestDeliverOriginUnresolvableIsLocal: + """Regression for #43014. + + A cron job created in a CLI session has no {platform, chat_id} origin. + With ``deliver=origin`` (or auto-detect / deliver=None) and no configured + platform home channel, delivery is unresolvable — but that is the EXPECTED + state for CLI jobs, not an error. _deliver_result must return None (treat + as local; output stays in last_output), not the "no delivery target + resolved" error string that previously fired on every run. + """ + + def _deliver(self, job, monkeypatch): + import cron.scheduler as sched + # No home channel for any platform → origin is unresolvable. + monkeypatch.setattr(sched, "_get_home_target_chat_id", lambda *_: "") + return _deliver_result(job, "CLI bulletin") + + def test_origin_with_no_home_channels_returns_none(self, monkeypatch): + job = {"id": "cli-job", "deliver": "origin", "origin": "cli-session-provenance"} + assert self._deliver(job, monkeypatch) is None + + def test_omitted_deliver_autodetect_returns_none(self, monkeypatch): + # deliver key present but None (auto-detect) previously errored with + # "no delivery target resolved for deliver=None". + job = {"id": "cli-job", "deliver": None, "origin": "cli-session-provenance"} + assert self._deliver(job, monkeypatch) is None + + def test_explicit_platform_with_no_channel_still_errors(self, monkeypatch): + # A concrete platform target that cannot resolve is still a real error + # (this must NOT be silently swallowed by the origin→local fallback). 
+ job = {"id": "tg-job", "deliver": "telegram"} + result = self._deliver(job, monkeypatch) + assert result is not None + assert "no delivery target resolved" in result + + class TestSendMediaTimeoutCancelsFuture: """Same orphan-coroutine guarantee for _send_media_via_adapter's future.result(timeout=30) call. If this times out mid-batch, the