From 1b181724fae7799cf45dec3aca03ac44063175e7 Mon Sep 17 00:00:00 2001 From: Victor Kyriazakos Date: Mon, 22 Jun 2026 18:26:08 +0300 Subject: [PATCH] feat(cron): optional mirror of cron delivery into target chat session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in path so a cron job's delivered output is also appended to the TARGET chat's gateway session transcript (as an assistant turn), so a user reply to a recurring delivery (daily brief, reminder) is answered with the delivery in context instead of 'what is that?' amnesia. - Reuses the shipped gateway.mirror.mirror_to_session — the same primitive interactive send_message mirroring already uses. No messaging-toolset change (cron still can't call send_message; this rides delivery). - Gated: per-job attach_to_session overrides global cron.mirror_delivery (config.yaml). Default OFF — historical isolation preserved byte-for-byte. - Mirrors the CLEAN agent output, not the cron header/footer wrapper. - Alternation/cache-safe: append lands at a turn boundary, never mid-loop, never mutates the cached system prompt. Cold-start (no target session) is a silent no-op; mirror errors never fail a successful delivery. - Surfaced on the cronjob tool (attach_to_session) + config schema. Driven by enterprise cron-as-control-plane use case. 10 new tests; full cron + cronjob-tool suites pass (600). --- cron/jobs.py | 7 ++ cron/scheduler.py | 107 +++++++++++++++++++++++++++ hermes_cli/config.py | 11 +++ tests/cron/test_scheduler.py | 139 +++++++++++++++++++++++++++++++++++ tools/cronjob_tools.py | 8 ++ 5 files changed, 272 insertions(+) diff --git a/cron/jobs.py b/cron/jobs.py index 9a78b8e9e3a..a3e5381c772 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -788,6 +788,7 @@ def create_job( enabled_toolsets: Optional[List[str]] = None, workdir: Optional[str] = None, no_agent: bool = False, + attach_to_session: Optional[bool] = None, ) -> Dict[str, Any]: """ Create a new cron job. @@ -866,6 +867,7 @@ def create_job( normalized_toolsets = normalized_toolsets or None normalized_workdir = _normalize_workdir(workdir) normalized_no_agent = bool(no_agent) + normalized_attach = attach_to_session if isinstance(attach_to_session, bool) else None # no_agent jobs are meaningless without a script — the script IS the job. # Surface this as a clear ValueError at create time so bad configs never @@ -966,6 +968,11 @@ def create_job( "enabled_toolsets": normalized_toolsets, "workdir": normalized_workdir, } + # Only persist attach_to_session when explicitly set, so existing jobs and + # the common case stay byte-identical (absent key => fall back to the + # global cron.mirror_delivery config, default off). + if normalized_attach is not None: + job["attach_to_session"] = normalized_attach with _jobs_lock(): jobs = load_jobs() diff --git a/cron/scheduler.py b/cron/scheduler.py index 2d9d7761e71..83bddb87628 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -344,6 +344,89 @@ def _resolve_origin(job: dict) -> Optional[dict]: return None +def _cron_mirror_delivery_enabled(job: dict, cfg: Optional[dict] = None) -> bool: + """Whether a cron delivery should also be mirrored into the target chat's + gateway session transcript. + + Default OFF — preserves the historical isolation guarantee (cron deliveries + live only in the cron job's own session, never the target chat's history) + byte-for-byte for everyone who does not opt in. + + Precedence (first decisive value wins): + 1. Per-job ``attach_to_session`` (bool) — set via the ``cronjob`` tool, + lets one briefing job opt in without flipping global behaviour. + 2. Global ``cron.mirror_delivery`` (bool) in config.yaml. + 3. False. + + When enabled, the cron's final output is appended to the target session as + an assistant turn via the existing ``gateway.mirror.mirror_to_session`` — + the same primitive ``send_message`` uses — so the next user reply in that + chat sees the brief in context (no "what is Task #2?" amnesia). This is + alternation- and cache-safe: the append lands at a turn boundary between + user turns, never mid-loop, and never mutates the cached system prompt. + """ + per_job = job.get("attach_to_session") + if isinstance(per_job, bool): + return per_job + try: + if cfg is None: + cfg = load_config() or {} + return bool((cfg.get("cron", {}) or {}).get("mirror_delivery", False)) + except Exception: + return False + + +def _maybe_mirror_cron_delivery( + job: dict, + platform_name: str, + chat_id: str, + mirror_text: str, + thread_id: Optional[str] = None, + *, + enabled: bool = False, +) -> None: + """Best-effort mirror of a cron delivery into the target chat's session. + + No-op unless ``enabled`` (resolved once by the caller). Reuses the shipped + ``mirror_to_session`` so cron rides exactly the same path that interactive + ``send_message`` mirroring already uses. All failures are swallowed — a + delivery that succeeded must never be reported as failed because the + transcript mirror could not find a session (the cold-start case: the target + chat has no gateway session yet, so there is nothing to mirror into). + """ + if not enabled: + return + text = (mirror_text or "").strip() + if not text: + return + try: + from gateway.mirror import mirror_to_session + + ok = mirror_to_session( + platform_name, + str(chat_id), + text, + source_label="cron", + thread_id=thread_id, + ) + if ok: + logger.info( + "Job '%s': mirrored delivery into %s:%s session transcript", + job.get("id", "?"), platform_name, chat_id, + ) + else: + logger.debug( + "Job '%s': delivery mirror skipped for %s:%s " + "(no matching gateway session — cold start)", + job.get("id", "?"), platform_name, chat_id, + ) + except Exception as e: + logger.debug( + "Job '%s': delivery mirror failed for %s:%s: %s", + job.get("id", "?"), platform_name, chat_id, e, + ) + + def _cron_job_origin_log_suffix(job: dict) -> str: """Return safe provenance details for security warnings about a cron job. @@ -804,6 +887,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option # is a cron delivery. Wrapping is on by default; set cron.wrap_response: false # in config.yaml for clean output. wrap_response = True + user_cfg = None try: user_cfg = load_config() wrap_response = user_cfg.get("cron", {}).get("wrap_response", True) @@ -823,6 +907,21 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option else: delivery_content = content + # Resolve the delivery-mirror gate ONCE (default off). When on, each + # successful delivery is also appended to the target chat's gateway session + # transcript so a user reply in that chat sees the cron output in context. + # We mirror the CLEAN, unwrapped agent output (not the cron header/footer) — + # that is what the agent "said" in the conversation. + try: + mirror_enabled = _cron_mirror_delivery_enabled(job, user_cfg) + except Exception: + mirror_enabled = False + mirror_text = "" + if mirror_enabled: + from gateway.platforms.base import BasePlatformAdapter as _BPA + _, mirror_text = _BPA.extract_media(content) + mirror_text = (mirror_text or "").strip() + # Extract MEDIA: tags so attachments are forwarded as files, not raw text from gateway.platforms.base import BasePlatformAdapter media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content) @@ -1091,6 +1190,10 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option if adapter_ok: logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) delivered = True + _maybe_mirror_cron_delivery( + job, platform_name, chat_id, mirror_text, + thread_id=thread_id, enabled=mirror_enabled, + ) except Exception as e: err_msg = f"live adapter delivery to {platform_name}:{chat_id} failed: {e}" if not any(err_msg in err for err in target_errors): @@ -1129,6 +1232,10 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option continue logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) + _maybe_mirror_cron_delivery( + job, platform_name, chat_id, mirror_text, + thread_id=thread_id, enabled=mirror_enabled, + ) if delivery_errors: return "; ".join(delivery_errors) diff --git a/hermes_cli/config.py b/hermes_cli/config.py index d250f25bd0d..e8263038a90 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2381,6 +2381,17 @@ DEFAULT_CONFIG = { # Wrap delivered cron responses with a header (task name) and footer # ("The agent cannot see this message"). Set to false for clean output. "wrap_response": True, + # Mirror each cron delivery into the TARGET chat's gateway session + # transcript (as an assistant turn), so a user reply in that chat sees + # the cron output in context instead of "what is Task #2?" amnesia. + # Default False preserves the historical isolation guarantee (cron + # deliveries live only in the cron job's own session). Per-job + # `attach_to_session` overrides this for a single job. Rides the same + # gateway.mirror.mirror_to_session path interactive send_message uses; + # alternation- and cache-safe (appended at a turn boundary, never + # mid-loop, never mutates the cached system prompt). When the target + # chat has no session yet (cold start) the mirror is a silent no-op. + "mirror_delivery": False, # Maximum number of due jobs to run in parallel per tick. # null/0 = unbounded (limited only by thread count). # 1 = serial (pre-v0.9 behaviour). diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index f766d4474f3..81b19b12858 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -3486,3 +3486,142 @@ class TestHomeTargetEnvVarRegistry: from cron.scheduler import _HOME_TARGET_ENV_VARS assert _HOME_TARGET_ENV_VARS.get("whatsapp") == "WHATSAPP_HOME_CHANNEL" + + +class TestCronDeliveryMirror: + """cron.mirror_delivery / per-job attach_to_session: opt-in append of a + cron delivery into the target chat's gateway session transcript. + + Default OFF preserves the historical isolation guarantee byte-for-byte. + When enabled, delivery rides the existing gateway.mirror.mirror_to_session + so cron uses exactly the same path interactive send_message mirroring uses. + """ + + def test_gate_default_off(self): + from cron.scheduler import _cron_mirror_delivery_enabled + + # No per-job flag, no config -> off (historical behaviour). + assert _cron_mirror_delivery_enabled({}, {}) is False + assert _cron_mirror_delivery_enabled({"id": "x"}, {"cron": {}}) is False + + def test_gate_global_config_on(self): + from cron.scheduler import _cron_mirror_delivery_enabled + + assert _cron_mirror_delivery_enabled({}, {"cron": {"mirror_delivery": True}}) is True + + def test_gate_per_job_overrides_global(self): + from cron.scheduler import _cron_mirror_delivery_enabled + + # Per-job False wins even if global is on. + assert _cron_mirror_delivery_enabled( + {"attach_to_session": False}, {"cron": {"mirror_delivery": True}} + ) is False + # Per-job True wins even if global is off/absent. + assert _cron_mirror_delivery_enabled( + {"attach_to_session": True}, {"cron": {"mirror_delivery": False}} + ) is True + + def test_mirror_calls_mirror_to_session_when_enabled(self): + from cron.scheduler import _maybe_mirror_cron_delivery + + with patch("gateway.mirror.mirror_to_session", return_value=True) as m: + _maybe_mirror_cron_delivery( + {"id": "j1"}, "telegram", "123", "Daily brief Task #2", + thread_id=None, enabled=True, + ) + m.assert_called_once() + args, kwargs = m.call_args + assert args[0] == "telegram" + assert args[1] == "123" + assert "Task #2" in args[2] + assert kwargs.get("source_label") == "cron" + + def test_mirror_noop_when_disabled(self): + from cron.scheduler import _maybe_mirror_cron_delivery + + with patch("gateway.mirror.mirror_to_session", return_value=True) as m: + _maybe_mirror_cron_delivery( + {"id": "j1"}, "telegram", "123", "should not mirror", + enabled=False, + ) + m.assert_not_called() + + def test_mirror_noop_on_empty_text(self): + from cron.scheduler import _maybe_mirror_cron_delivery + + with patch("gateway.mirror.mirror_to_session", return_value=True) as m: + _maybe_mirror_cron_delivery({"id": "j1"}, "telegram", "123", " ", enabled=True) + m.assert_not_called() + + def test_mirror_swallows_cold_start_miss(self): + """A missing target session (cold start) must NOT raise — delivery + already succeeded; the mirror is best-effort.""" + from cron.scheduler import _maybe_mirror_cron_delivery + + with patch("gateway.mirror.mirror_to_session", return_value=False) as m: + # Should not raise. + _maybe_mirror_cron_delivery( + {"id": "j1"}, "telegram", "123", "brief", enabled=True + ) + m.assert_called_once() + + def test_mirror_swallows_exceptions(self): + from cron.scheduler import _maybe_mirror_cron_delivery + + with patch("gateway.mirror.mirror_to_session", side_effect=RuntimeError("boom")): + # Must not propagate — a delivery that succeeded is never failed by + # a mirror error. + _maybe_mirror_cron_delivery( + {"id": "j1"}, "telegram", "123", "brief", enabled=True + ) + + def test_delivery_mirrors_clean_content_not_wrapped(self): + """When enabled, the mirror receives the CLEAN agent output, not the + cron header/footer-wrapped delivery text.""" + from gateway.config import Platform + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \ + patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock: + job = { + "id": "test-job", + "name": "daily-report", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + "attach_to_session": True, + } + _deliver_result(job, "Here is today's summary.") + + mirror_mock.assert_called_once() + mirrored_text = mirror_mock.call_args[0][2] + # Clean content, no cron wrapper. + assert "Here is today's summary." in mirrored_text + assert "Cronjob Response:" not in mirrored_text + assert "To stop or manage this job" not in mirrored_text + + def test_delivery_does_not_mirror_when_gate_off(self): + """Default path: a job with no opt-in must never touch the mirror.""" + from gateway.config import Platform + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \ + patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock: + job = { + "id": "test-job", + "name": "daily-report", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + _deliver_result(job, "Here is today's summary.") + + mirror_mock.assert_not_called() diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 2af43c38169..697c25e0696 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -576,6 +576,7 @@ def cronjob( enabled_toolsets: Optional[List[str]] = None, workdir: Optional[str] = None, no_agent: Optional[bool] = None, + attach_to_session: Optional[bool] = None, task_id: str = None, ) -> str: """Unified cron job management tool.""" @@ -642,6 +643,7 @@ def cronjob( enabled_toolsets=enabled_toolsets or None, workdir=_normalize_optional_job_value(workdir), no_agent=_no_agent, + attach_to_session=attach_to_session, ) _notify_provider_jobs_changed_safe() _create_message = f"Cron job '{job['name']}' created." @@ -794,6 +796,8 @@ def cronjob( updates["context_from"] = refs or None if enabled_toolsets is not None: updates["enabled_toolsets"] = enabled_toolsets or None + if attach_to_session is not None: + updates["attach_to_session"] = bool(attach_to_session) if workdir is not None: # Empty string clears the field (restores old behaviour); # otherwise pass raw — update_job() validates / normalizes. @@ -952,6 +956,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr "type": "string", "description": "Optional absolute path to run the job from. When set, AGENTS.md / CLAUDE.md / .cursorrules from that directory are injected into the system prompt, and the terminal/file/code_exec tools use it as their working directory — useful for running a job inside a specific project repo. Must be an absolute path that exists. When unset (default), preserves the original behaviour: no project context files, tools use the scheduler's cwd. On update, pass an empty string to clear. Jobs with workdir run sequentially (not parallel) to keep per-job directories isolated." }, + "attach_to_session": { + "type": "boolean", + "description": "When True, this job's delivered output is also mirrored into the TARGET chat's conversation history (as an assistant turn), so when the user replies to the delivery the agent sees it in context instead of asking 'what is that?'. Use this for conversational recurring jobs the user will reply to — daily briefings, reminders that kick off follow-up work, anything where the cron output should be part of the ongoing chat. Leave unset for fire-and-forget alerts/watchdogs. Overrides the global cron.mirror_delivery config for this one job. No effect when deliver='local'. If the target chat has no conversation yet, the mirror is a silent no-op." + }, }, "required": ["action"] }