From b177d4ee4891aeb3bf8a908f6e5def846a55c180 Mon Sep 17 00:00:00 2001 From: Victor Kyriazakos Date: Thu, 25 Jun 2026 00:35:54 +0300 Subject: [PATCH] fix(cron): mirror continuable cron as a labelled user turn (alternation-safe) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review on #51077 (kxee). The continuable-cron mirror reused gateway.mirror.mirror_to_session, which writes role=assistant — re-introducing the exact alternation violation #2313 (37a997945) deliberately removed: a cron brief landing as assistant after the agent's last turn yields assistant->assistant, which breaks strict-alternation providers (OpenAI/OpenRouter) per issue #2221. The mirror/mirror_source metadata is also dropped at the SQLite boundary, so the [Delivered from cron] label is lost on replay. This is an intentional, opt-in (default OFF) reversal of #2313's 'cron output does not belong in interactive history' for the reply-to-cron use case — gated behind cron.mirror_delivery / attach_to_session. Fixes: - mirror_to_session gains a role param (default 'assistant' — interactive send_message mirror unchanged, it IS the agent speaking). Cron paths pass role='user' with a '[Cron delivery: <job name>]' prefix so the brief collapses via repair_message_sequence's consecutive-user merge on every provider, and stays distinguishable on replay despite the metadata drop. - thread_seeded: defer seeding + the flag until delivery into the new thread actually succeeds. Previously set pre-delivery, so an open-succeeds / deliver-fails case both stranded a seeded-but-unseen brief AND suppressed the DM-fallback mirror. - seed mirror now passes user_id='system:cron' to resolve the exact thread-keyed session row it just created. - dedupe the duplicate BasePlatformAdapter import in _deliver_result. - trim oversized docstrings to non-obvious WHY (AGENTS.md). - docs: document cron.mirror_delivery / attach_to_session in website/docs/user-guide/features/cron.md. 
- test: assert the cron mirror writes role='user' with the label prefix. 204 cron+mirror tests pass. --- cron/scheduler.py | 80 ++++++++++++++---------- gateway/mirror.py | 14 ++++- tests/cron/test_scheduler.py | 25 +++++++- website/docs/user-guide/features/cron.md | 33 ++++++++++ 4 files changed, 116 insertions(+), 36 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 07eac884c9b..9e05c902485 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -444,13 +444,22 @@ def _maybe_mirror_cron_delivery( try: from gateway.mirror import mirror_to_session + # Mirror as a USER turn with a labelled prefix, NOT an assistant turn. + # The brief is not the agent speaking; an assistant-role mirror lands as + # assistant→assistant after the agent's last turn and breaks strict + # alternation (issue #2221, the exact failure #2313 removed). A + # user-role turn collapses safely via repair_message_sequence's + # consecutive-user merge on every provider, and the prefix preserves the + # "this came from cron" context that the dropped SQLite mirror metadata + # would otherwise lose on replay. ok = mirror_to_session( platform_name, str(chat_id), - text, + f"[Cron delivery: {job.get('name') or job.get('id', 'cron')}]\n{text}", source_label="cron", thread_id=thread_id, user_id=user_id, + role="user", ) if ok: logger.info( @@ -476,22 +485,13 @@ def _open_continuable_cron_thread( chat_id: str, loop, ) -> Optional[str]: - """Open a dedicated thread for a continuable cron job, thread-preferred. + """Open a dedicated thread for a continuable cron job (thread-preferred). - On a thread-capable platform (Telegram forum/DM topics, Discord threads, - Slack threads) this asks the adapter to create a fresh thread under - ``chat_id`` so the job's brief — and the user's replies to it — live in - their own scrollback, isolated from the parent channel. This is the - "continuable cron job opens its own thread" interface. 
- - Returns the new ``thread_id`` (str) on success, or ``None`` when the - platform does not support threads (WhatsApp / Signal / SMS / DM-only) or - creation failed. A ``None`` return is the signal for the caller to fall - back to mirroring into the origin DM session instead — same fallback shape - the gateway's session-handoff watcher uses (``_process_handoff``). - - Reuses the shipped ``adapter.create_handoff_thread`` primitive; introduces - no new adapter surface. + Returns the new ``thread_id`` on success, or ``None`` when the platform has + no thread primitive (WhatsApp/Signal/SMS) or creation failed — the ``None`` + return is the caller's signal to fall back to the origin-DM mirror, the same + open-thread-or-fallback shape as ``GatewayRunner._process_handoff``. Reuses + the shipped ``adapter.create_handoff_thread``; no new adapter surface. """ create_thread = getattr(adapter, "create_handoff_thread", None) if not callable(create_thread) or loop is None: @@ -568,12 +568,19 @@ def _seed_cron_thread_session( from gateway.mirror import mirror_to_session + # User-role + labelled prefix (see _maybe_mirror_cron_delivery): the + # seeded brief must not read as an assistant turn, or the user's first + # in-thread reply produces assistant→user→... off a phantom assistant + # message. Pass the seed user_id so the mirror resolves the exact + # thread-keyed session row we just created. 
mirror_to_session( platform_name, str(chat_id), - text, + f"[Cron delivery: {job.get('name') or job.get('id', 'cron')}]\n{text}", source_label="cron", thread_id=str(thread_id), + user_id="system:cron", + role="user", ) logger.info( "Job '%s': opened continuable thread %s on %s:%s and seeded the brief", @@ -1066,26 +1073,24 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option else: delivery_content = content + # Extract MEDIA: tags so attachments are forwarded as files, not raw text + from gateway.platforms.base import BasePlatformAdapter + media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content) + media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files) + # Resolve the delivery-mirror gate ONCE (default off). When on, each # successful delivery is also appended to the target chat's gateway session # transcript so a user reply in that chat sees the cron output in context. - # We mirror the CLEAN, unwrapped agent output (not the cron header/footer) — - # that is what the agent "said" in the conversation. + # Mirror the CLEAN, unwrapped output (not the cron header/footer). 
try: mirror_enabled = _cron_mirror_delivery_enabled(job, user_cfg) except Exception: mirror_enabled = False mirror_text = "" if mirror_enabled: - from gateway.platforms.base import BasePlatformAdapter as _BPA - _, mirror_text = _BPA.extract_media(content) + _, mirror_text = BasePlatformAdapter.extract_media(content) mirror_text = (mirror_text or "").strip() - # Extract MEDIA: tags so attachments are forwarded as files, not raw text - from gateway.platforms.base import BasePlatformAdapter - media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content) - media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files) - try: config = load_gateway_config() except Exception as e: @@ -1157,6 +1162,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option # create_handoff_thread returns None and we fall back to mirroring into # the origin DM session (handled after delivery). Cf. _process_handoff. thread_seeded = False + opened_thread_id: Optional[str] = None if ( mirror_this_target and runtime_adapter is not None @@ -1167,14 +1173,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option job, runtime_adapter, chat_id, loop, ) if new_thread_id: - # Route this delivery into the new thread and seed its session. + # Route THIS delivery into the new thread now (the send needs the + # thread_id), but defer seeding the thread session until the + # delivery actually succeeds — otherwise an open-succeeds / + # deliver-fails case leaves a seeded brief the user never saw, + # and (worse) suppresses the DM-fallback mirror via thread_seeded. 
thread_id = new_thread_id - _seed_cron_thread_session( - job, runtime_adapter, platform_name, chat_id, - new_thread_id, mirror_text, - chat_name=origin.get("chat_name"), - ) - thread_seeded = True + opened_thread_id = new_thread_id if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): # Telegram three-mode topic routing (#22773): a private chat @@ -1388,6 +1393,15 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option if adapter_ok: logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) delivered = True + # Seed the thread session only now that delivery into it + # succeeded (deferred from thread-open above). + if opened_thread_id and not thread_seeded: + _seed_cron_thread_session( + job, runtime_adapter, platform_name, chat_id, + opened_thread_id, mirror_text, + chat_name=origin.get("chat_name"), + ) + thread_seeded = True _maybe_mirror_cron_delivery( job, platform_name, chat_id, mirror_text, thread_id=thread_id, user_id=origin_user_id, diff --git a/gateway/mirror.py b/gateway/mirror.py index b90a421b9c3..164e371794d 100644 --- a/gateway/mirror.py +++ b/gateway/mirror.py @@ -29,6 +29,7 @@ def mirror_to_session( source_label: str = "cli", thread_id: Optional[str] = None, user_id: Optional[str] = None, + role: str = "assistant", ) -> bool: """ Append a delivery-mirror message to the target session's transcript. @@ -36,6 +37,17 @@ def mirror_to_session( Finds the gateway session that matches the given platform + chat_id, then writes a mirror entry to both the JSONL transcript and SQLite DB. + ``role`` defaults to ``"assistant"`` — correct for the interactive + ``send_message`` mirror, where the mirrored text is the agent's own + outgoing reply (a genuine assistant turn). Callers mirroring text that is + NOT the agent speaking — e.g. 
a cron brief delivered out-of-band — must + pass ``role="user"``: the ``mirror``/``mirror_source`` metadata is dropped + at the SQLite boundary (only role+content persist), so on replay an + assistant-role mirror is indistinguishable from a real assistant turn and + produces ``assistant → assistant`` pairs that break strict-alternation + providers (issue #2221). A user-role mirror collapses safely via + ``repair_message_sequence``'s consecutive-user merge on every provider. + Returns True if mirrored successfully, False if no matching session or error. All errors are caught -- this is never fatal. """ @@ -57,7 +69,7 @@ def mirror_to_session( return False mirror_msg = { - "role": "assistant", + "role": role, "content": message_text, "timestamp": datetime.now().isoformat(), "mirror": True, diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index f62c74974f3..06bbfbf047c 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -3526,8 +3526,8 @@ class TestCronDeliveryMirror: with patch("gateway.mirror.mirror_to_session", return_value=True) as m: _maybe_mirror_cron_delivery( - {"id": "j1"}, "telegram", "123", "Daily brief Task #2", - thread_id=None, enabled=True, + {"id": "j1", "name": "Daily Brief"}, "telegram", "123", + "Daily brief Task #2", thread_id=None, enabled=True, ) m.assert_called_once() args, kwargs = m.call_args @@ -3536,6 +3536,27 @@ class TestCronDeliveryMirror: assert "Task #2" in args[2] assert kwargs.get("source_label") == "cron" + def test_mirror_writes_user_role_with_label_not_assistant(self): + """Regression for #2221 / #2313: the cron brief must mirror as a USER + turn (with a [Cron delivery: ...] 
label), NOT assistant — an + assistant-role mirror lands as assistant->assistant after the agent's + last turn and breaks strict alternation on non-Anthropic providers.""" + from cron.scheduler import _maybe_mirror_cron_delivery + + with patch("gateway.mirror.mirror_to_session", return_value=True) as m: + _maybe_mirror_cron_delivery( + {"id": "j1", "name": "Morning Brief"}, "telegram", "123", + "Market movers today", thread_id=None, enabled=True, + ) + m.assert_called_once() + args, kwargs = m.call_args + assert kwargs.get("role") == "user", "cron mirror must be a user turn, not assistant" + # The brief text is prefixed with a human-readable cron-delivery label + # so replay (where the mirror metadata is dropped at the SQLite + # boundary) still distinguishes it from a genuine user message. + assert args[2].startswith("[Cron delivery: Morning Brief]") + assert "Market movers today" in args[2] + def test_mirror_noop_when_disabled(self): from cron.scheduler import _maybe_mirror_cron_delivery diff --git a/website/docs/user-guide/features/cron.md b/website/docs/user-guide/features/cron.md index c3444f98d38..16da42ae22b 100644 --- a/website/docs/user-guide/features/cron.md +++ b/website/docs/user-guide/features/cron.md @@ -298,6 +298,39 @@ cron: wrap_response: false ``` +### Continuable jobs (reply to a cron delivery) + +By default a cron delivery is fire-and-forget: the message is sent, but it does +not live in the chat's conversation history, so if you reply to it the agent +has no record of what it said. Set a job **continuable** and the delivered brief +becomes a conversation you can reply into — the agent has the brief in context +instead of asking "what is Task #2?". + +Opt-in, **default off**. 
Enable globally in config, or per-job via the `cronjob` +tool's `attach_to_session` (which overrides the global setting for that one job): + +```yaml +# ~/.hermes/config.yaml +cron: + mirror_delivery: false # set true to make cron deliveries continuable +``` + +Behaviour is **thread-preferred**, scoped to the job's origin chat: + +- **Thread-capable platforms** (Telegram topics, Discord/Slack threads): each + delivery opens its own dedicated thread and the brief is seeded into that + thread's session, so a reply in-thread continues with full context. A + recurring job (e.g. a daily brief) opens a fresh thread per run, keeping each + delivery's follow-up discussion isolated. +- **DM-only platforms** (WhatsApp, Signal, SMS): no threads exist, so the brief + is mirrored into the origin DM session instead — the DM itself is the + continuation surface. + +Only the origin chat is ever touched: fan-out / broadcast targets (`all`, +explicit other-chat deliveries) are never made continuable. The mirror is +written as a labelled user turn (`[Cron delivery: <job name>]`), which keeps +the conversation history alternation-safe across all model providers. + ### Silent suppression If the agent's final response contains `[SILENT]`, delivery is suppressed entirely. The output is still saved locally for audit (in `~/.hermes/cron/output/`), but no message is sent to the delivery target.