mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
feat(cron): thread-preferred continuable delivery (open a thread, mirror DM fallback)
Continuable cron jobs (attach_to_session / cron.mirror_delivery, default OFF) now prefer a dedicated thread on thread-capable platforms, falling back to origin-DM mirroring where threads don't exist. - Thread-capable (Telegram topics, Discord/Slack threads): open a fresh thread for the job via the shipped adapter.create_handoff_thread, route the brief into it, and seed the thread-keyed session so the user's in-thread reply continues with full context. This is the 'continuable cron opens its own thread' interface. - DM-only (WhatsApp/Signal/SMS): create_handoff_thread returns None -> fall back to mirroring into the origin DM session (existing behaviour). Reuses existing infrastructure end-to-end — no new adapter surface, no provider-chain signature change: - adapter.create_handoff_thread (already implemented per-platform, returns None on unsupported platforms = the fallback signal) - the live SessionStore via adapter._session_store (already set on every adapter), reached without threading a new param through the frozen CronScheduler.start() contract - gateway.mirror.mirror_to_session for the seed/append - existing per-target delivery routing carries the new thread_id for free Mirrors GatewayRunner._process_handoff's open-thread-or-fallback + seed pattern, standalone for the cron delivery path. thread_seeded guards against a double-mirror after seeding. Scoped to the origin target only; fan-out/broadcast targets are never threaded or mirrored. Config docs updated (cron.mirror_delivery) + cronjob tool attach_to_session description reframed around continuable/thread-preferred. Tests: +5 (thread id returned on thread platform; None on DM platform; None without capability/loop; seed creates thread session + mirrors; seed no-op on empty). 22/22 in TestCronDeliveryMirror; 532 cron tests pass (4 failures pre-existing: croniter-not-installed + TZ).
This commit is contained in:
parent
98f3c19282
commit
b693bee100
4 changed files with 262 additions and 10 deletions
|
|
@ -470,6 +470,122 @@ def _maybe_mirror_cron_delivery(
|
|||
)
|
||||
|
||||
|
||||
def _open_continuable_cron_thread(
|
||||
job: dict,
|
||||
adapter,
|
||||
chat_id: str,
|
||||
loop,
|
||||
) -> Optional[str]:
|
||||
"""Open a dedicated thread for a continuable cron job, thread-preferred.
|
||||
|
||||
On a thread-capable platform (Telegram forum/DM topics, Discord threads,
|
||||
Slack threads) this asks the adapter to create a fresh thread under
|
||||
``chat_id`` so the job's brief — and the user's replies to it — live in
|
||||
their own scrollback, isolated from the parent channel. This is the
|
||||
"continuable cron job opens its own thread" interface.
|
||||
|
||||
Returns the new ``thread_id`` (str) on success, or ``None`` when the
|
||||
platform does not support threads (WhatsApp / Signal / SMS / DM-only) or
|
||||
creation failed. A ``None`` return is the signal for the caller to fall
|
||||
back to mirroring into the origin DM session instead — same fallback shape
|
||||
the gateway's session-handoff watcher uses (``_process_handoff``).
|
||||
|
||||
Reuses the shipped ``adapter.create_handoff_thread`` primitive; introduces
|
||||
no new adapter surface.
|
||||
"""
|
||||
create_thread = getattr(adapter, "create_handoff_thread", None)
|
||||
if not callable(create_thread) or loop is None:
|
||||
return None
|
||||
task_name = job.get("name") or job.get("id", "cron")
|
||||
thread_name = f"Hermes — {task_name}"
|
||||
try:
|
||||
from agent.async_utils import safe_schedule_threadsafe
|
||||
|
||||
coro = create_thread(str(chat_id), thread_name)
|
||||
future = safe_schedule_threadsafe(coro, loop) # type: ignore[arg-type]
|
||||
if future is None:
|
||||
return None
|
||||
new_thread_id = future.result(timeout=30)
|
||||
return str(new_thread_id) if new_thread_id else None
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Job '%s': create_handoff_thread failed on %s — falling back to "
|
||||
"DM-session mirror: %s",
|
||||
job.get("id", "?"), getattr(adapter, "name", "?"), e,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def _seed_cron_thread_session(
|
||||
job: dict,
|
||||
adapter,
|
||||
platform_name: str,
|
||||
chat_id: str,
|
||||
thread_id: str,
|
||||
mirror_text: str,
|
||||
chat_name: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Seed the freshly-opened cron thread's session with the brief.
|
||||
|
||||
Without this the brief is *visible* in the new thread but absent from any
|
||||
transcript, so the user's first reply in-thread would hit a session with no
|
||||
record of it ("what is Task #2?"). We create the thread-keyed session (the
|
||||
same key the user's reply will resolve to — ``build_session_key`` keys
|
||||
threads as participant-shared, so no ``user_id`` is needed) and append the
|
||||
brief as an assistant turn via the shipped ``mirror_to_session``.
|
||||
|
||||
Mirrors ``GatewayRunner._process_handoff``'s seed step, but standalone:
|
||||
cron reaches the live ``SessionStore`` through the adapter's
|
||||
``_session_store`` handle rather than the gateway object. Best-effort — a
|
||||
delivery that already succeeded is never failed by a seeding problem.
|
||||
"""
|
||||
text = (mirror_text or "").strip()
|
||||
if not text:
|
||||
return
|
||||
try:
|
||||
from gateway.config import Platform
|
||||
from gateway.session import SessionSource
|
||||
|
||||
session_store = getattr(adapter, "_session_store", None)
|
||||
if session_store is not None:
|
||||
try:
|
||||
platform_enum = Platform(platform_name.lower())
|
||||
except (ValueError, KeyError):
|
||||
platform_enum = None
|
||||
if platform_enum is not None:
|
||||
dest_source = SessionSource(
|
||||
platform=platform_enum,
|
||||
chat_id=str(chat_id),
|
||||
chat_name=chat_name,
|
||||
chat_type="thread",
|
||||
user_id="system:cron",
|
||||
user_name="Cron",
|
||||
thread_id=str(thread_id),
|
||||
)
|
||||
# Ensure the thread-keyed session row exists so the mirror has
|
||||
# a target and the user's later reply joins the same session.
|
||||
session_store.get_or_create_session(dest_source)
|
||||
|
||||
from gateway.mirror import mirror_to_session
|
||||
|
||||
mirror_to_session(
|
||||
platform_name,
|
||||
str(chat_id),
|
||||
text,
|
||||
source_label="cron",
|
||||
thread_id=str(thread_id),
|
||||
)
|
||||
logger.info(
|
||||
"Job '%s': opened continuable thread %s on %s:%s and seeded the brief",
|
||||
job.get("id", "?"), thread_id, platform_name, chat_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Job '%s': seeding cron thread session failed for %s:%s:%s: %s",
|
||||
job.get("id", "?"), platform_name, chat_id, thread_id, e,
|
||||
)
|
||||
|
||||
|
||||
def _cron_job_origin_log_suffix(job: dict) -> str:
|
||||
"""Return safe provenance details for security warnings about a cron job.
|
||||
|
||||
|
|
@ -1031,6 +1147,35 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
runtime_adapter = (adapters or {}).get(platform)
|
||||
delivered = False
|
||||
target_errors = []
|
||||
|
||||
# Continuable cron (thread-preferred): when mirroring is enabled for the
|
||||
# origin target and the gateway is live, try to open a DEDICATED thread
|
||||
# for this job and deliver the brief into it. On thread-capable
|
||||
# platforms (Telegram/Discord/Slack) the brief + the user's replies live
|
||||
# in their own scrollback; the thread-keyed session is seeded so a reply
|
||||
# continues with full context. On DM-only platforms (WhatsApp/Signal)
|
||||
# create_handoff_thread returns None and we fall back to mirroring into
|
||||
# the origin DM session (handled after delivery). Cf. _process_handoff.
|
||||
thread_seeded = False
|
||||
if (
|
||||
mirror_this_target
|
||||
and runtime_adapter is not None
|
||||
and loop is not None
|
||||
and not thread_id # never override an explicit origin thread/topic
|
||||
):
|
||||
new_thread_id = _open_continuable_cron_thread(
|
||||
job, runtime_adapter, chat_id, loop,
|
||||
)
|
||||
if new_thread_id:
|
||||
# Route this delivery into the new thread and seed its session.
|
||||
thread_id = new_thread_id
|
||||
_seed_cron_thread_session(
|
||||
job, runtime_adapter, platform_name, chat_id,
|
||||
new_thread_id, mirror_text,
|
||||
chat_name=origin.get("chat_name"),
|
||||
)
|
||||
thread_seeded = True
|
||||
|
||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||
# Telegram three-mode topic routing (#22773): a private chat
|
||||
# (positive chat_id) with a NUMERIC topic id is a Bot API Direct
|
||||
|
|
@ -1246,7 +1391,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
_maybe_mirror_cron_delivery(
|
||||
job, platform_name, chat_id, mirror_text,
|
||||
thread_id=thread_id, user_id=origin_user_id,
|
||||
enabled=mirror_this_target,
|
||||
enabled=mirror_this_target and not thread_seeded,
|
||||
)
|
||||
except Exception as e:
|
||||
err_msg = f"live adapter delivery to {platform_name}:{chat_id} failed: {e}"
|
||||
|
|
@ -1289,7 +1434,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
_maybe_mirror_cron_delivery(
|
||||
job, platform_name, chat_id, mirror_text,
|
||||
thread_id=thread_id, user_id=origin_user_id,
|
||||
enabled=mirror_this_target,
|
||||
enabled=mirror_this_target and not thread_seeded,
|
||||
)
|
||||
|
||||
if delivery_errors:
|
||||
|
|
|
|||
|
|
@ -2381,16 +2381,26 @@ DEFAULT_CONFIG = {
|
|||
# Wrap delivered cron responses with a header (task name) and footer
|
||||
# ("The agent cannot see this message"). Set to false for clean output.
|
||||
"wrap_response": True,
|
||||
# Mirror each cron delivery into the TARGET chat's gateway session
|
||||
# transcript (as an assistant turn), so a user reply in that chat sees
|
||||
# the cron output in context instead of "what is Task #2?" amnesia.
|
||||
# Make cron deliveries CONTINUABLE: a user can reply to a cron brief
|
||||
# and the agent has it in context (no "what is Task #2?" amnesia).
|
||||
# Default False preserves the historical isolation guarantee (cron
|
||||
# deliveries live only in the cron job's own session). Per-job
|
||||
# `attach_to_session` overrides this for a single job. Rides the same
|
||||
# gateway.mirror.mirror_to_session path interactive send_message uses;
|
||||
# `attach_to_session` overrides this for a single job.
|
||||
#
|
||||
# Behaviour is THREAD-PREFERRED, scoped to the job's origin chat:
|
||||
# - Thread-capable platforms (Telegram forum/DM topics, Discord
|
||||
# threads, Slack threads): a dedicated thread is opened for the job
|
||||
# via the adapter's create_handoff_thread, the brief is delivered
|
||||
# into it, and that thread's session is seeded so the user's reply
|
||||
# in-thread continues with full context. Each continuable job gets
|
||||
# its own scrollback, isolated from the parent channel.
|
||||
# - DM-only platforms (WhatsApp / Signal / SMS): no threads exist, so
|
||||
# the brief is mirrored into the origin DM session instead — the
|
||||
# DM itself is the continuation surface.
|
||||
# Both paths ride the shipped gateway.mirror.mirror_to_session and are
|
||||
# alternation- and cache-safe (appended at a turn boundary, never
|
||||
# mid-loop, never mutates the cached system prompt). When the target
|
||||
# chat has no session yet (cold start) the mirror is a silent no-op.
|
||||
# mid-loop, never mutating the cached system prompt). Only the origin
|
||||
# chat is ever touched — fan-out / broadcast targets are never mirrored.
|
||||
"mirror_delivery": False,
|
||||
# Maximum number of due jobs to run in parallel per tick.
|
||||
# null/0 = unbounded (limited only by thread count).
|
||||
|
|
|
|||
|
|
@ -3751,3 +3751,100 @@ class TestCronDeliveryMirror:
|
|||
|
||||
mirror_mock.assert_called_once()
|
||||
assert mirror_mock.call_args.kwargs.get("user_id") == "U42"
|
||||
|
||||
# --- continuable cron: thread-preferred (Teknium's interface) ---
|
||||
|
||||
def test_open_thread_returns_id_on_thread_platform(self):
|
||||
"""On a thread-capable adapter, _open_continuable_cron_thread returns
|
||||
the new thread id from create_handoff_thread."""
|
||||
from cron.scheduler import _open_continuable_cron_thread
|
||||
|
||||
adapter = MagicMock()
|
||||
adapter.create_handoff_thread = AsyncMock(return_value="9001")
|
||||
|
||||
# safe_schedule_threadsafe hands the coro to the gateway loop and
|
||||
# returns a future. Patch it to close the coro and return a ready
|
||||
# future carrying the adapter's thread id.
|
||||
def _run_now(coro, _loop):
|
||||
coro.close()
|
||||
fut = MagicMock()
|
||||
fut.result.return_value = "9001"
|
||||
return fut
|
||||
|
||||
with patch("agent.async_utils.safe_schedule_threadsafe", side_effect=_run_now):
|
||||
tid = _open_continuable_cron_thread(
|
||||
{"id": "j1", "name": "Brief"}, adapter, "123", loop=MagicMock(),
|
||||
)
|
||||
assert tid == "9001"
|
||||
|
||||
def test_open_thread_returns_none_on_dm_platform(self):
|
||||
"""A DM-only adapter (WhatsApp) inherits the base create_handoff_thread
|
||||
that returns None → _open_continuable_cron_thread returns None so the
|
||||
caller falls back to DM-session mirroring."""
|
||||
from cron.scheduler import _open_continuable_cron_thread
|
||||
|
||||
adapter = MagicMock()
|
||||
adapter.create_handoff_thread = AsyncMock(return_value=None)
|
||||
|
||||
def _run_now(coro, _loop):
|
||||
fut = MagicMock()
|
||||
fut.result.return_value = None
|
||||
coro.close()
|
||||
return fut
|
||||
|
||||
with patch("agent.async_utils.safe_schedule_threadsafe", side_effect=_run_now):
|
||||
tid = _open_continuable_cron_thread(
|
||||
{"id": "j1", "name": "Brief"}, adapter, "123", loop=MagicMock(),
|
||||
)
|
||||
assert tid is None
|
||||
|
||||
def test_open_thread_none_without_capability_or_loop(self):
|
||||
"""No create_handoff_thread attr, or no loop → None (no crash)."""
|
||||
from cron.scheduler import _open_continuable_cron_thread
|
||||
|
||||
adapter_no_cap = MagicMock(spec=[]) # no create_handoff_thread
|
||||
assert _open_continuable_cron_thread(
|
||||
{"id": "j1"}, adapter_no_cap, "123", loop=MagicMock(),
|
||||
) is None
|
||||
|
||||
adapter = MagicMock()
|
||||
adapter.create_handoff_thread = AsyncMock(return_value="9001")
|
||||
assert _open_continuable_cron_thread(
|
||||
{"id": "j1"}, adapter, "123", loop=None,
|
||||
) is None
|
||||
|
||||
def test_seed_thread_session_creates_session_and_mirrors(self):
|
||||
"""Seeding a freshly-opened thread creates the thread-keyed session via
|
||||
the adapter's live store and appends the brief via mirror_to_session."""
|
||||
from cron.scheduler import _seed_cron_thread_session
|
||||
|
||||
store = MagicMock()
|
||||
adapter = MagicMock()
|
||||
adapter._session_store = store
|
||||
|
||||
with patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
|
||||
_seed_cron_thread_session(
|
||||
{"id": "j1"}, adapter, "telegram", "123", "9001",
|
||||
"Daily brief Task #2", chat_name="Ops",
|
||||
)
|
||||
|
||||
# Session row created for the thread, then brief mirrored into it.
|
||||
store.get_or_create_session.assert_called_once()
|
||||
seeded_source = store.get_or_create_session.call_args[0][0]
|
||||
assert seeded_source.chat_type == "thread"
|
||||
assert seeded_source.thread_id == "9001"
|
||||
mirror_mock.assert_called_once()
|
||||
assert mirror_mock.call_args.kwargs.get("thread_id") == "9001"
|
||||
|
||||
def test_seed_thread_session_noop_on_empty_text(self):
|
||||
from cron.scheduler import _seed_cron_thread_session
|
||||
|
||||
store = MagicMock()
|
||||
adapter = MagicMock()
|
||||
adapter._session_store = store
|
||||
with patch("gateway.mirror.mirror_to_session") as mirror_mock:
|
||||
_seed_cron_thread_session(
|
||||
{"id": "j1"}, adapter, "telegram", "123", "9001", " ",
|
||||
)
|
||||
store.get_or_create_session.assert_not_called()
|
||||
mirror_mock.assert_not_called()
|
||||
|
|
|
|||
|
|
@ -964,7 +964,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
|
|||
},
|
||||
"attach_to_session": {
|
||||
"type": "boolean",
|
||||
"description": "When True, this job's delivered output is also mirrored into the TARGET chat's conversation history (as an assistant turn), so when the user replies to the delivery the agent sees it in context instead of asking 'what is that?'. Use this for conversational recurring jobs the user will reply to — daily briefings, reminders that kick off follow-up work, anything where the cron output should be part of the ongoing chat. Leave unset for fire-and-forget alerts/watchdogs. Overrides the global cron.mirror_delivery config for this one job. No effect when deliver='local'. If the target chat has no conversation yet, the mirror is a silent no-op."
|
||||
"description": "When True, this job becomes CONTINUABLE: the user can reply to its delivery and the agent has the brief in context instead of asking 'what is that?'. On thread-capable platforms (Telegram topics, Discord/Slack threads) a dedicated thread is opened for the job and its replies; on DM-only platforms (WhatsApp/Signal) the brief is mirrored into the origin DM session. Use this for conversational recurring jobs the user will reply to — daily briefings, reminders that kick off follow-up work. Leave unset for fire-and-forget alerts/watchdogs. Overrides the global cron.mirror_delivery config for this one job. Only the origin chat is touched (never fan-out targets); no effect when deliver='local'."
|
||||
},
|
||||
},
|
||||
"required": ["action"]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue