fix(gateway): defer goal status notices until after response delivery

Route goal status notices through the platform adapter send API and register post-delivery callbacks so completed-goal notices appear after the final assistant response. Also cancel queued synthetic goal continuations on /goal pause and /goal clear while preserving normal queued user messages.
This commit is contained in:
JC 2026-05-03 08:57:28 +08:00 committed by Teknium
parent 7d66d30d77
commit 03ddff8897
3 changed files with 300 additions and 36 deletions

View file

@ -1903,6 +1903,59 @@ class GatewayRunner:
depth += 1
return depth
@staticmethod
def _is_goal_continuation_event(event_or_text: Any) -> bool:
"""Return True for synthetic /goal continuation turns.
Goal continuations are normal queued user-role events, so pause/clear
must distinguish them from real user /queue messages before removing or
suppressing them.
"""
text = getattr(event_or_text, "text", event_or_text) or ""
return str(text).startswith("[Continuing toward your standing goal]\nGoal:")
def _clear_goal_pending_continuations(self, session_key: str, adapter: Any) -> int:
"""Remove queued synthetic /goal continuations for one session.
User-issued /goal pause/clear can race with a continuation already
queued by the judge. Remove only synthetic goal continuations while
preserving normal /queue and user follow-up events.
"""
removed = 0
pending_slot = getattr(adapter, "_pending_messages", None) if adapter is not None else None
if isinstance(pending_slot, dict):
pending_event = pending_slot.get(session_key)
if self._is_goal_continuation_event(pending_event):
pending_slot.pop(session_key, None)
removed += 1
queued_events = getattr(self, "_queued_events", None)
if isinstance(queued_events, dict):
overflow = queued_events.get(session_key) or []
if overflow:
kept = []
for queued_event in overflow:
if self._is_goal_continuation_event(queued_event):
removed += 1
else:
kept.append(queued_event)
if kept:
queued_events[session_key] = kept
else:
queued_events.pop(session_key, None)
return removed
def _goal_still_active_for_session(self, session_id: str) -> bool:
"""Best-effort fresh DB check before running a queued continuation."""
if not session_id:
return False
try:
from hermes_cli.goals import GoalManager
return GoalManager(session_id=session_id).is_active()
except Exception as exc:
logger.debug("goal continuation: active-state recheck failed: %s", exc)
return False
def _update_runtime_status(self, gateway_state: Optional[str] = None, exit_reason: Optional[str] = None) -> None:
try:
from gateway.status import write_runtime_status
@ -5836,7 +5889,7 @@ class GatewayRunner:
except Exception:
session_entry = None
if session_entry is not None:
self._post_turn_goal_continuation(
await self._post_turn_goal_continuation(
session_entry=session_entry,
source=source,
final_response=_final_text,
@ -8404,6 +8457,13 @@ class GatewayRunner:
state = mgr.pause(reason="user-paused")
if state is None:
return "No goal set."
try:
adapter = self.adapters.get(event.source.platform) if event.source else None
_quick_key = self._session_key_for_source(event.source) if event.source else None
if adapter and _quick_key:
self._clear_goal_pending_continuations(_quick_key, adapter)
except Exception as exc:
logger.debug("goal pause: pending continuation cleanup failed: %s", exc)
return f"⏸ Goal paused: {state.goal}"
if lower == "resume":
@ -8418,6 +8478,13 @@ class GatewayRunner:
if lower in ("clear", "stop", "done"):
had = mgr.has_goal()
mgr.clear()
try:
adapter = self.adapters.get(event.source.platform) if event.source else None
_quick_key = self._session_key_for_source(event.source) if event.source else None
if adapter and _quick_key:
self._clear_goal_pending_continuations(_quick_key, adapter)
except Exception as exc:
logger.debug("goal clear: pending continuation cleanup failed: %s", exc)
return t("gateway.goal_cleared") if had else t("gateway.no_active_goal")
# Otherwise — treat the remaining text as the new goal.
@ -8449,7 +8516,69 @@ class GatewayRunner:
"Controls: /goal status · /goal pause · /goal resume · /goal clear"
)
def _post_turn_goal_continuation(
async def _send_goal_status_notice(self, source: Any, message: str) -> None:
"""Send a /goal judge status line back to the originating chat/thread."""
adapter = self.adapters.get(source.platform)
if not adapter:
logger.debug("goal continuation: no adapter for %s", getattr(source, "platform", None))
return
try:
metadata = self._thread_metadata_for_source(source)
except Exception:
metadata = {"thread_id": source.thread_id} if getattr(source, "thread_id", None) else None
result = await adapter.send(source.chat_id, message, metadata=metadata)
if result is not None and not getattr(result, "success", True):
logger.warning(
"goal continuation: status send failed: %s",
getattr(result, "error", "unknown error"),
)
async def _defer_goal_status_notice_after_delivery(self, source: Any, message: str) -> None:
"""Send a /goal status line after the main response is delivered.
The gateway message handler returns the agent response to the platform
adapter, which sends it after this method's caller has returned. For a
natural Discord/Telegram reading order, goal status belongs after that
send. Platform adapters provide a one-shot post-delivery callback for
exactly this boundary; when unavailable, fall back to direct awaited
delivery rather than silently dropping the notice.
"""
adapter = self.adapters.get(source.platform)
if not adapter:
logger.debug("goal continuation: no adapter for %s", getattr(source, "platform", None))
return
async def _deliver() -> None:
try:
await self._send_goal_status_notice(source, message)
except Exception as exc:
logger.warning("goal continuation: status send failed: %s", exc, exc_info=True)
try:
session_key = self._session_key_for_source(source)
except Exception:
session_key = None
if session_key and hasattr(adapter, "register_post_delivery_callback"):
try:
generation = None
active = getattr(adapter, "_active_sessions", {}).get(session_key)
if active is not None:
generation = getattr(active, "_hermes_run_generation", None)
adapter.register_post_delivery_callback(
session_key,
_deliver,
generation=generation,
)
return
except Exception as exc:
logger.debug("goal continuation: post-delivery callback registration failed: %s", exc)
await _deliver()
async def _post_turn_goal_continuation(
self,
*,
session_entry: Any,
@ -8485,38 +8614,14 @@ class GatewayRunner:
decision = mgr.evaluate_after_turn(final_response or "", user_initiated=True)
msg = decision.get("message") or ""
# Send the status line back to the user so they see the judge's
# verdict. Fire-and-forget via the adapter's ``send()`` method —
# adapters expose ``send(chat_id, content, reply_to, metadata)``,
# not a ``send_message(source, msg)`` wrapper, so an earlier
# ``hasattr(adapter, "send_message")`` gate here was dead code and
# users never saw ``✓ Goal achieved`` / ``⏸ budget exhausted``
# verdicts.
# Defer the status line until after the adapter has delivered the
# agent's visible final response. The judge runs after the response is
# produced but before BasePlatformAdapter sends it, so sending here
# would show "✓ Goal achieved" before the answer itself. Registering
# an awaited post-delivery callback preserves delivery reliability
# without reversing the user-visible ordering.
if msg and source is not None:
try:
adapter = self.adapters.get(source.platform)
if adapter is not None and hasattr(adapter, "send"):
import asyncio as _asyncio
thread_meta = (
{"thread_id": source.thread_id} if source.thread_id else None
)
coro = adapter.send(
chat_id=source.chat_id,
content=msg,
metadata=thread_meta,
)
if _asyncio.iscoroutine(coro):
try:
loop = _asyncio.get_running_loop()
loop.create_task(coro)
except RuntimeError:
# No running loop in this thread — best effort.
try:
_asyncio.run(coro)
except Exception:
pass
except Exception as exc:
logger.debug("goal continuation: status send failed: %s", exc)
await self._defer_goal_status_notice_after_delivery(source, msg)
if not decision.get("should_continue"):
return
@ -14768,14 +14873,18 @@ class GatewayRunner:
)
if callable(_bg_cb):
try:
_bg_cb()
_bg_result = _bg_cb()
if inspect.isawaitable(_bg_result):
await _bg_result
except Exception:
pass
elif adapter and hasattr(adapter, "_post_delivery_callbacks"):
_bg_cb = adapter._post_delivery_callbacks.pop(session_key, None)
if callable(_bg_cb):
try:
_bg_cb()
_bg_result = _bg_cb()
if inspect.isawaitable(_bg_result):
await _bg_result
except Exception:
pass
# else: interrupted — discard the interrupted response ("Operation
@ -14789,6 +14898,12 @@ class GatewayRunner:
next_channel_prompt = None
if pending_event is not None:
next_source = getattr(pending_event, "source", None) or source
if self._is_goal_continuation_event(pending_event) and not self._goal_still_active_for_session(session_id):
logger.info(
"Discarding stale goal continuation for session %s — goal is no longer active",
session_key or "?",
)
return result
next_message = await self._prepare_inbound_message_text(
event=pending_event,
source=next_source,