feat(cron): optional mirror of cron delivery into target chat session

Adds an opt-in path so a cron job's delivered output is also appended to
the TARGET chat's gateway session transcript (as an assistant turn), so a
user reply to a recurring delivery (daily brief, reminder) is answered with
the delivery in context instead of 'what is that?' amnesia.

- Reuses the shipped gateway.mirror.mirror_to_session — the same primitive
  interactive send_message mirroring already uses. No messaging-toolset
  change (cron still can't call send_message; this rides delivery).
- Gated: per-job attach_to_session overrides global cron.mirror_delivery
  (config.yaml). Default OFF — historical isolation preserved byte-for-byte.
- Mirrors the CLEAN agent output, not the cron header/footer wrapper.
- Alternation/cache-safe: append lands at a turn boundary, never mid-loop,
  never mutates the cached system prompt. Cold-start (no target session)
  is a silent no-op; mirror errors never fail a successful delivery.
- Surfaced on the cronjob tool (attach_to_session) + config schema.

Driven by enterprise cron-as-control-plane use case. 10 new tests; full
cron + cronjob-tool suites pass (600).
This commit is contained in:
Victor Kyriazakos 2026-06-22 18:26:08 +03:00 committed by Teknium
parent 532b7ed408
commit 1b181724fa
5 changed files with 272 additions and 0 deletions

View file

@ -788,6 +788,7 @@ def create_job(
enabled_toolsets: Optional[List[str]] = None,
workdir: Optional[str] = None,
no_agent: bool = False,
attach_to_session: Optional[bool] = None,
) -> Dict[str, Any]:
"""
Create a new cron job.
@ -866,6 +867,7 @@ def create_job(
normalized_toolsets = normalized_toolsets or None
normalized_workdir = _normalize_workdir(workdir)
normalized_no_agent = bool(no_agent)
normalized_attach = attach_to_session if isinstance(attach_to_session, bool) else None
# no_agent jobs are meaningless without a script — the script IS the job.
# Surface this as a clear ValueError at create time so bad configs never
@ -966,6 +968,11 @@ def create_job(
"enabled_toolsets": normalized_toolsets,
"workdir": normalized_workdir,
}
# Only persist attach_to_session when explicitly set, so existing jobs and
# the common case stay byte-identical (absent key => fall back to the
# global cron.mirror_delivery config, default off).
if normalized_attach is not None:
job["attach_to_session"] = normalized_attach
with _jobs_lock():
jobs = load_jobs()

View file

@ -344,6 +344,89 @@ def _resolve_origin(job: dict) -> Optional[dict]:
return None
def _cron_mirror_delivery_enabled(job: dict, cfg: Optional[dict] = None) -> bool:
"""Whether a cron delivery should also be mirrored into the target chat's
gateway session transcript.
Default OFF preserves the historical isolation guarantee (cron deliveries
live only in the cron job's own session, never the target chat's history)
byte-for-byte for everyone who does not opt in.
Precedence (first decisive value wins):
1. Per-job ``attach_to_session`` (bool) set via the ``cronjob`` tool,
lets one briefing job opt in without flipping global behaviour.
2. Global ``cron.mirror_delivery`` (bool) in config.yaml.
3. False.
When enabled, the cron's final output is appended to the target session as
an assistant turn via the existing ``gateway.mirror.mirror_to_session``
the same primitive ``send_message`` uses so the next user reply in that
chat sees the brief in context (no "what is Task #2?" amnesia). This is
alternation- and cache-safe: the append lands at a turn boundary between
user turns, never mid-loop, and never mutates the cached system prompt.
"""
per_job = job.get("attach_to_session")
if isinstance(per_job, bool):
return per_job
try:
if cfg is None:
cfg = load_config() or {}
return bool((cfg.get("cron", {}) or {}).get("mirror_delivery", False))
except Exception:
return False
def _maybe_mirror_cron_delivery(
job: dict,
platform_name: str,
chat_id: str,
mirror_text: str,
thread_id: Optional[str] = None,
*,
enabled: bool = False,
) -> None:
"""Best-effort mirror of a cron delivery into the target chat's session.
No-op unless ``enabled`` (resolved once by the caller). Reuses the shipped
``mirror_to_session`` so cron rides exactly the same path that interactive
``send_message`` mirroring already uses. All failures are swallowed a
delivery that succeeded must never be reported as failed because the
transcript mirror could not find a session (the cold-start case: the target
chat has no gateway session yet, so there is nothing to mirror into).
"""
if not enabled:
return
text = (mirror_text or "").strip()
if not text:
return
try:
from gateway.mirror import mirror_to_session
ok = mirror_to_session(
platform_name,
str(chat_id),
text,
source_label="cron",
thread_id=thread_id,
)
if ok:
logger.info(
"Job '%s': mirrored delivery into %s:%s session transcript",
job.get("id", "?"), platform_name, chat_id,
)
else:
logger.debug(
"Job '%s': delivery mirror skipped for %s:%s "
"(no matching gateway session — cold start)",
job.get("id", "?"), platform_name, chat_id,
)
except Exception as e:
logger.debug(
"Job '%s': delivery mirror failed for %s:%s: %s",
job.get("id", "?"), platform_name, chat_id, e,
)
def _cron_job_origin_log_suffix(job: dict) -> str:
"""Return safe provenance details for security warnings about a cron job.
@ -804,6 +887,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
# is a cron delivery. Wrapping is on by default; set cron.wrap_response: false
# in config.yaml for clean output.
wrap_response = True
user_cfg = None
try:
user_cfg = load_config()
wrap_response = user_cfg.get("cron", {}).get("wrap_response", True)
@ -823,6 +907,21 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
else:
delivery_content = content
# Resolve the delivery-mirror gate ONCE (default off). When on, each
# successful delivery is also appended to the target chat's gateway session
# transcript so a user reply in that chat sees the cron output in context.
# We mirror the CLEAN, unwrapped agent output (not the cron header/footer) —
# that is what the agent "said" in the conversation.
try:
mirror_enabled = _cron_mirror_delivery_enabled(job, user_cfg)
except Exception:
mirror_enabled = False
mirror_text = ""
if mirror_enabled:
from gateway.platforms.base import BasePlatformAdapter as _BPA
_, mirror_text = _BPA.extract_media(content)
mirror_text = (mirror_text or "").strip()
# Extract MEDIA: tags so attachments are forwarded as files, not raw text
from gateway.platforms.base import BasePlatformAdapter
media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content)
@ -1091,6 +1190,10 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
if adapter_ok:
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
delivered = True
_maybe_mirror_cron_delivery(
job, platform_name, chat_id, mirror_text,
thread_id=thread_id, enabled=mirror_enabled,
)
except Exception as e:
err_msg = f"live adapter delivery to {platform_name}:{chat_id} failed: {e}"
if not any(err_msg in err for err in target_errors):
@ -1129,6 +1232,10 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
continue
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
_maybe_mirror_cron_delivery(
job, platform_name, chat_id, mirror_text,
thread_id=thread_id, enabled=mirror_enabled,
)
if delivery_errors:
return "; ".join(delivery_errors)

View file

@ -2381,6 +2381,17 @@ DEFAULT_CONFIG = {
# Wrap delivered cron responses with a header (task name) and footer
# ("The agent cannot see this message"). Set to false for clean output.
"wrap_response": True,
# Mirror each cron delivery into the TARGET chat's gateway session
# transcript (as an assistant turn), so a user reply in that chat sees
# the cron output in context instead of "what is Task #2?" amnesia.
# Default False preserves the historical isolation guarantee (cron
# deliveries live only in the cron job's own session). Per-job
# `attach_to_session` overrides this for a single job. Rides the same
# gateway.mirror.mirror_to_session path interactive send_message uses;
# alternation- and cache-safe (appended at a turn boundary, never
# mid-loop, never mutates the cached system prompt). When the target
# chat has no session yet (cold start) the mirror is a silent no-op.
"mirror_delivery": False,
# Maximum number of due jobs to run in parallel per tick.
# null/0 = unbounded (limited only by thread count).
# 1 = serial (pre-v0.9 behaviour).

View file

@ -3486,3 +3486,142 @@ class TestHomeTargetEnvVarRegistry:
from cron.scheduler import _HOME_TARGET_ENV_VARS
assert _HOME_TARGET_ENV_VARS.get("whatsapp") == "WHATSAPP_HOME_CHANNEL"
class TestCronDeliveryMirror:
"""cron.mirror_delivery / per-job attach_to_session: opt-in append of a
cron delivery into the target chat's gateway session transcript.
Default OFF preserves the historical isolation guarantee byte-for-byte.
When enabled, delivery rides the existing gateway.mirror.mirror_to_session
so cron uses exactly the same path interactive send_message mirroring uses.
"""
def test_gate_default_off(self):
from cron.scheduler import _cron_mirror_delivery_enabled
# No per-job flag, no config -> off (historical behaviour).
assert _cron_mirror_delivery_enabled({}, {}) is False
assert _cron_mirror_delivery_enabled({"id": "x"}, {"cron": {}}) is False
def test_gate_global_config_on(self):
from cron.scheduler import _cron_mirror_delivery_enabled
assert _cron_mirror_delivery_enabled({}, {"cron": {"mirror_delivery": True}}) is True
def test_gate_per_job_overrides_global(self):
from cron.scheduler import _cron_mirror_delivery_enabled
# Per-job False wins even if global is on.
assert _cron_mirror_delivery_enabled(
{"attach_to_session": False}, {"cron": {"mirror_delivery": True}}
) is False
# Per-job True wins even if global is off/absent.
assert _cron_mirror_delivery_enabled(
{"attach_to_session": True}, {"cron": {"mirror_delivery": False}}
) is True
def test_mirror_calls_mirror_to_session_when_enabled(self):
from cron.scheduler import _maybe_mirror_cron_delivery
with patch("gateway.mirror.mirror_to_session", return_value=True) as m:
_maybe_mirror_cron_delivery(
{"id": "j1"}, "telegram", "123", "Daily brief Task #2",
thread_id=None, enabled=True,
)
m.assert_called_once()
args, kwargs = m.call_args
assert args[0] == "telegram"
assert args[1] == "123"
assert "Task #2" in args[2]
assert kwargs.get("source_label") == "cron"
def test_mirror_noop_when_disabled(self):
from cron.scheduler import _maybe_mirror_cron_delivery
with patch("gateway.mirror.mirror_to_session", return_value=True) as m:
_maybe_mirror_cron_delivery(
{"id": "j1"}, "telegram", "123", "should not mirror",
enabled=False,
)
m.assert_not_called()
def test_mirror_noop_on_empty_text(self):
from cron.scheduler import _maybe_mirror_cron_delivery
with patch("gateway.mirror.mirror_to_session", return_value=True) as m:
_maybe_mirror_cron_delivery({"id": "j1"}, "telegram", "123", " ", enabled=True)
m.assert_not_called()
def test_mirror_swallows_cold_start_miss(self):
"""A missing target session (cold start) must NOT raise — delivery
already succeeded; the mirror is best-effort."""
from cron.scheduler import _maybe_mirror_cron_delivery
with patch("gateway.mirror.mirror_to_session", return_value=False) as m:
# Should not raise.
_maybe_mirror_cron_delivery(
{"id": "j1"}, "telegram", "123", "brief", enabled=True
)
m.assert_called_once()
def test_mirror_swallows_exceptions(self):
from cron.scheduler import _maybe_mirror_cron_delivery
with patch("gateway.mirror.mirror_to_session", side_effect=RuntimeError("boom")):
# Must not propagate — a delivery that succeeded is never failed by
# a mirror error.
_maybe_mirror_cron_delivery(
{"id": "j1"}, "telegram", "123", "brief", enabled=True
)
def test_delivery_mirrors_clean_content_not_wrapped(self):
"""When enabled, the mirror receives the CLEAN agent output, not the
cron header/footer-wrapped delivery text."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
"attach_to_session": True,
}
_deliver_result(job, "Here is today's summary.")
mirror_mock.assert_called_once()
mirrored_text = mirror_mock.call_args[0][2]
# Clean content, no cron wrapper.
assert "Here is today's summary." in mirrored_text
assert "Cronjob Response:" not in mirrored_text
assert "To stop or manage this job" not in mirrored_text
def test_delivery_does_not_mirror_when_gate_off(self):
"""Default path: a job with no opt-in must never touch the mirror."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Here is today's summary.")
mirror_mock.assert_not_called()

View file

@ -576,6 +576,7 @@ def cronjob(
enabled_toolsets: Optional[List[str]] = None,
workdir: Optional[str] = None,
no_agent: Optional[bool] = None,
attach_to_session: Optional[bool] = None,
task_id: str = None,
) -> str:
"""Unified cron job management tool."""
@ -642,6 +643,7 @@ def cronjob(
enabled_toolsets=enabled_toolsets or None,
workdir=_normalize_optional_job_value(workdir),
no_agent=_no_agent,
attach_to_session=attach_to_session,
)
_notify_provider_jobs_changed_safe()
_create_message = f"Cron job '{job['name']}' created."
@ -794,6 +796,8 @@ def cronjob(
updates["context_from"] = refs or None
if enabled_toolsets is not None:
updates["enabled_toolsets"] = enabled_toolsets or None
if attach_to_session is not None:
updates["attach_to_session"] = bool(attach_to_session)
if workdir is not None:
# Empty string clears the field (restores old behaviour);
# otherwise pass raw — update_job() validates / normalizes.
@ -952,6 +956,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"type": "string",
"description": "Optional absolute path to run the job from. When set, AGENTS.md / CLAUDE.md / .cursorrules from that directory are injected into the system prompt, and the terminal/file/code_exec tools use it as their working directory — useful for running a job inside a specific project repo. Must be an absolute path that exists. When unset (default), preserves the original behaviour: no project context files, tools use the scheduler's cwd. On update, pass an empty string to clear. Jobs with workdir run sequentially (not parallel) to keep per-job directories isolated."
},
"attach_to_session": {
"type": "boolean",
"description": "When True, this job's delivered output is also mirrored into the TARGET chat's conversation history (as an assistant turn), so when the user replies to the delivery the agent sees it in context instead of asking 'what is that?'. Use this for conversational recurring jobs the user will reply to — daily briefings, reminders that kick off follow-up work, anything where the cron output should be part of the ongoing chat. Leave unset for fire-and-forget alerts/watchdogs. Overrides the global cron.mirror_delivery config for this one job. No effect when deliver='local'. If the target chat has no conversation yet, the mirror is a silent no-op."
},
},
"required": ["action"]
}