mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-24 05:41:40 +00:00
feat(session): make /handoff actually transfer the session live
Builds on @kshitijk4poor's CLI handoff stub. The original PR's flow
deferred everything to whenever a real user happened to message the
target platform; this rewrites it so the gateway picks up handoffs
immediately and the destination chat just starts working.
State machine on sessions table replaces the boolean flag:
None -> 'pending' -> 'running' -> ('completed' | 'failed')
plus handoff_error for failure reasons. CLI request_handoff /
get_handoff_state / list_pending_handoffs / claim_handoff /
complete_handoff / fail_handoff helpers wrap the transitions.
CLI side (cli.py): /handoff <platform> validates the platform's home
channel via load_gateway_config, refuses if the agent is mid-turn,
flips the row to 'pending', and poll-blocks (60s) on terminal state.
On 'completed' it prints the /resume hint and exits the CLI like
/quit. On 'failed' or timeout it surfaces the reason and the CLI
session stays intact.
Gateway side (gateway/run.py): new _handoff_watcher background task
scans state.db every 2s, atomically claims pending rows, and runs
_process_handoff for each. _process_handoff:
1. Resolves the platform's home channel.
2. Asks the adapter for a fresh thread via the new
create_handoff_thread(parent_chat_id, name) capability so the
handed-off conversation gets its own scrollback. Adapters that
don't support threads (or fail) return None and the watcher
falls back to the home channel directly.
3. Constructs a SessionSource keyed as 'thread' when a thread was
created, 'dm' otherwise, then session_store.switch_session
re-binds the destination key to the CLI session_id. The full
role-aware transcript replays via load_transcript on the next
turn (no flat-text injection into context_prompt).
4. Forges a synthetic MessageEvent(internal=True) with the handoff
notice and dispatches through _handle_message; the agent runs
against the loaded transcript and adapter.send delivers the
reply.
5. Marks the row 'completed' on success, 'failed' (+error) on any
exception.
Adapter capability (gateway/platforms/base.py): create_handoff_thread
default returns None. Three overrides:
- Telegram (gateway/platforms/telegram.py): wraps _create_dm_topic
so DM topics (Bot API 9.4+) and forum supergroups both work.
- Discord (gateway/platforms/discord.py): parent.create_thread on
text channels with a seed-message + message.create_thread
fallback for permission edge cases. Skips DMs and other
non-thread-capable parents.
- Slack (gateway/platforms/slack.py): posts a seed message and
returns its ts as the thread anchor — Slack threads are
message-anchored.
In thread mode, build_session_key keys the destination without
user_id (thread_sessions_per_user defaults to False) so the synthetic
turn and any later real-user message in the thread share the same
session_key — seamless takeover without race.
CommandDef stays cli_only=True (handoff is initiated from the CLI;
gateway exposes /resume for the reverse direction).
Removed the original PR's _handle_message_with_agent handoff hook
(transcript-as-text injection into context_prompt) and the
send_message_tool notification — both replaced by the watcher path.
Tests rewritten around the new state machine: 13/13 pass.
E2E-validated thread + no-thread paths and the failure path against
real worktree imports with mocked adapters.
This commit is contained in:
parent
878611a79d
commit
00ce5f04d9
8 changed files with 737 additions and 189 deletions
264
gateway/run.py
264
gateway/run.py
|
|
@ -3681,10 +3681,234 @@ class GatewayRunner:
|
|||
)
|
||||
asyncio.create_task(self._platform_reconnect_watcher())
|
||||
|
||||
# Start background handoff watcher — picks up CLI sessions marked
|
||||
# handoff_state='pending' in state.db and re-binds them to the
|
||||
# destination platform's home channel, then forges a synthetic user
|
||||
# turn so the agent kicks off the new chat.
|
||||
asyncio.create_task(self._handoff_watcher())
|
||||
|
||||
logger.info("Press Ctrl+C to stop")
|
||||
|
||||
return True
|
||||
|
||||
async def _handoff_watcher(self, interval: float = 2.0) -> None:
|
||||
"""Background task that processes pending CLI→gateway session handoffs.
|
||||
|
||||
Polls ``state.db`` for sessions in ``handoff_state='pending'`` and,
|
||||
for each one:
|
||||
|
||||
1. Atomically claims it (pending → running).
|
||||
2. Resolves the destination platform's configured home channel.
|
||||
3. Re-binds the gateway's session_key for that home channel to the
|
||||
CLI's existing session_id via ``session_store.switch_session`` so
|
||||
the full role-aware transcript replays on the next agent turn.
|
||||
4. Forges a synthetic ``MessageEvent`` (``internal=True``) with a
|
||||
handoff-notice text and dispatches through the normal gateway
|
||||
message pipeline so the agent runs and replies on the platform.
|
||||
5. Marks the row ``completed`` (or ``failed`` with ``handoff_error``).
|
||||
|
||||
The CLI process is poll-blocked on the row's terminal state and
|
||||
prints the result to the user.
|
||||
"""
|
||||
# Initial delay so the gateway is fully connected to its platforms
|
||||
# before we try to dispatch handoffs through them.
|
||||
await asyncio.sleep(5)
|
||||
while self._running:
|
||||
try:
|
||||
if self._session_db is None:
|
||||
await asyncio.sleep(interval)
|
||||
continue
|
||||
pending = self._session_db.list_pending_handoffs()
|
||||
for row in pending:
|
||||
session_id = row.get("id")
|
||||
if not session_id:
|
||||
continue
|
||||
if not self._session_db.claim_handoff(session_id):
|
||||
# Another tick or another gateway already claimed it.
|
||||
continue
|
||||
try:
|
||||
await self._process_handoff(row)
|
||||
self._session_db.complete_handoff(session_id)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Handoff for session %s failed: %s",
|
||||
session_id, exc, exc_info=True,
|
||||
)
|
||||
self._session_db.fail_handoff(session_id, str(exc))
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.debug("Handoff watcher tick error: %s", exc, exc_info=True)
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
async def _process_handoff(self, row: Dict[str, Any]) -> None:
|
||||
"""Execute one handoff row. Raises on failure (caller marks failed)."""
|
||||
from gateway.config import Platform
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
from gateway.platforms.base import MessageEvent
|
||||
|
||||
cli_session_id = row["id"]
|
||||
platform_name = (row.get("handoff_platform") or "").strip().lower()
|
||||
if not platform_name:
|
||||
raise RuntimeError("handoff_platform is empty")
|
||||
|
||||
# Resolve platform enum
|
||||
try:
|
||||
platform = Platform(platform_name)
|
||||
except (ValueError, KeyError):
|
||||
raise RuntimeError(f"unknown platform '{platform_name}'")
|
||||
|
||||
# Adapter must be live
|
||||
adapter = self.adapters.get(platform)
|
||||
if not adapter:
|
||||
raise RuntimeError(
|
||||
f"platform '{platform_name}' is not active in this gateway"
|
||||
)
|
||||
|
||||
# Home channel must be configured
|
||||
home = self.config.get_home_channel(platform)
|
||||
if not home or not home.chat_id:
|
||||
raise RuntimeError(
|
||||
f"no home channel configured for {platform_name}; "
|
||||
f"run /sethome on the desired chat first"
|
||||
)
|
||||
|
||||
cli_title = row.get("title") or cli_session_id[:8]
|
||||
|
||||
# Try to create a fresh thread on the destination so the handoff
|
||||
# has its own scrollback. Adapter returns None if threading isn't
|
||||
# supported (Matrix/WhatsApp/Signal/SMS) or if creation failed
|
||||
# (no permission, topics-mode off, parent is a DM, etc.). When
|
||||
# None we fall through to using the home channel directly — the
|
||||
# synthetic turn still lands; just without thread isolation.
|
||||
thread_name = f"Hermes — {cli_title}"
|
||||
try:
|
||||
new_thread_id = await adapter.create_handoff_thread(
|
||||
str(home.chat_id), thread_name,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug(
|
||||
"Handoff: create_handoff_thread raised on %s: %s",
|
||||
platform_name, exc, exc_info=True,
|
||||
)
|
||||
new_thread_id = None
|
||||
|
||||
# Use the new thread if the adapter created one; otherwise fall
|
||||
# back to whatever thread (if any) the home channel was configured
|
||||
# with.
|
||||
effective_thread_id = new_thread_id or (
|
||||
str(home.thread_id) if home.thread_id else None
|
||||
)
|
||||
|
||||
# Determine chat_type for the destination source. If we created a
|
||||
# thread, key the session_key as a thread (build_session_key sets
|
||||
# thread sessions to user-shared by default, which is what we
|
||||
# want — the synthetic turn and any later real-user message both
|
||||
# land on the same key without needing a user_id).
|
||||
if new_thread_id:
|
||||
dest_chat_type = "thread"
|
||||
else:
|
||||
# No thread — assume DM-style for the home channel. For
|
||||
# group/channel home channels without thread support
|
||||
# (Matrix/WhatsApp/Signal), the platform's own keying makes
|
||||
# the synthetic turn shared anyway (single-DM platforms).
|
||||
dest_chat_type = "dm"
|
||||
|
||||
dest_source = SessionSource(
|
||||
platform=platform,
|
||||
chat_id=str(home.chat_id),
|
||||
chat_name=home.name,
|
||||
chat_type=dest_chat_type,
|
||||
user_id="system:handoff",
|
||||
user_name="Handoff",
|
||||
thread_id=effective_thread_id,
|
||||
)
|
||||
|
||||
# Compute the gateway's session_key for that destination using the
|
||||
# same rules its adapters use, so switch_session targets the right
|
||||
# entry. For thread destinations build_session_key keys without
|
||||
# user_id (thread_sessions_per_user defaults to False) — so the
|
||||
# next real user message in the thread shares this same session.
|
||||
platform_cfg = self.config.platforms.get(platform)
|
||||
extra = platform_cfg.extra if platform_cfg else {}
|
||||
session_key = build_session_key(
|
||||
dest_source,
|
||||
group_sessions_per_user=extra.get("group_sessions_per_user", True),
|
||||
thread_sessions_per_user=extra.get("thread_sessions_per_user", False),
|
||||
)
|
||||
|
||||
# Make sure there's an entry in the session_store for this key. If
|
||||
# the home channel has never been used, get_or_create_session
|
||||
# creates one; switch_session then re-points it.
|
||||
self.session_store.get_or_create_session(dest_source)
|
||||
|
||||
# Re-bind the destination key to the CLI session_id. switch_session
|
||||
# ends the prior session in SQLite and reopens the CLI session under
|
||||
# the new key. The CLI's transcript becomes the active one for the
|
||||
# gateway from this moment on.
|
||||
switched = self.session_store.switch_session(session_key, cli_session_id)
|
||||
if switched is None:
|
||||
raise RuntimeError(
|
||||
f"could not switch session key {session_key} → {cli_session_id}"
|
||||
)
|
||||
|
||||
# Evict any cached AIAgent for this session_key so the next dispatch
|
||||
# rebuilds it against the CLI session_id (mirrors /resume / /branch).
|
||||
self._evict_cached_agent(session_key)
|
||||
|
||||
# Cancel any in-flight running-agent state for the destination key
|
||||
# so the synthetic turn isn't queued behind a stale running flag.
|
||||
self._release_running_agent_state(session_key)
|
||||
|
||||
synthetic_text = (
|
||||
f"[Session was just handed off from CLI (\"{cli_title}\") to this "
|
||||
f"channel. The full prior conversation history is loaded above. "
|
||||
f"Briefly confirm you're working here and summarize what we were "
|
||||
f"working on, so the user can continue from this device.]"
|
||||
)
|
||||
|
||||
synthetic_event = MessageEvent(
|
||||
text=synthetic_text,
|
||||
source=dest_source,
|
||||
internal=True,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Handoff: dispatching synthetic turn for CLI session %s → %s "
|
||||
"(home=%s, thread=%s, session_key=%s)",
|
||||
cli_session_id, platform_name, home.chat_id, effective_thread_id,
|
||||
session_key,
|
||||
)
|
||||
|
||||
# Dispatch through the runner directly. Going through
|
||||
# adapter.handle_message would spawn a background task and we'd
|
||||
# lose synchronous error visibility; calling _handle_message inline
|
||||
# keeps the success/failure path observable for the watcher.
|
||||
response_text = await self._handle_message(synthetic_event)
|
||||
if not response_text:
|
||||
# Streaming may have already delivered the response inline.
|
||||
# Either way, agent ran without raising — count as success.
|
||||
return
|
||||
|
||||
# Send the agent's reply to the destination. Route to the new
|
||||
# thread if we created one; otherwise the configured home channel
|
||||
# (which may itself carry a thread_id).
|
||||
send_metadata: Dict[str, Any] = {}
|
||||
if effective_thread_id:
|
||||
send_metadata["thread_id"] = effective_thread_id
|
||||
try:
|
||||
result = await adapter.send(
|
||||
chat_id=str(home.chat_id),
|
||||
content=response_text,
|
||||
metadata=send_metadata or None,
|
||||
)
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"adapter.send failed: {exc}") from exc
|
||||
|
||||
if not getattr(result, "success", True):
|
||||
err = getattr(result, "error", "send returned success=False")
|
||||
raise RuntimeError(f"adapter.send failed: {err}")
|
||||
|
||||
async def _session_expiry_watcher(self, interval: int = 300):
|
||||
"""Background task that finalizes expired sessions.
|
||||
|
||||
|
|
@ -6648,46 +6872,6 @@ class GatewayRunner:
|
|||
|
||||
# Build the context prompt to inject
|
||||
context_prompt = build_session_context_prompt(context, redact_pii=_redact_pii)
|
||||
|
||||
# Check for pending CLI handoff
|
||||
if _is_new_session and self._session_db:
|
||||
try:
|
||||
platform_key = source.platform.value if source.platform else ""
|
||||
handoff = self._session_db.find_pending_handoff(platform_key)
|
||||
if handoff:
|
||||
cli_session_id = handoff["id"]
|
||||
cli_messages = self._session_db.get_messages(cli_session_id)
|
||||
if cli_messages:
|
||||
# Cap to last 200 messages to avoid context blowup
|
||||
cli_messages = cli_messages[-200:]
|
||||
transcript = []
|
||||
for msg in cli_messages:
|
||||
role = msg.get("role", "unknown")
|
||||
content = str(msg.get("content") or "")
|
||||
if content.strip():
|
||||
label = {"user": "User", "assistant": "Assistant",
|
||||
"system": "System", "tool": "Tool"}.get(role, role.title())
|
||||
transcript.append(f"{label}: {content}")
|
||||
if transcript:
|
||||
handoff_title = handoff.get("title") or "untitled"
|
||||
handoff_context = (
|
||||
f"[Handoff from CLI session '{handoff_title}'. "
|
||||
f"Continue the conversation below where it left off.]"
|
||||
)
|
||||
context_prompt = (
|
||||
handoff_context
|
||||
+ "\n\n--- Previous conversation ---\n"
|
||||
+ "\n\n".join(transcript)
|
||||
+ "\n--- End of previous conversation ---\n\n"
|
||||
+ context_prompt
|
||||
)
|
||||
self._session_db.clear_handoff_pending(cli_session_id)
|
||||
logger.info(
|
||||
"Handoff: CLI session %s handed off to %s chat %s",
|
||||
cli_session_id, platform_key, source.chat_id,
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Handoff check failed", exc_info=True)
|
||||
|
||||
# If the previous session expired and was auto-reset, prepend a notice
|
||||
# so the agent knows this is a fresh conversation (not an intentional /reset).
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue