diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index bba93214f..8b9de642f 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -900,10 +900,16 @@ class BasePlatformAdapter(ABC): self._fatal_error_retryable = True self._fatal_error_handler: Optional[Callable[["BasePlatformAdapter"], Awaitable[None] | None]] = None - # Track active message handlers per session for interrupt support - # Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt) + # Track active message handlers per session for interrupt support. + # _active_sessions stores the per-session interrupt Event; _session_tasks + # maps session → the specific Task currently processing it so that + # session-terminating commands (/stop, /new, /reset) can cancel the + # right task and release the adapter-level guard deterministically. + # Without the owner-task map, an old task's finally block could delete + # a newer task's guard, leaving stale busy state. self._active_sessions: Dict[str, asyncio.Event] = {} self._pending_messages: Dict[str, MessageEvent] = {} + self._session_tasks: Dict[str, asyncio.Task] = {} # Background message-processing tasks spawned by handle_message(). # Gateway shutdown cancels these so an old gateway instance doesn't keep # working on a task after --replace or manual restarts. @@ -1680,6 +1686,222 @@ class BasePlatformAdapter(ABC): return f"{existing_text}\n\n{new_text}".strip() return existing_text + # ------------------------------------------------------------------ + # Session task + guard ownership helpers + # ------------------------------------------------------------------ + # These were introduced together with the _session_tasks owner map to + # make session lifecycle reconciliation deterministic across (a) the + # normal completion path, (b) /stop/ /new/ /reset bypass commands, + # and (c) stale-lock self-heal on the next inbound message. + + def _release_session_guard( + self, + session_key: str, + *, + guard: Optional[asyncio.Event] = None, + ) -> None: + """Release the adapter-level guard for a session. + + When ``guard`` is provided, only release the entry if it still points + at that exact Event. This lets reset-like commands swap in a temporary + guard while the old processing task unwinds, without having the old + task's cleanup accidentally clear the replacement guard. + """ + current_guard = self._active_sessions.get(session_key) + if current_guard is None: + return + if guard is not None and current_guard is not guard: + return + del self._active_sessions[session_key] + + def _session_task_is_stale(self, session_key: str) -> bool: + """Return True if the owner task for ``session_key`` is done/cancelled. + + A lock is "stale" when the adapter still has ``_active_sessions[key]`` + AND a known owner task in ``_session_tasks`` that has already exited. + When there is no owner task at all, that usually means the guard was + installed by some path other than handle_message() (tests sometimes + install guards directly) — don't treat that as stale. The on-entry + self-heal only needs to handle the production split-brain case where + an owner task was recorded, then exited without clearing its guard. + """ + task = self._session_tasks.get(session_key) + if task is None: + return False + done = getattr(task, "done", None) + return bool(done and done()) + + def _heal_stale_session_lock(self, session_key: str) -> bool: + """Clear a stale session lock if the owner task is already gone. + + Returns True if a stale lock was healed. Returns False if there is + no lock, or the owner task is still alive (the normal busy case). + + This is the on-entry safety net sidbin's issue #11016 analysis calls + for: without it, a split-brain — adapter still thinks the session is + active, but nothing is actually processing — traps the chat in + infinite "Interrupting current task..." until the gateway is + restarted. + """ + if session_key not in self._active_sessions: + return False + if not self._session_task_is_stale(session_key): + return False + logger.warning( + "[%s] Healing stale session lock for %s (owner task is done/absent)", + self.name, + session_key, + ) + self._active_sessions.pop(session_key, None) + self._pending_messages.pop(session_key, None) + self._session_tasks.pop(session_key, None) + return True + + def _start_session_processing( + self, + event: MessageEvent, + session_key: str, + *, + interrupt_event: Optional[asyncio.Event] = None, + ) -> bool: + """Spawn a background processing task under the given session guard. + + Returns True on success. If the runtime stubs ``create_task`` with a + non-Task sentinel (some tests do this), the guard is rolled back and + False is returned so the caller isn't left holding a half-installed + session lock. + """ + guard = interrupt_event or asyncio.Event() + self._active_sessions[session_key] = guard + + task = asyncio.create_task(self._process_message_background(event, session_key)) + self._session_tasks[session_key] = task + try: + self._background_tasks.add(task) + except TypeError: + # Tests stub create_task() with lightweight sentinels that are not + # hashable and do not support lifecycle callbacks. + self._session_tasks.pop(session_key, None) + self._release_session_guard(session_key, guard=guard) + return False + if hasattr(task, "add_done_callback"): + task.add_done_callback(self._background_tasks.discard) + task.add_done_callback(self._expected_cancelled_tasks.discard) + return True + + async def cancel_session_processing( + self, + session_key: str, + *, + release_guard: bool = True, + discard_pending: bool = True, + ) -> None: + """Cancel in-flight processing for a single session. + + ``release_guard=False`` keeps the adapter-level session guard in place + so reset-like commands can finish atomically before follow-up messages + are allowed to start a fresh background task. + """ + task = self._session_tasks.pop(session_key, None) + if task is not None and not task.done(): + logger.debug( + "[%s] Cancelling active processing for session %s", + self.name, + session_key, + ) + self._expected_cancelled_tasks.add(task) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + except Exception: + logger.debug( + "[%s] Session cancellation raised while unwinding %s", + self.name, + session_key, + exc_info=True, + ) + if discard_pending: + self._pending_messages.pop(session_key, None) + if release_guard: + self._release_session_guard(session_key) + + async def _drain_pending_after_session_command( + self, + session_key: str, + command_guard: asyncio.Event, + ) -> None: + """Resume the latest queued follow-up once a session command completes. + + Called at the tail of /stop, /new, and /reset dispatch. Releases the + command-scoped guard, then — if a follow-up message landed while the + command was running — spawns a fresh processing task for it. + """ + pending_event = self._pending_messages.pop(session_key, None) + self._release_session_guard(session_key, guard=command_guard) + if pending_event is None: + return + self._start_session_processing(pending_event, session_key) + + async def _dispatch_active_session_command( + self, + event: MessageEvent, + session_key: str, + cmd: str, + ) -> None: + """Dispatch a reset-like bypass command while preserving guard ordering. + + /stop, /new, and /reset must: + 1. Keep the session guard installed while the runner processes the + command (so a racing follow-up message stays queued, not + dispatched as a second parallel run). + 2. Cancel the old in-flight adapter task only AFTER the runner has + finished handling the command (so the runner sees consistent + state and its response is sent in order). + 3. Release the command-scoped guard and drain the latest queued + follow-up exactly once, after 1 and 2 complete. + """ + logger.debug( + "[%s] Command '/%s' bypassing active-session guard for %s", + self.name, + cmd, + session_key, + ) + + current_guard = self._active_sessions.get(session_key) + command_guard = asyncio.Event() + self._active_sessions[session_key] = command_guard + thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None + + try: + response = await self._message_handler(event) + # Old adapter task (if any) is cancelled AFTER the runner has + # fully handled the command — keeps ordering deterministic. + await self.cancel_session_processing( + session_key, + release_guard=False, + discard_pending=False, + ) + if response: + await self._send_with_retry( + chat_id=event.source.chat_id, + content=response, + reply_to=event.message_id, + metadata=thread_meta, + ) + except Exception: + # On failure, restore the original guard if one still exists so + # we don't leave the session in a half-reset state. + if self._active_sessions.get(session_key) is command_guard: + if session_key in self._session_tasks and current_guard is not None: + self._active_sessions[session_key] = current_guard + else: + self._release_session_guard(session_key, guard=command_guard) + raise + + await self._drain_pending_after_session_command(session_key, command_guard) + async def handle_message(self, event: MessageEvent) -> None: """ Process an incoming message. @@ -1696,7 +1918,15 @@ class BasePlatformAdapter(ABC): group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) - + + # On-entry self-heal: if the adapter still has an _active_sessions + # entry for this key but the owner task has already exited (done or + # cancelled), the lock is stale. Clear it and fall through to + # normal dispatch so the user isn't trapped behind a dead guard — + # this is the split-brain tail described in issue #11016. + if session_key in self._active_sessions: + self._heal_stale_session_lock(session_key) + # Check if there's already an active handler for this session if session_key in self._active_sessions: # Certain commands must bypass the active-session guard and be @@ -1713,6 +1943,23 @@ class BasePlatformAdapter(ABC): from hermes_cli.commands import should_bypass_active_session if should_bypass_active_session(cmd): + # /stop, /new, /reset must cancel the in-flight adapter task + # and preserve ordering of queued follow-ups. Route those + # through the dedicated handoff path that serializes + # cancellation + runner response + pending drain. + if cmd in ("stop", "new", "reset"): + try: + await self._dispatch_active_session_command(event, session_key, cmd) + except Exception as e: + logger.error( + "[%s] Command '/%s' dispatch failed: %s", + self.name, cmd, e, exc_info=True, + ) + return + + # Other bypass commands (/approve, /deny, /status, + # /background, /restart) just need direct dispatch — they + # don't cancel the running task. logger.debug( "[%s] Command '/%s' bypassing active-session guard for %s", self.name, cmd, session_key, @@ -1758,19 +2005,9 @@ class BasePlatformAdapter(ABC): # starts would also pass the _active_sessions check and spawn a # duplicate task. (grammY sequentialize / aiogram EventIsolation # pattern — set the guard synchronously, not inside the task.) - self._active_sessions[session_key] = asyncio.Event() - - # Spawn background task to process this message - task = asyncio.create_task(self._process_message_background(event, session_key)) - try: - self._background_tasks.add(task) - except TypeError: - # Some tests stub create_task() with lightweight sentinels that are not - # hashable and do not support lifecycle callbacks. - return - if hasattr(task, "add_done_callback"): - task.add_done_callback(self._background_tasks.discard) - task.add_done_callback(self._expected_cancelled_tasks.discard) + # _start_session_processing installs the guard AND the owner-task + # mapping atomically so stale-lock detection works. + self._start_session_processing(event, session_key) @staticmethod def _get_human_delay() -> float: @@ -2130,6 +2367,9 @@ class BasePlatformAdapter(ABC): drain_task = asyncio.create_task( self._process_message_background(late_pending, session_key) ) + # Hand ownership of the session to the drain task so stale-lock + # detection keeps working while it runs. + self._session_tasks[session_key] = drain_task try: self._background_tasks.add(drain_task) drain_task.add_done_callback(self._background_tasks.discard) @@ -2139,7 +2379,13 @@ class BasePlatformAdapter(ABC): # Leave _active_sessions[session_key] populated — the drain # task's own lifecycle will clean it up. else: - # Clean up session tracking + # Clean up session tracking. Only release the guard if this + # task still owns it (protects against the case where a + # reset-like command already swapped in its own guard while + # we were unwinding). + current_task = asyncio.current_task() + if current_task is not None and self._session_tasks.get(session_key) is current_task: + del self._session_tasks[session_key] if session_key in self._active_sessions: del self._active_sessions[session_key] @@ -2171,6 +2417,7 @@ class BasePlatformAdapter(ABC): # will be in self._background_tasks now. Re-check. self._background_tasks.clear() self._expected_cancelled_tasks.clear() + self._session_tasks.clear() self._pending_messages.clear() self._active_sessions.clear()