@staticmethod
def _is_goal_continuation_event(event_or_text: Any) -> bool:
    """Return True when the argument is a synthetic /goal continuation turn.

    Goal continuations travel through the same queues as ordinary
    user-role events, so pause/clear handling needs a way to tell them
    apart before removing or suppressing anything.

    Args:
        event_or_text: Either an object exposing a ``text`` attribute or
            the text itself. A missing, ``None`` or empty text is treated
            as the empty string.

    Returns:
        True if the stringified text starts with the continuation prefix.
    """
    # NOTE(review): the prefix presumably mirrors the opening of the
    # continuation prompt template defined elsewhere — keep them in sync.
    text = getattr(event_or_text, "text", event_or_text) or ""
    return str(text).startswith("[Continuing toward your standing goal]\nGoal:")


def _clear_goal_pending_continuations(self, session_key: str, adapter: Any) -> int:
    """Drop queued synthetic /goal continuations for one session.

    A user-issued /goal pause or clear can race with a continuation that
    was already queued. Only synthetic goal continuations are removed;
    ordinary queued and follow-up events are preserved.

    Args:
        session_key: Key identifying the session in both queues.
        adapter: Platform adapter whose ``_pending_messages`` dict (if it
            has one) holds the single pending event per session. May be
            ``None``.

    Returns:
        The number of continuation events that were removed.
    """
    removed = 0

    # Single-slot pending message held by the adapter. getattr() on None
    # simply yields the default, so no separate None check is required.
    pending_slot = getattr(adapter, "_pending_messages", None)
    if isinstance(pending_slot, dict):
        if self._is_goal_continuation_event(pending_slot.get(session_key)):
            pending_slot.pop(session_key, None)
            removed += 1

    # Overflow queue held by the runner.
    queued_events = getattr(self, "_queued_events", None)
    if isinstance(queued_events, dict):
        overflow = queued_events.get(session_key) or []
        kept = [
            queued_event
            for queued_event in overflow
            if not self._is_goal_continuation_event(queued_event)
        ]
        dropped = len(overflow) - len(kept)
        # Touch the stored queue only when something was actually removed,
        # so an unaffected queue keeps its original container object.
        if dropped:
            removed += dropped
            if kept:
                queued_events[session_key] = kept
            else:
                queued_events.pop(session_key, None)
    return removed


def _goal_still_active_for_session(self, session_id: str) -> bool:
    """Best-effort fresh check that the session's goal is still active.

    Args:
        session_id: Session identifier handed to ``GoalManager``.

    Returns:
        False for an empty ``session_id``; otherwise the result of
        ``GoalManager(session_id=session_id).is_active()``. Any exception,
        including a failure of the local import, is logged at debug level
        and reported as "not active".
    """
    if not session_id:
        return False
    try:
        from hermes_cli.goals import GoalManager
        return GoalManager(session_id=session_id).is_active()
    except Exception as exc:
        logger.debug("goal continuation: active-state recheck failed: %s", exc)
        return False
+ try: + adapter = self.adapters.get(event.source.platform) if event.source else None + _quick_key = self._session_key_for_source(event.source) if event.source else None + if adapter and _quick_key: + self._clear_goal_pending_continuations(_quick_key, adapter) + except Exception as exc: + logger.debug("goal pause: pending continuation cleanup failed: %s", exc) return f"⏸ Goal paused: {state.goal}" if lower == "resume": @@ -8418,6 +8478,13 @@ class GatewayRunner: if lower in ("clear", "stop", "done"): had = mgr.has_goal() mgr.clear() + try: + adapter = self.adapters.get(event.source.platform) if event.source else None + _quick_key = self._session_key_for_source(event.source) if event.source else None + if adapter and _quick_key: + self._clear_goal_pending_continuations(_quick_key, adapter) + except Exception as exc: + logger.debug("goal clear: pending continuation cleanup failed: %s", exc) return t("gateway.goal_cleared") if had else t("gateway.no_active_goal") # Otherwise — treat the remaining text as the new goal. 
async def _send_goal_status_notice(self, source: Any, message: str) -> None:
    """Deliver a /goal judge status line to the chat/thread it came from.

    Args:
        source: Origin of the turn; ``platform`` selects the adapter and
            ``chat_id`` is the send target.
        message: Status text to send.

    When no adapter is registered for the platform the notice is skipped
    with a debug log. A send result whose ``success`` attribute is falsy
    is reported with a warning.
    """
    platform_adapter = self.adapters.get(source.platform)
    if not platform_adapter:
        logger.debug("goal continuation: no adapter for %s", getattr(source, "platform", None))
        return

    try:
        send_metadata = self._thread_metadata_for_source(source)
    except Exception:
        # Fall back to a bare thread id when the metadata helper fails.
        if getattr(source, "thread_id", None):
            send_metadata = {"thread_id": source.thread_id}
        else:
            send_metadata = None

    outcome = await platform_adapter.send(source.chat_id, message, metadata=send_metadata)
    if outcome is None or getattr(outcome, "success", True):
        return
    logger.warning(
        "goal continuation: status send failed: %s",
        getattr(outcome, "error", "unknown error"),
    )


async def _defer_goal_status_notice_after_delivery(self, source: Any, message: str) -> None:
    """Arrange for a /goal status line to follow the main response.

    If the adapter offers ``register_post_delivery_callback`` and a session
    key can be computed, the send is registered as a callback for that
    session, tagged with the active session's ``_hermes_run_generation``
    when one is present. Otherwise, or when registration raises, the
    notice is awaited immediately so it is not silently dropped.

    Args:
        source: Origin of the turn, used for adapter and session lookup.
        message: Status text to send.
    """
    target_adapter = self.adapters.get(source.platform)
    if not target_adapter:
        logger.debug("goal continuation: no adapter for %s", getattr(source, "platform", None))
        return

    async def _send_now() -> None:
        # Delivery failures are logged here and never propagate.
        try:
            await self._send_goal_status_notice(source, message)
        except Exception as exc:
            logger.warning("goal continuation: status send failed: %s", exc, exc_info=True)

    try:
        key = self._session_key_for_source(source)
    except Exception:
        key = None

    if key and hasattr(target_adapter, "register_post_delivery_callback"):
        try:
            live_sessions = getattr(target_adapter, "_active_sessions", {})
            current_run = live_sessions.get(key)
            run_generation = (
                None
                if current_run is None
                else getattr(current_run, "_hermes_run_generation", None)
            )
            target_adapter.register_post_delivery_callback(
                key,
                _send_now,
                generation=run_generation,
            )
        except Exception as exc:
            logger.debug("goal continuation: post-delivery callback registration failed: %s", exc)
        else:
            # Registered successfully; the adapter owns delivery from here.
            return

    await _send_now()
Registering + # an awaited post-delivery callback preserves delivery reliability + # without reversing the user-visible ordering. if msg and source is not None: - try: - adapter = self.adapters.get(source.platform) - if adapter is not None and hasattr(adapter, "send"): - import asyncio as _asyncio - thread_meta = ( - {"thread_id": source.thread_id} if source.thread_id else None - ) - coro = adapter.send( - chat_id=source.chat_id, - content=msg, - metadata=thread_meta, - ) - if _asyncio.iscoroutine(coro): - try: - loop = _asyncio.get_running_loop() - loop.create_task(coro) - except RuntimeError: - # No running loop in this thread — best effort. - try: - _asyncio.run(coro) - except Exception: - pass - except Exception as exc: - logger.debug("goal continuation: status send failed: %s", exc) + await self._defer_goal_status_notice_after_delivery(source, msg) if not decision.get("should_continue"): return @@ -14768,14 +14873,18 @@ class GatewayRunner: ) if callable(_bg_cb): try: - _bg_cb() + _bg_result = _bg_cb() + if inspect.isawaitable(_bg_result): + await _bg_result except Exception: pass elif adapter and hasattr(adapter, "_post_delivery_callbacks"): _bg_cb = adapter._post_delivery_callbacks.pop(session_key, None) if callable(_bg_cb): try: - _bg_cb() + _bg_result = _bg_cb() + if inspect.isawaitable(_bg_result): + await _bg_result except Exception: pass # else: interrupted — discard the interrupted response ("Operation @@ -14789,6 +14898,12 @@ class GatewayRunner: next_channel_prompt = None if pending_event is not None: next_source = getattr(pending_event, "source", None) or source + if self._is_goal_continuation_event(pending_event) and not self._goal_still_active_for_session(session_id): + logger.info( + "Discarding stale goal continuation for session %s — goal is no longer active", + session_key or "?", + ) + return result next_message = await self._prepare_inbound_message_text( event=pending_event, source=next_source, diff --git 
a/tests/gateway/test_goal_status_notice.py b/tests/gateway/test_goal_status_notice.py new file mode 100644 index 0000000000..a45958cf95 --- /dev/null +++ b/tests/gateway/test_goal_status_notice.py @@ -0,0 +1,147 @@ +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from gateway.config import Platform +from gateway.platforms.base import MessageEvent, MessageType +from gateway.run import GatewayRunner +from gateway.session import SessionSource +from hermes_cli.goals import CONTINUATION_PROMPT_TEMPLATE + + +class FakeAdapter: + def __init__(self): + self.calls = [] + self.callbacks = {} + self._active_sessions = {} + + async def send(self, chat_id, content, reply_to=None, metadata=None): + self.calls.append( + { + "chat_id": chat_id, + "content": content, + "reply_to": reply_to, + "metadata": metadata, + } + ) + return SimpleNamespace(success=True) + + def register_post_delivery_callback(self, session_key, callback, *, generation=None): + self.callbacks[session_key] = (generation, callback) + + +def _goal_continuation_event(source, goal="finish the task"): + return MessageEvent( + text=CONTINUATION_PROMPT_TEMPLATE.format(goal=goal), + message_type=MessageType.TEXT, + source=source, + ) + + +@pytest.mark.asyncio +async def test_goal_status_notice_uses_adapter_send_with_thread_metadata(): + """Regression: /goal judge status must use BasePlatformAdapter.send(). + + The old implementation checked for a non-existent send_message() method, + so the goal could be marked done in state_meta without the visible + "✓ Goal achieved" status line being delivered to Discord/Telegram. 
+ """ + runner = GatewayRunner.__new__(GatewayRunner) + adapter = FakeAdapter() + runner.adapters = {Platform.DISCORD: adapter} + + source = SessionSource( + platform=Platform.DISCORD, + chat_id="parent-channel", + thread_id="thread-123", + ) + + await runner._send_goal_status_notice(source, "✓ Goal achieved: done") + + assert adapter.calls == [ + { + "chat_id": "parent-channel", + "content": "✓ Goal achieved: done", + "reply_to": None, + "metadata": {"thread_id": "thread-123"}, + } + ] + + +@pytest.mark.asyncio +async def test_goal_status_notice_defers_until_post_delivery_callback(): + """Regression: goal status must appear after the agent's visible reply. + + _post_turn_goal_continuation runs before BasePlatformAdapter sends the + returned final response. It should therefore register a post-delivery + callback, not send the judge status immediately. + """ + runner = GatewayRunner.__new__(GatewayRunner) + adapter = FakeAdapter() + runner.adapters = {Platform.DISCORD: adapter} + runner.config = SimpleNamespace(group_sessions_per_user=True, thread_sessions_per_user=False) + + source = SessionSource( + platform=Platform.DISCORD, + chat_id="parent-channel", + thread_id="thread-123", + user_id="user-1", + ) + + await runner._defer_goal_status_notice_after_delivery(source, "✓ Goal achieved: done") + + assert adapter.calls == [] + assert len(adapter.callbacks) == 1 + + _, callback = next(iter(adapter.callbacks.values())) + result = callback() + if hasattr(result, "__await__"): + await result + + assert adapter.calls == [ + { + "chat_id": "parent-channel", + "content": "✓ Goal achieved: done", + "reply_to": None, + "metadata": {"thread_id": "thread-123"}, + } + ] + + +def test_clear_goal_pending_continuations_removes_slot_and_overflow_only(): + """Regression: /goal pause/clear must cancel queued self-continuations. + + A user-issued /goal pause can arrive after the judge queued the next + continuation but before that queued turn runs. 
The queued synthetic goal + continuation should be removed without dropping normal user /queue items. + """ + runner = GatewayRunner.__new__(GatewayRunner) + adapter = FakeAdapter() + adapter._pending_messages = {} + runner._queued_events = {} + + source = SessionSource( + platform=Platform.DISCORD, + chat_id="parent-channel", + thread_id="thread-123", + ) + session_key = "discord:parent-channel:thread-123" + normal_event = MessageEvent( + text="normal queued user message", + message_type=MessageType.TEXT, + source=source, + ) + + adapter._pending_messages[session_key] = _goal_continuation_event(source) + runner._queued_events[session_key] = [ + normal_event, + _goal_continuation_event(source, goal="second continuation"), + ] + + removed = runner._clear_goal_pending_continuations(session_key, adapter) + + assert removed == 2 + assert adapter._pending_messages.get(session_key) is None + assert runner._queued_events[session_key] == [normal_event]