From 00ce5f04d9cfad2e30d5e08e7a6bf135f6e53030 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sun, 10 May 2026 12:56:31 -0700 Subject: [PATCH] feat(session): make /handoff actually transfer the session live MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on @kshitijk4poor's CLI handoff stub. The original PR's flow deferred everything to whenever a real user happened to message the target platform; this rewrites it so the gateway picks up handoffs immediately and the destination chat just starts working. State machine on sessions table replaces the boolean flag: None -> 'pending' -> 'running' -> ('completed' | 'failed') plus handoff_error for failure reasons. CLI request_handoff / get_handoff_state / list_pending_handoffs / claim_handoff / complete_handoff / fail_handoff helpers wrap the transitions. CLI side (cli.py): /handoff validates the platform's home channel via load_gateway_config, refuses if the agent is mid-turn, flips the row to 'pending', and poll-blocks (60s) on terminal state. On 'completed' it prints the /resume hint and exits the CLI like /quit. On 'failed' or timeout it surfaces the reason and the CLI session stays intact. Gateway side (gateway/run.py): new _handoff_watcher background task scans state.db every 2s, atomically claims pending rows, and runs _process_handoff for each. _process_handoff: 1. Resolves the platform's home channel. 2. Asks the adapter for a fresh thread via the new create_handoff_thread(parent_chat_id, name) capability so the handed-off conversation gets its own scrollback. Adapters that don't support threads (or fail) return None and the watcher falls back to the home channel directly. 3. Constructs a SessionSource keyed as 'thread' when a thread was created, 'dm' otherwise, then session_store.switch_session re-binds the destination key to the CLI session_id. The full role-aware transcript replays via load_transcript on the next turn (no flat-text injection into context_prompt). 4. Forges a synthetic MessageEvent(internal=True) with the handoff notice and dispatches through _handle_message; the agent runs against the loaded transcript and adapter.send delivers the reply. 5. Marks the row 'completed' on success, 'failed' (+error) on any exception. Adapter capability (gateway/platforms/base.py): create_handoff_thread default returns None. Three overrides: - Telegram (gateway/platforms/telegram.py): wraps _create_dm_topic so DM topics (Bot API 9.4+) and forum supergroups both work. - Discord (gateway/platforms/discord.py): parent.create_thread on text channels with a seed-message + message.create_thread fallback for permission edge cases. Skips DMs and other non-thread-capable parents. - Slack (gateway/platforms/slack.py): posts a seed message and returns its ts as the thread anchor — Slack threads are message-anchored. In thread mode, build_session_key keys the destination without user_id (thread_sessions_per_user defaults to False) so the synthetic turn and any later real-user message in the thread share the same session_key — seamless takeover without race. CommandDef stays cli_only=True (handoff is initiated from the CLI; gateway exposes /resume for the reverse direction). Removed the original PR's _handle_message_with_agent handoff hook (transcript-as-text injection into context_prompt) and the send_message_tool notification — both replaced by the watcher path. Tests rewritten around the new state machine: 13/13 pass. E2E-validated thread + no-thread paths and the failure path against real worktree imports with mocked adapters. --- cli.py | 191 ++++++++++------ gateway/platforms/base.py | 27 +++ gateway/platforms/discord.py | 78 +++++++ gateway/platforms/slack.py | 35 +++ gateway/platforms/telegram.py | 18 ++ gateway/run.py | 264 +++++++++++++++++++---- hermes_state.py | 94 ++++++-- tests/hermes_cli/test_session_handoff.py | 219 +++++++++++++------ 8 files changed, 737 insertions(+), 189 deletions(-) diff --git a/cli.py b/cli.py index 704478e5245..b63cde590b7 100644 --- a/cli.py +++ b/cli.py @@ -5484,87 +5484,155 @@ class HermesCLI: else: print("(^_^)v New session started!") - def _handle_handoff_command(self, cmd_original: str) -> None: - """Handle /handoff — hand off current session to a messaging platform.""" + def _handle_handoff_command(self, cmd_original: str) -> bool: + """Handle ``/handoff `` — transfer this CLI session to a gateway platform. + + Flow: + 1. Validate platform name + the gateway has a home channel for it. + 2. Reject if the agent is currently running (the in-flight turn + would race with the gateway's switch_session). + 3. Write ``handoff_state='pending'`` on this session row. + 4. Block-poll ``state.db`` for terminal state (timeout 60s). + 5. On ``completed`` → print resume hint and signal CLI exit by + returning False (the caller honors that like ``/quit``). + 6. On ``failed`` / timeout → print error and return True so the + user keeps their CLI session. + + Returns: + False to signal CLI exit, True to keep going. + """ from hermes_state import format_session_db_unavailable parts = cmd_original.split(maxsplit=1) if len(parts) < 2 or not parts[1].strip(): _cprint(" Usage: /handoff ") - _cprint(" Supported: telegram, discord, slack, whatsapp, signal, matrix") - _cprint(" The session will become available on that platform's home channel.") - return + _cprint(" Hands the current session off to that platform's home channel.") + _cprint(" The CLI session ends here; resume it later with /resume.") + return True - platform = parts[1].strip().lower() - supported = {"telegram", "discord", "slack", "whatsapp", "signal", "matrix"} - if platform not in supported: - _cprint(f" Unknown platform '{platform}'. Supported: {', '.join(sorted(supported))}") - return + platform_name = parts[1].strip().lower() - # Ensure session is in the DB + # Validate platform name + home channel via the live gateway config. + try: + from gateway.config import load_gateway_config, Platform + except Exception as exc: # pragma: no cover — gateway pkg always shipped + _cprint(f" Could not load gateway config: {exc}") + return True + + try: + platform = Platform(platform_name) + except (ValueError, KeyError): + _cprint(f" Unknown platform '{platform_name}'.") + return True + + try: + gw_config = load_gateway_config() + except Exception as exc: + _cprint(f" Could not load gateway config: {exc}") + return True + + pcfg = gw_config.platforms.get(platform) + if not pcfg or not pcfg.enabled: + _cprint(f" Platform '{platform_name}' is not configured/enabled in the gateway.") + return True + + home = gw_config.get_home_channel(platform) + if not home or not home.chat_id: + _cprint(f" No home channel configured for {platform_name}.") + _cprint(f" Set one with /sethome on the destination chat first.") + return True + + # Refuse mid-turn: an in-flight agent run would race with the + # gateway's switch_session and the synthetic turn dispatch. + if getattr(self, "_agent_running", False): + _cprint(" Agent is busy. Wait for the current turn to finish, then retry /handoff.") + return True + + # Make sure we have a SessionDB handle. if not self._session_db: - from hermes_state import SessionDB - self._session_db = SessionDB() - + try: + from hermes_state import SessionDB + self._session_db = SessionDB() + except Exception: + pass if not self._session_db: _cprint(f" {format_session_db_unavailable()}") - return + return True - # Make sure the session has a title + # Make sure the session row exists in state.db. Most CLI sessions + # are written via _flush_messages_to_session_db on the first turn + # already, but if the user tries to hand off an empty session we + # still want a row to mark. + try: + row = self._session_db.get_session(self.session_id) + if not row: + # Nothing has flushed yet. Create a stub so the gateway has + # something to switch_session onto. Inserting via title-set + # is the simplest path because set_session_title's INSERT OR + # IGNORE creates the row. + placeholder_title = f"handoff-{self.session_id[:8]}" + self._session_db.set_session_title(self.session_id, placeholder_title) + except Exception as exc: + _cprint(f" Could not ensure session row in state.db: {exc}") + return True + + # Display title for messaging. session_title = "" try: - session_meta = self._session_db.get_session(self.session_id) - if session_meta: - session_title = session_meta.get("title") or "" + row = self._session_db.get_session(self.session_id) + if row: + session_title = row.get("title") or "" except Exception: pass - if not session_title: - # Auto-title from conversation if not set - if hasattr(self, "agent") and self.agent and self.conversation_history: - last_user_msgs = [m for m in self.conversation_history[-6:] if m.get("role") == "user"] - if last_user_msgs: - title = last_user_msgs[0].get("content", "")[:60] - title = title.replace("\n", " ").strip() - if title: - session_title = title - self._session_db.set_session_title(self.session_id, title) + session_title = self.session_id[:8] - if not session_title: - session_title = "untitled session" - - # Mark session for handoff - ok = self._session_db.set_handoff_pending(self.session_id, platform) + # Mark pending — gateway watcher will pick this up. + ok = self._session_db.request_handoff(self.session_id, platform_name) if not ok: - _cprint(f" Session is already pending handoff or not found.") - return + _cprint(" Session is already in flight for handoff. Wait for it to settle, then retry.") + return True - _cprint(f" Session '{session_title}' queued for handoff to {platform}.") - _cprint(f" The session will resume when the next message arrives on the {platform} home channel.") + _cprint(f" Queued handoff of '{session_title}' → {platform_name} (home: {home.name}).") + _cprint(f" Waiting for the gateway to pick it up...") - # Also try to send a notification via send_message + # Poll-block on terminal state. Tick every 0.5s; bail at ~60s. + import time as _time + deadline = _time.time() + 60.0 + last_state = "pending" + while _time.time() < deadline: + try: + state_row = self._session_db.get_handoff_state(self.session_id) + except Exception: + state_row = None + current = (state_row or {}).get("state") or "pending" + if current != last_state: + if current == "running": + _cprint(" Gateway picked it up; transferring...") + last_state = current + if current == "completed": + _cprint("") + _cprint(f" ↻ Handoff complete. The session is now active on {platform_name}.") + _cprint(f" Resume it on this CLI later with: /resume {session_title}") + _cprint("") + # End the CLI cleanly — same exit semantics as /quit. + self._should_exit = True + return False + if current == "failed": + err = (state_row or {}).get("error") or "unknown error" + _cprint(f" Handoff failed: {err}") + _cprint(" Your CLI session is intact. Try /handoff again, or /resume on the platform manually.") + return True + _time.sleep(0.5) + + # Timed out. Clear the pending flag so the user can retry. try: - summary_lines = ["Handoff from CLI", f"Session: {session_title}"] - if hasattr(self, "agent") and self.agent: - last_msgs = self.conversation_history[-4:] if self.conversation_history else [] - for msg in last_msgs: - role = msg.get("role", "") - content = str(msg.get("content", ""))[:120] - if content.strip(): - summary_lines.append(f"[{role}] {content}") - summary = "\n".join(summary_lines) - - from tools.send_message_tool import send_message_tool - result_json = send_message_tool({"target": platform, "message": summary}) - import json - result = json.loads(result_json) - if result.get("success"): - _cprint(f" Notification sent to {platform} home channel.") - else: - err = result.get("error", "unknown error") - _cprint(f" Could not send notification to {platform}: {err}") - except Exception as e: - _cprint(f" Could not send notification: {e}") + self._session_db.fail_handoff(self.session_id, "timed out waiting for gateway") + except Exception: + pass + _cprint(" Timed out waiting for the gateway. Is `hermes gateway` running?") + _cprint(" Your CLI session is intact.") + return True def _handle_resume_command(self, cmd_original: str) -> None: """Handle /resume — switch to a previous session mid-conversation.""" @@ -6993,7 +7061,8 @@ class HermesCLI: from hermes_state import format_session_db_unavailable _cprint(f" {format_session_db_unavailable()}") elif canonical == "handoff": - self._handle_handoff_command(cmd_original) + if not self._handle_handoff_command(cmd_original): + return False elif canonical == "new": parts = cmd_original.split(maxsplit=1) title = parts[1].strip() if len(parts) > 1 else None diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 413cebfbe87..d471818a27c 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1511,6 +1511,33 @@ class BasePlatformAdapter(ABC): # property) so the stream consumer knows not to short-circuit. REQUIRES_EDIT_FINALIZE: bool = False + async def create_handoff_thread( + self, + parent_chat_id: str, + name: str, + ) -> Optional[str]: + """Create a fresh thread under ``parent_chat_id`` for a session handoff. + + Used by the gateway's handoff watcher when transferring a CLI + session to a thread-capable platform — the new thread isolates the + handed-off conversation from any pre-existing chat in the home + channel and gives users a clean per-handoff scrollback. + + Returns the new thread/topic id (as a string) on success, or + ``None`` if the platform doesn't support threading or the + attempt failed (permissions, topics-mode off, etc.). When ``None`` + is returned the watcher falls back to using ``parent_chat_id`` + directly. + + Default implementation returns ``None`` — adapters that support + threads override this. See: + - Telegram: forum topics in groups, DM topics with bot API 9.4+ + - Discord: text-channel threads (1440-min auto-archive) + - Slack: seed-message thread anchoring + """ + return None + + async def edit_message( self, chat_id: str, diff --git a/gateway/platforms/discord.py b/gateway/platforms/discord.py index ae107cdfb2b..f7483397f95 100644 --- a/gateway/platforms/discord.py +++ b/gateway/platforms/discord.py @@ -3689,6 +3689,84 @@ class DiscordAdapter(BasePlatformAdapter): ) return None + async def create_handoff_thread( + self, + parent_chat_id: str, + name: str, + ) -> Optional[str]: + """Create a Discord thread under a text channel for a handoff. + + Falls back to a seed-message + ``message.create_thread`` path if + ``parent.create_thread`` is rejected (some channel types or + permission setups). Returns the new thread id as a string, or + ``None`` on failure or when the parent isn't a text channel + (DMs, voice channels, threads themselves can't host threads). + """ + if not self._client or not DISCORD_AVAILABLE: + return None + + try: + parent_id = int(parent_chat_id) + except (TypeError, ValueError): + return None + + try: + parent = self._client.get_channel(parent_id) + if parent is None: + parent = await self._client.fetch_channel(parent_id) + except Exception as exc: + logger.warning( + "[%s] Handoff thread: cannot resolve parent %s: %s", + self.name, parent_chat_id, exc, + ) + return None + + # DMs, voice channels, and existing threads can't host child threads. + if isinstance(parent, getattr(discord, "DMChannel", tuple())): + logger.info( + "[%s] Handoff thread: parent %s is a DM; threads not supported here", + self.name, parent_chat_id, + ) + return None + + thread_name = (name or "handoff").strip()[:80] or "handoff" + reason = "Hermes session handoff" + + # First try: create a thread directly on the channel. + try: + create = getattr(parent, "create_thread", None) + if create is not None: + thread = await create( + name=thread_name, + auto_archive_duration=1440, + reason=reason, + ) + return str(thread.id) + except Exception as direct_error: + logger.debug( + "[%s] Handoff thread: direct create failed (%s); trying seed-message fallback", + self.name, direct_error, + ) + + # Fallback: post a seed message and create the thread from it. + try: + send = getattr(parent, "send", None) + if send is None: + return None + seed_msg = await send(f"\U0001f9f5 Hermes handoff: **{thread_name}**") + thread = await seed_msg.create_thread( + name=thread_name, + auto_archive_duration=1440, + reason=reason, + ) + return str(thread.id) + except Exception as fallback_error: + logger.warning( + "[%s] Handoff thread: both create paths failed for parent %s: %s", + self.name, parent_chat_id, fallback_error, + ) + return None + async def send_exec_approval( self, chat_id: str, command: str, session_key: str, description: str = "dangerous command", diff --git a/gateway/platforms/slack.py b/gateway/platforms/slack.py index 843fb78959c..60912bc18e0 100644 --- a/gateway/platforms/slack.py +++ b/gateway/platforms/slack.py @@ -679,6 +679,41 @@ class SlackAdapter(BasePlatformAdapter): if lock_acquired and not self._running: self._release_platform_lock() + async def create_handoff_thread( + self, + parent_chat_id: str, + name: str, + ) -> Optional[str]: + """Create a Slack thread anchor for a session handoff. + + Slack threads are anchored to a parent message (``thread_ts``), not + a channel-level construct. So we post a seed message into the home + channel and return its ``ts`` — the watcher uses that as the + ``thread_id`` for subsequent sends. + + Returns the seed message ts as a string, or ``None`` on failure. + """ + if not self._app: + return None + try: + client = self._get_client(parent_chat_id) + if client is None: + return None + seed_text = f":thread: Hermes handoff — *{(name or 'session').strip()[:80]}*" + result = await client.chat_postMessage( + channel=parent_chat_id, + text=seed_text, + ) + ts = result.get("ts") if isinstance(result, dict) else getattr(result, "get", lambda _k, _d=None: None)("ts") + if ts: + return str(ts) + except Exception as exc: + logger.warning( + "[%s] Handoff thread: seed-post failed for channel %s: %s", + self.name, parent_chat_id, exc, + ) + return None + async def disconnect(self) -> None: """Disconnect from Slack.""" if self._handler: diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 9bae59a3497..ae34ee9210c 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -865,6 +865,24 @@ class TelegramAdapter(BasePlatformAdapter): ) return None + async def create_handoff_thread( + self, + parent_chat_id: str, + name: str, + ) -> Optional[str]: + """Create a forum topic for a session handoff. + + Works for DM topics (Bot API 9.4+, requires user to enable Topics + in their chat with the bot) and forum supergroups. Returns the + ``message_thread_id`` as a string, or ``None`` on failure. + """ + try: + chat_id_int = int(parent_chat_id) + except (TypeError, ValueError): + return None + thread_id = await self._create_dm_topic(chat_id_int, name=name) + return str(thread_id) if thread_id else None + async def rename_dm_topic( self, chat_id: int, diff --git a/gateway/run.py b/gateway/run.py index a63dcc91400..ec2fdcb249e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3681,10 +3681,234 @@ class GatewayRunner: ) asyncio.create_task(self._platform_reconnect_watcher()) + # Start background handoff watcher — picks up CLI sessions marked + # handoff_state='pending' in state.db and re-binds them to the + # destination platform's home channel, then forges a synthetic user + # turn so the agent kicks off the new chat. + asyncio.create_task(self._handoff_watcher()) + logger.info("Press Ctrl+C to stop") return True + async def _handoff_watcher(self, interval: float = 2.0) -> None: + """Background task that processes pending CLI→gateway session handoffs. + + Polls ``state.db`` for sessions in ``handoff_state='pending'`` and, + for each one: + + 1. Atomically claims it (pending → running). + 2. Resolves the destination platform's configured home channel. + 3. Re-binds the gateway's session_key for that home channel to the + CLI's existing session_id via ``session_store.switch_session`` so + the full role-aware transcript replays on the next agent turn. + 4. Forges a synthetic ``MessageEvent`` (``internal=True``) with a + handoff-notice text and dispatches through the normal gateway + message pipeline so the agent runs and replies on the platform. + 5. Marks the row ``completed`` (or ``failed`` with ``handoff_error``). + + The CLI process is poll-blocked on the row's terminal state and + prints the result to the user. + """ + # Initial delay so the gateway is fully connected to its platforms + # before we try to dispatch handoffs through them. + await asyncio.sleep(5) + while self._running: + try: + if self._session_db is None: + await asyncio.sleep(interval) + continue + pending = self._session_db.list_pending_handoffs() + for row in pending: + session_id = row.get("id") + if not session_id: + continue + if not self._session_db.claim_handoff(session_id): + # Another tick or another gateway already claimed it. + continue + try: + await self._process_handoff(row) + self._session_db.complete_handoff(session_id) + except Exception as exc: + logger.warning( + "Handoff for session %s failed: %s", + session_id, exc, exc_info=True, + ) + self._session_db.fail_handoff(session_id, str(exc)) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.debug("Handoff watcher tick error: %s", exc, exc_info=True) + await asyncio.sleep(interval) + + async def _process_handoff(self, row: Dict[str, Any]) -> None: + """Execute one handoff row. Raises on failure (caller marks failed).""" + from gateway.config import Platform + from gateway.session import SessionSource, build_session_key + from gateway.platforms.base import MessageEvent + + cli_session_id = row["id"] + platform_name = (row.get("handoff_platform") or "").strip().lower() + if not platform_name: + raise RuntimeError("handoff_platform is empty") + + # Resolve platform enum + try: + platform = Platform(platform_name) + except (ValueError, KeyError): + raise RuntimeError(f"unknown platform '{platform_name}'") + + # Adapter must be live + adapter = self.adapters.get(platform) + if not adapter: + raise RuntimeError( + f"platform '{platform_name}' is not active in this gateway" + ) + + # Home channel must be configured + home = self.config.get_home_channel(platform) + if not home or not home.chat_id: + raise RuntimeError( + f"no home channel configured for {platform_name}; " + f"run /sethome on the desired chat first" + ) + + cli_title = row.get("title") or cli_session_id[:8] + + # Try to create a fresh thread on the destination so the handoff + # has its own scrollback. Adapter returns None if threading isn't + # supported (Matrix/WhatsApp/Signal/SMS) or if creation failed + # (no permission, topics-mode off, parent is a DM, etc.). When + # None we fall through to using the home channel directly — the + # synthetic turn still lands; just without thread isolation. + thread_name = f"Hermes — {cli_title}" + try: + new_thread_id = await adapter.create_handoff_thread( + str(home.chat_id), thread_name, + ) + except Exception as exc: + logger.debug( + "Handoff: create_handoff_thread raised on %s: %s", + platform_name, exc, exc_info=True, + ) + new_thread_id = None + + # Use the new thread if the adapter created one; otherwise fall + # back to whatever thread (if any) the home channel was configured + # with. + effective_thread_id = new_thread_id or ( + str(home.thread_id) if home.thread_id else None + ) + + # Determine chat_type for the destination source. If we created a + # thread, key the session_key as a thread (build_session_key sets + # thread sessions to user-shared by default, which is what we + # want — the synthetic turn and any later real-user message both + # land on the same key without needing a user_id). + if new_thread_id: + dest_chat_type = "thread" + else: + # No thread — assume DM-style for the home channel. For + # group/channel home channels without thread support + # (Matrix/WhatsApp/Signal), the platform's own keying makes + # the synthetic turn shared anyway (single-DM platforms). + dest_chat_type = "dm" + + dest_source = SessionSource( + platform=platform, + chat_id=str(home.chat_id), + chat_name=home.name, + chat_type=dest_chat_type, + user_id="system:handoff", + user_name="Handoff", + thread_id=effective_thread_id, + ) + + # Compute the gateway's session_key for that destination using the + # same rules its adapters use, so switch_session targets the right + # entry. For thread destinations build_session_key keys without + # user_id (thread_sessions_per_user defaults to False) — so the + # next real user message in the thread shares this same session. + platform_cfg = self.config.platforms.get(platform) + extra = platform_cfg.extra if platform_cfg else {} + session_key = build_session_key( + dest_source, + group_sessions_per_user=extra.get("group_sessions_per_user", True), + thread_sessions_per_user=extra.get("thread_sessions_per_user", False), + ) + + # Make sure there's an entry in the session_store for this key. If + # the home channel has never been used, get_or_create_session + # creates one; switch_session then re-points it. + self.session_store.get_or_create_session(dest_source) + + # Re-bind the destination key to the CLI session_id. switch_session + # ends the prior session in SQLite and reopens the CLI session under + # the new key. The CLI's transcript becomes the active one for the + # gateway from this moment on. + switched = self.session_store.switch_session(session_key, cli_session_id) + if switched is None: + raise RuntimeError( + f"could not switch session key {session_key} → {cli_session_id}" + ) + + # Evict any cached AIAgent for this session_key so the next dispatch + # rebuilds it against the CLI session_id (mirrors /resume / /branch). + self._evict_cached_agent(session_key) + + # Cancel any in-flight running-agent state for the destination key + # so the synthetic turn isn't queued behind a stale running flag. + self._release_running_agent_state(session_key) + + synthetic_text = ( + f"[Session was just handed off from CLI (\"{cli_title}\") to this " + f"channel. The full prior conversation history is loaded above. " + f"Briefly confirm you're working here and summarize what we were " + f"working on, so the user can continue from this device.]" + ) + + synthetic_event = MessageEvent( + text=synthetic_text, + source=dest_source, + internal=True, + ) + + logger.info( + "Handoff: dispatching synthetic turn for CLI session %s → %s " + "(home=%s, thread=%s, session_key=%s)", + cli_session_id, platform_name, home.chat_id, effective_thread_id, + session_key, + ) + + # Dispatch through the runner directly. Going through + # adapter.handle_message would spawn a background task and we'd + # lose synchronous error visibility; calling _handle_message inline + # keeps the success/failure path observable for the watcher. + response_text = await self._handle_message(synthetic_event) + if not response_text: + # Streaming may have already delivered the response inline. + # Either way, agent ran without raising — count as success. + return + + # Send the agent's reply to the destination. Route to the new + # thread if we created one; otherwise the configured home channel + # (which may itself carry a thread_id). + send_metadata: Dict[str, Any] = {} + if effective_thread_id: + send_metadata["thread_id"] = effective_thread_id + try: + result = await adapter.send( + chat_id=str(home.chat_id), + content=response_text, + metadata=send_metadata or None, + ) + except Exception as exc: + raise RuntimeError(f"adapter.send failed: {exc}") from exc + + if not getattr(result, "success", True): + err = getattr(result, "error", "send returned success=False") + raise RuntimeError(f"adapter.send failed: {err}") + async def _session_expiry_watcher(self, interval: int = 300): """Background task that finalizes expired sessions. @@ -6648,46 +6872,6 @@ class GatewayRunner: # Build the context prompt to inject context_prompt = build_session_context_prompt(context, redact_pii=_redact_pii) - - # Check for pending CLI handoff - if _is_new_session and self._session_db: - try: - platform_key = source.platform.value if source.platform else "" - handoff = self._session_db.find_pending_handoff(platform_key) - if handoff: - cli_session_id = handoff["id"] - cli_messages = self._session_db.get_messages(cli_session_id) - if cli_messages: - # Cap to last 200 messages to avoid context blowup - cli_messages = cli_messages[-200:] - transcript = [] - for msg in cli_messages: - role = msg.get("role", "unknown") - content = str(msg.get("content") or "") - if content.strip(): - label = {"user": "User", "assistant": "Assistant", - "system": "System", "tool": "Tool"}.get(role, role.title()) - transcript.append(f"{label}: {content}") - if transcript: - handoff_title = handoff.get("title") or "untitled" - handoff_context = ( - f"[Handoff from CLI session '{handoff_title}'. " - f"Continue the conversation below where it left off.]" - ) - context_prompt = ( - handoff_context - + "\n\n--- Previous conversation ---\n" - + "\n\n".join(transcript) - + "\n--- End of previous conversation ---\n\n" - + context_prompt - ) - self._session_db.clear_handoff_pending(cli_session_id) - logger.info( - "Handoff: CLI session %s handed off to %s chat %s", - cli_session_id, platform_key, source.chat_id, - ) - except Exception: - logger.debug("Handoff check failed", exc_info=True) # If the previous session expired and was auto-reset, prepend a notice # so the agent knows this is a fresh conversation (not an intentional /reset). diff --git a/hermes_state.py b/hermes_state.py index c435ef4cd64..7fdf875c30f 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -215,8 +215,9 @@ CREATE TABLE IF NOT EXISTS sessions ( pricing_version TEXT, title TEXT, api_call_count INTEGER DEFAULT 0, - handoff_pending INTEGER DEFAULT 0, + handoff_state TEXT, handoff_platform TEXT, + handoff_error TEXT, FOREIGN KEY (parent_session_id) REFERENCES sessions(id) ); @@ -2864,45 +2865,102 @@ class SessionDB: return result # ── Handoff (cross-platform session transfer) ────────────────────────── + # + # State machine: + # None — no handoff in flight + # "pending" — CLI requested handoff, gateway hasn't picked it up yet + # "running" — gateway is processing (session switch + synthetic turn) + # "completed"— gateway successfully delivered the synthetic turn + # "failed" — gateway hit an error; reason in handoff_error + # + # The CLI writes "pending" then poll-waits for terminal state. The gateway + # watcher transitions pending→running→{completed,failed}. - def set_handoff_pending(self, session_id: str, platform: str) -> bool: + def request_handoff(self, session_id: str, platform: str) -> bool: """Mark a session as pending handoff to the given platform. - Returns True if the session was found and updated. + Returns True if the row was found and not already in flight; False if + the session is already in a non-terminal handoff state. """ def _do(conn): cur = conn.execute( - "UPDATE sessions SET handoff_pending = 1, handoff_platform = ? " - "WHERE id = ? AND handoff_pending = 0", + "UPDATE sessions " + "SET handoff_state = 'pending', " + " handoff_platform = ?, " + " handoff_error = NULL " + "WHERE id = ? AND (handoff_state IS NULL " + " OR handoff_state IN ('completed', 'failed'))", (platform, session_id), ) return cur.rowcount > 0 return self._execute_write(_do) - def find_pending_handoff(self, platform: str) -> Optional[Dict[str, Any]]: - """Find the most recent session pending handoff for a platform. + def get_handoff_state(self, session_id: str) -> Optional[Dict[str, Any]]: + """Read the current handoff state for a session. - Returns the session dict or None. + Returns ``{"state", "platform", "error"}`` or None if the session has + no handoff record. + """ + try: + cur = self._conn.execute( + "SELECT handoff_state, handoff_platform, handoff_error " + "FROM sessions WHERE id = ?", + (session_id,), + ) + row = cur.fetchone() + if not row: + return None + return { + "state": row["handoff_state"], + "platform": row["handoff_platform"], + "error": row["handoff_error"], + } + except Exception: + return None + + def list_pending_handoffs(self) -> List[Dict[str, Any]]: + """Return all sessions in handoff_state='pending', oldest first. + + Used by the gateway's handoff watcher. """ try: cur = self._conn.execute( "SELECT * FROM sessions " - "WHERE handoff_pending = 1 AND handoff_platform = ? " - "ORDER BY started_at DESC LIMIT 1", - (platform,), + "WHERE handoff_state = 'pending' " + "ORDER BY started_at ASC" ) - row = cur.fetchone() - return dict(row) if row else None + return [dict(r) for r in cur.fetchall()] except Exception: - return None + return [] - def clear_handoff_pending(self, session_id: str) -> None: - """Clear the handoff_pending flag on a session.""" + def claim_handoff(self, session_id: str) -> bool: + """Atomically transition pending → running. Returns True if claimed.""" + def _do(conn): + cur = conn.execute( + "UPDATE sessions SET handoff_state = 'running' " + "WHERE id = ? AND handoff_state = 'pending'", + (session_id,), + ) + return cur.rowcount > 0 + return self._execute_write(_do) + + def complete_handoff(self, session_id: str) -> None: + """Mark a handoff as completed.""" def _do(conn): conn.execute( - "UPDATE sessions SET handoff_pending = 0, handoff_platform = NULL " - "WHERE id = ?", + "UPDATE sessions SET handoff_state = 'completed', " + "handoff_error = NULL WHERE id = ?", (session_id,), ) self._execute_write(_do) + def fail_handoff(self, session_id: str, error: str) -> None: + """Mark a handoff as failed and record the reason.""" + def _do(conn): + conn.execute( + "UPDATE sessions SET handoff_state = 'failed', " + "handoff_error = ? WHERE id = ?", + (error[:500], session_id), + ) + self._execute_write(_do) + diff --git a/tests/hermes_cli/test_session_handoff.py b/tests/hermes_cli/test_session_handoff.py index 6172da1e45a..2fd9e9e1ab9 100644 --- a/tests/hermes_cli/test_session_handoff.py +++ b/tests/hermes_cli/test_session_handoff.py @@ -1,25 +1,33 @@ -"""Tests for session handoff (CLI to gateway platform).""" +"""Tests for session handoff (CLI to gateway platform). + +The handoff state machine lives on the ``sessions`` table: + + None → "pending" → "running" → ("completed" | "failed") + +CLI side calls ``request_handoff`` and poll-waits on ``get_handoff_state``. +Gateway side iterates ``list_pending_handoffs``, calls ``claim_handoff`` to +flip pending → running, and finishes with ``complete_handoff`` or +``fail_handoff``. +""" from __future__ import annotations import time -from unittest.mock import patch import pytest from hermes_state import SessionDB -class TestHandoffDB: - """Test the handoff columns and helper methods on SessionDB.""" +class TestHandoffStateDB: + """Test the handoff schema + helper methods on SessionDB.""" @pytest.fixture def db(self, tmp_path, monkeypatch): home = tmp_path / ".hermes" home.mkdir() monkeypatch.setenv("HERMES_HOME", str(home)) - db = SessionDB(db_path=home / "state.db") - yield db + return SessionDB(db_path=home / "state.db") def _make_session(self, db, session_id, source="cli", title=None): """Insert a session row directly for testing.""" @@ -31,83 +39,152 @@ class TestHandoffDB: ) db._execute_write(_do) - def test_handoff_columns_exist(self, db): - """Verify handoff columns are in the sessions table after init.""" - db._conn.execute("SELECT handoff_pending, handoff_platform FROM sessions LIMIT 0") + def test_columns_exist(self, db): + db._conn.execute( + "SELECT handoff_state, handoff_platform, handoff_error " + "FROM sessions LIMIT 0" + ) - def test_set_handoff_pending(self, db): - """Mark a session for handoff.""" - session_id = "test-session-001" - self._make_session(db, session_id) - ok = db.set_handoff_pending(session_id, "telegram") - assert ok is True - - session = db.get_session(session_id) - assert session["handoff_pending"] == 1 - assert session["handoff_platform"] == "telegram" - - def test_set_handoff_pending_no_double_mark(self, db): - """Re-marking an already-pending session returns False.""" - session_id = "test-session-002" - self._make_session(db, session_id) - ok1 = db.set_handoff_pending(session_id, "telegram") - assert ok1 is True - ok2 = db.set_handoff_pending(session_id, "discord") - assert ok2 is False # already pending - - def test_find_pending_handoff(self, db): - """Find a session pending handoff for a given platform.""" - sid = "test-session-003" + def test_request_handoff_marks_pending(self, db): + sid = "sess-1" self._make_session(db, sid) - db.set_handoff_pending(sid, "telegram") - handoff = db.find_pending_handoff("telegram") - assert handoff is not None - assert handoff["id"] == sid + assert db.request_handoff(sid, "telegram") is True - # Should not find for other platforms - assert db.find_pending_handoff("discord") is None + state = db.get_handoff_state(sid) + assert state == { + "state": "pending", + "platform": "telegram", + "error": None, + } - def test_clear_handoff_pending(self, db): - """Clear the handoff flag.""" - sid = "test-session-004" + def test_request_handoff_rejects_in_flight(self, db): + sid = "sess-2" self._make_session(db, sid) - db.set_handoff_pending(sid, "telegram") - db.clear_handoff_pending(sid) - session = db.get_session(sid) - assert session["handoff_pending"] == 0 + assert db.request_handoff(sid, "telegram") is True + # Still pending → reject re-request + assert db.request_handoff(sid, "discord") is False - def test_full_handoff_flow(self, db): - """End-to-end: mark → find → load messages → clear.""" - sid = "test-session-005" + # And after gateway claims it (running) → still rejected + assert db.claim_handoff(sid) is True + assert db.request_handoff(sid, "discord") is False + + def test_request_handoff_after_terminal_state_resets_error(self, db): + sid = "sess-3" + self._make_session(db, sid) + db.request_handoff(sid, "telegram") + db.claim_handoff(sid) + db.fail_handoff(sid, "earlier failure") + + # User retries — should be allowed and clear the prior error. + assert db.request_handoff(sid, "discord") is True + state = db.get_handoff_state(sid) + assert state["state"] == "pending" + assert state["platform"] == "discord" + assert state["error"] is None + + def test_list_pending_handoffs_excludes_running_and_terminal(self, db): + a, b, c, d = "sess-a", "sess-b", "sess-c", "sess-d" + for sid in (a, b, c, d): + self._make_session(db, sid) + + db.request_handoff(a, "telegram") + db.request_handoff(b, "discord") + db.request_handoff(c, "telegram") + db.claim_handoff(c) # c is now running, not pending + db.request_handoff(d, "slack") + db.claim_handoff(d) + db.complete_handoff(d) # d is terminal + + pending = db.list_pending_handoffs() + ids = [r["id"] for r in pending] + assert set(ids) == {a, b} + + def test_claim_handoff_is_atomic(self, db): + sid = "sess-claim" + self._make_session(db, sid) + db.request_handoff(sid, "telegram") + + # First claim wins + assert db.claim_handoff(sid) is True + # Second claim is a no-op (state is now "running", not "pending") + assert db.claim_handoff(sid) is False + assert db.get_handoff_state(sid)["state"] == "running" + + def test_complete_handoff_clears_error(self, db): + sid = "sess-complete" + self._make_session(db, sid) + db.request_handoff(sid, "telegram") + db.claim_handoff(sid) + db.fail_handoff(sid, "transient") + # User retries; mock the watcher path + db.request_handoff(sid, "telegram") + db.claim_handoff(sid) + db.complete_handoff(sid) + + state = db.get_handoff_state(sid) + assert state["state"] == "completed" + assert state["error"] is None + + def test_fail_handoff_records_reason(self, db): + sid = "sess-fail" + self._make_session(db, sid) + db.request_handoff(sid, "telegram") + db.claim_handoff(sid) + db.fail_handoff(sid, "no home channel for telegram") + + state = db.get_handoff_state(sid) + assert state["state"] == "failed" + assert state["error"] == "no home channel for telegram" + + def test_fail_handoff_truncates_long_reasons(self, db): + sid = "sess-fail-long" + self._make_session(db, sid) + db.request_handoff(sid, "telegram") + db.claim_handoff(sid) + + # 1000-character error string + big_err = "x" * 1000 + db.fail_handoff(sid, big_err) + + state = db.get_handoff_state(sid) + assert len(state["error"]) <= 500 + + def test_get_handoff_state_for_unknown_session(self, db): + assert db.get_handoff_state("does-not-exist") is None + + def test_full_pending_to_completed_flow(self, db): + """End-to-end sequence the CLI + gateway watcher follow.""" + sid = "sess-flow" self._make_session(db, sid, title="my session") db.append_message(sid, "user", "Hello") db.append_message(sid, "assistant", "Hi there!") - # CLI side: mark for handoff - ok = db.set_handoff_pending(sid, "telegram") - assert ok is True + # CLI: request handoff + assert db.request_handoff(sid, "telegram") is True + assert db.get_handoff_state(sid)["state"] == "pending" - # Gateway side: find pending handoff - handoff = db.find_pending_handoff("telegram") - assert handoff is not None - assert handoff["id"] == sid - assert handoff["title"] == "my session" + # Gateway watcher: discover + claim + pending = db.list_pending_handoffs() + assert len(pending) == 1 + assert pending[0]["id"] == sid + assert db.claim_handoff(sid) is True + assert db.get_handoff_state(sid)["state"] == "running" - # Load messages for context + # Gateway uses get_messages to load the transcript (real flow uses + # session_store.switch_session which reads the same table). messages = db.get_messages(sid) - assert len(messages) == 2 - assert messages[0]["role"] == "user" - assert messages[1]["role"] == "assistant" + assert [m["role"] for m in messages] == ["user", "assistant"] - # Clear after injecting - db.clear_handoff_pending(sid) - assert db.find_pending_handoff("telegram") is None + # Gateway: mark completed + db.complete_handoff(sid) + assert db.get_handoff_state(sid)["state"] == "completed" + assert db.list_pending_handoffs() == [] -class TestHandoffCommand: - """Test the CLI /handoff command handler.""" +class TestHandoffCommandRegistration: + """Slash-command surface checks.""" def test_command_registered(self): from hermes_cli.commands import resolve_command @@ -116,8 +193,10 @@ class TestHandoffCommand: assert cmd.name == "handoff" assert cmd.category == "Session" - def test_invalid_platform(self): - """Test that unknown platforms are rejected.""" - supported = {"telegram", "discord", "slack", "whatsapp", "signal", "matrix"} - assert "telegram" in supported - assert "foo" not in supported + def test_command_is_cli_only(self): + """`/handoff` is initiated from the CLI; gateway shouldn't expose it.""" + from hermes_cli.commands import resolve_command, GATEWAY_KNOWN_COMMANDS + cmd = resolve_command("handoff") + assert cmd is not None + assert cmd.cli_only is True + assert "handoff" not in GATEWAY_KNOWN_COMMANDS