refactor(cron): scope delivery mirror to the origin conversation

The cron->session mirror now fires ONLY for the delivery target that
equals the job's origin (platform+chat_id[+thread_id]). A job created
from a live gateway chat stamps that chat as origin, and that session is
guaranteed to exist (it is the conversation the user scheduled the job
in). Fan-out / broadcast / home-channel-fallback targets are never
mirrored: they are not a continuation of a conversation and may have no
session at all.

This makes the prior 'cold-start session seeding' concern a non-case by
construction: when the mirror semantically applies the session exists;
when none exists the target was never the origin, so we no-op.

Adds _target_matches_origin() + origin-scoping tests (exact match,
other-chat/other-platform/no-origin rejection, thread scoping, fan-out
mirrors only the origin target).
This commit is contained in:
Victor Kyriazakos 2026-06-22 19:02:23 +03:00 committed by Teknium
parent 1b181724fa
commit c06ceb3232
2 changed files with 137 additions and 6 deletions

View file

@ -376,6 +376,39 @@ def _cron_mirror_delivery_enabled(job: dict, cfg: Optional[dict] = None) -> bool
return False
def _target_matches_origin(origin: dict, platform_name: str, chat_id: str,
thread_id: Optional[str]) -> bool:
"""True when a delivery target is the job's own origin conversation.
Mirroring is scoped to the origin session by design (see
``_maybe_mirror_cron_delivery``). A job created from a live gateway chat
stamps that chat as ``origin`` (``cronjob_tools._origin_from_env``), and
that session is guaranteed to exist it is the very conversation the user
was in when they scheduled the job. Fan-out targets (``deliver=all``,
explicit ``platform:chat_id`` to some *other* chat, or a home-channel
fallback for an origin-less API/script job) are deliberately NOT mirrored:
they are broadcasts, not a continuation of a conversation, and may point at
a chat the user never opened an agent session in.
This makes the historical "cold-start" worry a non-case: when the mirror
semantically applies (target == origin) the session always exists; when no
session exists, the target was never the origin conversation, so we simply
do not mirror.
"""
if not origin:
return False
if str(origin.get("platform", "")).lower() != str(platform_name).lower():
return False
if str(origin.get("chat_id", "")) != str(chat_id):
return False
# thread_id must match when the origin pins one (topic-scoped chats); a
# target that lost the thread_id is not the same conversation lane.
origin_thread = origin.get("thread_id")
if origin_thread is not None and str(origin_thread) != str(thread_id or ""):
return False
return True
def _maybe_mirror_cron_delivery(
job: dict,
platform_name: str,
@ -385,14 +418,20 @@ def _maybe_mirror_cron_delivery(
*,
enabled: bool = False,
) -> None:
"""Best-effort mirror of a cron delivery into the target chat's session.
"""Best-effort mirror of a cron delivery into the origin chat's session.
No-op unless ``enabled`` (resolved once by the caller). Reuses the shipped
No-op unless ``enabled`` (resolved once by the caller, and already scoped to
the origin target see ``_target_matches_origin``). Reuses the shipped
``mirror_to_session`` so cron rides exactly the same path that interactive
``send_message`` mirroring already uses. All failures are swallowed a
delivery that succeeded must never be reported as failed because the
transcript mirror could not find a session (the cold-start case: the target
chat has no gateway session yet, so there is nothing to mirror into).
transcript mirror hit a problem.
Because the caller only enables this for the target that equals the job's
origin conversation, the session is expected to exist (the job was born in
that session). A missing session therefore indicates an origin-less /
fan-out delivery that should not have been mirrored anyway, and is treated
as a silent no-op never a synthetic session is created.
"""
if not enabled:
return
@ -956,6 +995,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
job["id"], platform_name, chat_id, thread_id,
)
# Mirror is scoped to the ORIGIN conversation only. A fan-out / broadcast
# / home-channel-fallback target is never mirrored (it is not the
# conversation the job was created in, and may have no session at all).
mirror_this_target = mirror_enabled and _target_matches_origin(
origin, platform_name, chat_id, thread_id
)
# Built-in names resolve to their enum member; plugin platform names
# create dynamic members via Platform._missing_().
try:
@ -1192,7 +1238,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
delivered = True
_maybe_mirror_cron_delivery(
job, platform_name, chat_id, mirror_text,
thread_id=thread_id, enabled=mirror_enabled,
thread_id=thread_id, enabled=mirror_this_target,
)
except Exception as e:
err_msg = f"live adapter delivery to {platform_name}:{chat_id} failed: {e}"
@ -1234,7 +1280,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
_maybe_mirror_cron_delivery(
job, platform_name, chat_id, mirror_text,
thread_id=thread_id, enabled=mirror_enabled,
thread_id=thread_id, enabled=mirror_this_target,
)
if delivery_errors:

View file

@ -3625,3 +3625,88 @@ class TestCronDeliveryMirror:
_deliver_result(job, "Here is today's summary.")
mirror_mock.assert_not_called()
# --- origin-scoping (mirror only into the conversation that created the job) ---
def test_target_matches_origin_exact(self):
from cron.scheduler import _target_matches_origin
origin = {"platform": "telegram", "chat_id": "123"}
assert _target_matches_origin(origin, "telegram", "123", None) is True
# Case-insensitive platform match.
assert _target_matches_origin(origin, "Telegram", "123", None) is True
def test_target_matches_origin_rejects_other_chat(self):
from cron.scheduler import _target_matches_origin
origin = {"platform": "telegram", "chat_id": "123"}
# Different chat (fan-out / explicit other target) -> not the origin.
assert _target_matches_origin(origin, "telegram", "999", None) is False
# Different platform (deliver=all broadcast) -> not the origin.
assert _target_matches_origin(origin, "discord", "123", None) is False
# No origin at all (API/script job, home-channel fallback) -> never.
assert _target_matches_origin({}, "telegram", "123", None) is False
def test_target_matches_origin_thread_scoped(self):
from cron.scheduler import _target_matches_origin
origin = {"platform": "telegram", "chat_id": "123", "thread_id": "17"}
assert _target_matches_origin(origin, "telegram", "123", "17") is True
# Same chat, wrong/lost thread lane -> not the same conversation.
assert _target_matches_origin(origin, "telegram", "123", None) is False
assert _target_matches_origin(origin, "telegram", "123", "99") is False
def test_delivery_does_not_mirror_fanout_non_origin_target(self):
"""Even with the gate ON, a delivery to a chat that is NOT the job's
origin (explicit fan-out target) must not be mirrored the mirror is
scoped to the origin conversation, and the fan-out chat may have no
session at all."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
job = {
"id": "test-job",
"name": "daily-report",
# Explicit delivery to a DIFFERENT chat than the origin.
"deliver": "telegram:999",
"origin": {"platform": "telegram", "chat_id": "123"},
"attach_to_session": True,
}
_deliver_result(job, "Here is today's summary.")
# Delivered to 999, but origin is 123 -> no mirror.
mirror_mock.assert_not_called()
def test_delivery_mirrors_only_origin_target_in_fanout(self):
"""deliver to BOTH origin and another chat: only the origin target is
mirrored."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
job = {
"id": "test-job",
"name": "daily-report",
# Fan out to the origin chat (123) AND another chat (999).
"deliver": "telegram:123,telegram:999",
"origin": {"platform": "telegram", "chat_id": "123"},
"attach_to_session": True,
}
_deliver_result(job, "Here is today's summary.")
# Exactly one mirror, and it is the origin chat (123) — not 999.
mirror_mock.assert_called_once()
assert mirror_mock.call_args[0][1] == "123"