mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
fix(cron): mirror continuable cron as a labelled user turn (alternation-safe)
Addresses review on #51077 (kxee). The continuable-cron mirror reused
gateway.mirror.mirror_to_session, which writes role=assistant — re-
introducing the exact alternation violation #2313 (37a997945)
deliberately removed: a cron brief landing as assistant after the
agent's last turn yields assistant->assistant, which breaks strict-
alternation providers (OpenAI/OpenRouter) per issue #2221. The mirror/
mirror_source metadata is also dropped at the SQLite boundary, so the
[Delivered from cron] label is lost on replay.
This is an intentional, opt-in (default OFF) reversal of #2313's
'cron output does not belong in interactive history' for the reply-to-
cron use case — gated behind cron.mirror_delivery / attach_to_session.
Fixes:
- mirror_to_session gains a role param (default 'assistant' — interactive
send_message mirror unchanged, it IS the agent speaking). Cron paths
pass role='user' with a '[Cron delivery: <task>]' prefix so the brief
collapses via repair_message_sequence's consecutive-user merge on every
provider, and stays distinguishable on replay despite the metadata drop.
- thread_seeded: defer seeding + the flag until delivery into the new
thread actually succeeds. Previously set pre-delivery, so an open-
succeeds / deliver-fails case both stranded a seeded-but-unseen brief
AND suppressed the DM-fallback mirror.
- seed mirror now passes user_id='system:cron' to resolve the exact
thread-keyed session row it just created.
- dedupe the duplicate BasePlatformAdapter import in _deliver_result.
- trim oversized docstrings to non-obvious WHY (AGENTS.md).
- docs: document cron.mirror_delivery / attach_to_session in
website/docs/user-guide/features/cron.md.
- test: assert the cron mirror writes role='user' with the label prefix.
204 cron+mirror tests pass.
This commit is contained in:
parent
b693bee100
commit
b177d4ee48
4 changed files with 116 additions and 36 deletions
|
|
@ -444,13 +444,22 @@ def _maybe_mirror_cron_delivery(
|
|||
try:
|
||||
from gateway.mirror import mirror_to_session
|
||||
|
||||
# Mirror as a USER turn with a labelled prefix, NOT an assistant turn.
|
||||
# The brief is not the agent speaking; an assistant-role mirror lands as
|
||||
# assistant→assistant after the agent's last turn and breaks strict
|
||||
# alternation (issue #2221, the exact failure #2313 removed). A
|
||||
# user-role turn collapses safely via repair_message_sequence's
|
||||
# consecutive-user merge on every provider, and the prefix preserves the
|
||||
# "this came from cron" context that the dropped SQLite mirror metadata
|
||||
# would otherwise lose on replay.
|
||||
ok = mirror_to_session(
|
||||
platform_name,
|
||||
str(chat_id),
|
||||
text,
|
||||
f"[Cron delivery: {job.get('name') or job.get('id', 'cron')}]\n{text}",
|
||||
source_label="cron",
|
||||
thread_id=thread_id,
|
||||
user_id=user_id,
|
||||
role="user",
|
||||
)
|
||||
if ok:
|
||||
logger.info(
|
||||
|
|
@ -476,22 +485,13 @@ def _open_continuable_cron_thread(
|
|||
chat_id: str,
|
||||
loop,
|
||||
) -> Optional[str]:
|
||||
"""Open a dedicated thread for a continuable cron job, thread-preferred.
|
||||
"""Open a dedicated thread for a continuable cron job (thread-preferred).
|
||||
|
||||
On a thread-capable platform (Telegram forum/DM topics, Discord threads,
|
||||
Slack threads) this asks the adapter to create a fresh thread under
|
||||
``chat_id`` so the job's brief — and the user's replies to it — live in
|
||||
their own scrollback, isolated from the parent channel. This is the
|
||||
"continuable cron job opens its own thread" interface.
|
||||
|
||||
Returns the new ``thread_id`` (str) on success, or ``None`` when the
|
||||
platform does not support threads (WhatsApp / Signal / SMS / DM-only) or
|
||||
creation failed. A ``None`` return is the signal for the caller to fall
|
||||
back to mirroring into the origin DM session instead — same fallback shape
|
||||
the gateway's session-handoff watcher uses (``_process_handoff``).
|
||||
|
||||
Reuses the shipped ``adapter.create_handoff_thread`` primitive; introduces
|
||||
no new adapter surface.
|
||||
Returns the new ``thread_id`` on success, or ``None`` when the platform has
|
||||
no thread primitive (WhatsApp/Signal/SMS) or creation failed — the ``None``
|
||||
return is the caller's signal to fall back to the origin-DM mirror, the same
|
||||
open-thread-or-fallback shape as ``GatewayRunner._process_handoff``. Reuses
|
||||
the shipped ``adapter.create_handoff_thread``; no new adapter surface.
|
||||
"""
|
||||
create_thread = getattr(adapter, "create_handoff_thread", None)
|
||||
if not callable(create_thread) or loop is None:
|
||||
|
|
@ -568,12 +568,19 @@ def _seed_cron_thread_session(
|
|||
|
||||
from gateway.mirror import mirror_to_session
|
||||
|
||||
# User-role + labelled prefix (see _maybe_mirror_cron_delivery): the
|
||||
# seeded brief must not read as an assistant turn, or the user's first
|
||||
# in-thread reply produces assistant→user→... off a phantom assistant
|
||||
# message. Pass the seed user_id so the mirror resolves the exact
|
||||
# thread-keyed session row we just created.
|
||||
mirror_to_session(
|
||||
platform_name,
|
||||
str(chat_id),
|
||||
text,
|
||||
f"[Cron delivery: {job.get('name') or job.get('id', 'cron')}]\n{text}",
|
||||
source_label="cron",
|
||||
thread_id=str(thread_id),
|
||||
user_id="system:cron",
|
||||
role="user",
|
||||
)
|
||||
logger.info(
|
||||
"Job '%s': opened continuable thread %s on %s:%s and seeded the brief",
|
||||
|
|
@ -1066,26 +1073,24 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
else:
|
||||
delivery_content = content
|
||||
|
||||
# Extract MEDIA: tags so attachments are forwarded as files, not raw text
|
||||
from gateway.platforms.base import BasePlatformAdapter
|
||||
media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content)
|
||||
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
|
||||
|
||||
# Resolve the delivery-mirror gate ONCE (default off). When on, each
|
||||
# successful delivery is also appended to the target chat's gateway session
|
||||
# transcript so a user reply in that chat sees the cron output in context.
|
||||
# We mirror the CLEAN, unwrapped agent output (not the cron header/footer) —
|
||||
# that is what the agent "said" in the conversation.
|
||||
# Mirror the CLEAN, unwrapped output (not the cron header/footer).
|
||||
try:
|
||||
mirror_enabled = _cron_mirror_delivery_enabled(job, user_cfg)
|
||||
except Exception:
|
||||
mirror_enabled = False
|
||||
mirror_text = ""
|
||||
if mirror_enabled:
|
||||
from gateway.platforms.base import BasePlatformAdapter as _BPA
|
||||
_, mirror_text = _BPA.extract_media(content)
|
||||
_, mirror_text = BasePlatformAdapter.extract_media(content)
|
||||
mirror_text = (mirror_text or "").strip()
|
||||
|
||||
# Extract MEDIA: tags so attachments are forwarded as files, not raw text
|
||||
from gateway.platforms.base import BasePlatformAdapter
|
||||
media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content)
|
||||
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
|
||||
|
||||
try:
|
||||
config = load_gateway_config()
|
||||
except Exception as e:
|
||||
|
|
@ -1157,6 +1162,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
# create_handoff_thread returns None and we fall back to mirroring into
|
||||
# the origin DM session (handled after delivery). Cf. _process_handoff.
|
||||
thread_seeded = False
|
||||
opened_thread_id: Optional[str] = None
|
||||
if (
|
||||
mirror_this_target
|
||||
and runtime_adapter is not None
|
||||
|
|
@ -1167,14 +1173,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
job, runtime_adapter, chat_id, loop,
|
||||
)
|
||||
if new_thread_id:
|
||||
# Route this delivery into the new thread and seed its session.
|
||||
# Route THIS delivery into the new thread now (the send needs the
|
||||
# thread_id), but defer seeding the thread session until the
|
||||
# delivery actually succeeds — otherwise an open-succeeds /
|
||||
# deliver-fails case leaves a seeded brief the user never saw,
|
||||
# and (worse) suppresses the DM-fallback mirror via thread_seeded.
|
||||
thread_id = new_thread_id
|
||||
_seed_cron_thread_session(
|
||||
job, runtime_adapter, platform_name, chat_id,
|
||||
new_thread_id, mirror_text,
|
||||
chat_name=origin.get("chat_name"),
|
||||
)
|
||||
thread_seeded = True
|
||||
opened_thread_id = new_thread_id
|
||||
|
||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||
# Telegram three-mode topic routing (#22773): a private chat
|
||||
|
|
@ -1388,6 +1393,15 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
if adapter_ok:
|
||||
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
|
||||
delivered = True
|
||||
# Seed the thread session only now that delivery into it
|
||||
# succeeded (deferred from thread-open above).
|
||||
if opened_thread_id and not thread_seeded:
|
||||
_seed_cron_thread_session(
|
||||
job, runtime_adapter, platform_name, chat_id,
|
||||
opened_thread_id, mirror_text,
|
||||
chat_name=origin.get("chat_name"),
|
||||
)
|
||||
thread_seeded = True
|
||||
_maybe_mirror_cron_delivery(
|
||||
job, platform_name, chat_id, mirror_text,
|
||||
thread_id=thread_id, user_id=origin_user_id,
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ def mirror_to_session(
|
|||
source_label: str = "cli",
|
||||
thread_id: Optional[str] = None,
|
||||
user_id: Optional[str] = None,
|
||||
role: str = "assistant",
|
||||
) -> bool:
|
||||
"""
|
||||
Append a delivery-mirror message to the target session's transcript.
|
||||
|
|
@ -36,6 +37,17 @@ def mirror_to_session(
|
|||
Finds the gateway session that matches the given platform + chat_id,
|
||||
then writes a mirror entry to both the JSONL transcript and SQLite DB.
|
||||
|
||||
``role`` defaults to ``"assistant"`` — correct for the interactive
|
||||
``send_message`` mirror, where the mirrored text is the agent's own
|
||||
outgoing reply (a genuine assistant turn). Callers mirroring text that is
|
||||
NOT the agent speaking — e.g. a cron brief delivered out-of-band — must
|
||||
pass ``role="user"``: the ``mirror``/``mirror_source`` metadata is dropped
|
||||
at the SQLite boundary (only role+content persist), so on replay an
|
||||
assistant-role mirror is indistinguishable from a real assistant turn and
|
||||
produces ``assistant → assistant`` pairs that break strict-alternation
|
||||
providers (issue #2221). A user-role mirror collapses safely via
|
||||
``repair_message_sequence``'s consecutive-user merge on every provider.
|
||||
|
||||
Returns True if mirrored successfully, False if no matching session or error.
|
||||
All errors are caught -- this is never fatal.
|
||||
"""
|
||||
|
|
@ -57,7 +69,7 @@ def mirror_to_session(
|
|||
return False
|
||||
|
||||
mirror_msg = {
|
||||
"role": "assistant",
|
||||
"role": role,
|
||||
"content": message_text,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"mirror": True,
|
||||
|
|
|
|||
|
|
@ -3526,8 +3526,8 @@ class TestCronDeliveryMirror:
|
|||
|
||||
with patch("gateway.mirror.mirror_to_session", return_value=True) as m:
|
||||
_maybe_mirror_cron_delivery(
|
||||
{"id": "j1"}, "telegram", "123", "Daily brief Task #2",
|
||||
thread_id=None, enabled=True,
|
||||
{"id": "j1", "name": "Daily Brief"}, "telegram", "123",
|
||||
"Daily brief Task #2", thread_id=None, enabled=True,
|
||||
)
|
||||
m.assert_called_once()
|
||||
args, kwargs = m.call_args
|
||||
|
|
@ -3536,6 +3536,27 @@ class TestCronDeliveryMirror:
|
|||
assert "Task #2" in args[2]
|
||||
assert kwargs.get("source_label") == "cron"
|
||||
|
||||
def test_mirror_writes_user_role_with_label_not_assistant(self):
|
||||
"""Regression for #2221 / #2313: the cron brief must mirror as a USER
|
||||
turn (with a [Cron delivery: ...] label), NOT assistant — an
|
||||
assistant-role mirror lands as assistant->assistant after the agent's
|
||||
last turn and breaks strict alternation on non-Anthropic providers."""
|
||||
from cron.scheduler import _maybe_mirror_cron_delivery
|
||||
|
||||
with patch("gateway.mirror.mirror_to_session", return_value=True) as m:
|
||||
_maybe_mirror_cron_delivery(
|
||||
{"id": "j1", "name": "Morning Brief"}, "telegram", "123",
|
||||
"Market movers today", thread_id=None, enabled=True,
|
||||
)
|
||||
m.assert_called_once()
|
||||
args, kwargs = m.call_args
|
||||
assert kwargs.get("role") == "user", "cron mirror must be a user turn, not assistant"
|
||||
# The brief text is prefixed with a human-readable cron-delivery label
|
||||
# so replay (where the mirror metadata is dropped at the SQLite
|
||||
# boundary) still distinguishes it from a genuine user message.
|
||||
assert args[2].startswith("[Cron delivery: Morning Brief]")
|
||||
assert "Market movers today" in args[2]
|
||||
|
||||
def test_mirror_noop_when_disabled(self):
|
||||
from cron.scheduler import _maybe_mirror_cron_delivery
|
||||
|
||||
|
|
|
|||
|
|
@ -298,6 +298,39 @@ cron:
|
|||
wrap_response: false
|
||||
```
|
||||
|
||||
### Continuable jobs (reply to a cron delivery)
|
||||
|
||||
By default a cron delivery is fire-and-forget: the message is sent, but it does
|
||||
not live in the chat's conversation history, so if you reply to it the agent
|
||||
has no record of what it said. Set a job **continuable** and the delivered brief
|
||||
becomes a conversation you can reply into — the agent has the brief in context
|
||||
instead of asking "what is Task #2?".
|
||||
|
||||
Opt-in, **default off**. Enable globally in config, or per-job via the `cronjob`
|
||||
tool's `attach_to_session` (which overrides the global setting for that one job):
|
||||
|
||||
```yaml
|
||||
# ~/.hermes/config.yaml
|
||||
cron:
|
||||
mirror_delivery: false # set true to make cron deliveries continuable
|
||||
```
|
||||
|
||||
Behaviour is **thread-preferred**, scoped to the job's origin chat:
|
||||
|
||||
- **Thread-capable platforms** (Telegram topics, Discord/Slack threads): each
|
||||
delivery opens its own dedicated thread and the brief is seeded into that
|
||||
thread's session, so a reply in-thread continues with full context. A
|
||||
recurring job (e.g. a daily brief) opens a fresh thread per run, keeping each
|
||||
delivery's follow-up discussion isolated.
|
||||
- **DM-only platforms** (WhatsApp, Signal, SMS): no threads exist, so the brief
|
||||
is mirrored into the origin DM session instead — the DM itself is the
|
||||
continuation surface.
|
||||
|
||||
Only the origin chat is ever touched: fan-out / broadcast targets (`all`,
|
||||
explicit other-chat deliveries) are never made continuable. The mirror is
|
||||
written as a labelled user turn (`[Cron delivery: <task name>]`), which keeps
|
||||
the conversation history alternation-safe across all model providers.
|
||||
|
||||
### Silent suppression
|
||||
|
||||
If the agent's final response contains `[SILENT]`, delivery is suppressed entirely. The output is still saved locally for audit (in `~/.hermes/cron/output/`), but no message is sent to the delivery target.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue