diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index c718cce89..ddee844f4 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -839,6 +839,11 @@ class BasePlatformAdapter(ABC): # Gateway shutdown cancels these so an old gateway instance doesn't keep # working on a task after --replace or manual restarts. self._background_tasks: set[asyncio.Task] = set() + # One-shot callbacks to fire after the main response is delivered. + # Keyed by session_key. GatewayRunner uses this to defer + # background-review notifications ("💾 Skill created") until the + # primary reply has been sent. + self._post_delivery_callbacks: Dict[str, Callable] = {} self._expected_cancelled_tasks: set[asyncio.Task] = set() self._busy_session_handler: Optional[Callable[[MessageEvent, str], Awaitable[bool]]] = None # Chats where auto-TTS on voice input is disabled (set by /voice off) @@ -1894,6 +1899,14 @@ class BasePlatformAdapter(ABC): except Exception: pass # Last resort — don't let error reporting crash the handler finally: + # Fire any one-shot post-delivery callback registered for this + # session (e.g. deferred background-review notifications). + _post_cb = getattr(self, "_post_delivery_callbacks", {}).pop(session_key, None) + if callable(_post_cb): + try: + _post_cb() + except Exception: + pass # Stop typing indicator typing_task.cancel() try: diff --git a/gateway/run.py b/gateway/run.py index a95ca159b..16027bfd3 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -8616,8 +8616,11 @@ class GatewayRunner: agent.service_tier = self._service_tier agent.request_overrides = turn_route.get("request_overrides") - # Background review delivery — send "💾 Memory updated" etc. to user - def _bg_review_send(message: str) -> None: + _bg_review_release = threading.Event() + _bg_review_pending: list[str] = [] + _bg_review_pending_lock = threading.Lock() + + def _deliver_bg_review_message(message: str) -> None: if not _status_adapter: return try: @@ -8632,7 +8635,32 @@ class GatewayRunner: except Exception as _e: logger.debug("background_review_callback error: %s", _e) + def _release_bg_review_messages() -> None: + _bg_review_release.set() + with _bg_review_pending_lock: + pending = list(_bg_review_pending) + _bg_review_pending.clear() + for queued in pending: + _deliver_bg_review_message(queued) + + # Background review delivery — send "💾 Memory updated" etc. to user + def _bg_review_send(message: str) -> None: + if not _status_adapter: + return + if not _bg_review_release.is_set(): + with _bg_review_pending_lock: + if not _bg_review_release.is_set(): + _bg_review_pending.append(message) + return + _deliver_bg_review_message(message) + agent.background_review_callback = _bg_review_send + # Register the release hook on the adapter so base.py's finally + # block can fire it after delivering the main response. + if _status_adapter and session_key: + _pdc = getattr(_status_adapter, "_post_delivery_callbacks", None) + if _pdc is not None: + _pdc[session_key] = _release_bg_review_messages # Store agent reference for interrupt support agent_holder[0] = agent @@ -9356,6 +9384,17 @@ class GatewayRunner: ) except Exception as e: logger.warning("Failed to send first response before queued message: %s", e) + # Release deferred bg-review notifications now that the + # first response has been delivered. Pop from the + # adapter's callback dict (prevents double-fire in + # base.py's finally block) and call it. + if adapter and hasattr(adapter, "_post_delivery_callbacks"): + _bg_cb = adapter._post_delivery_callbacks.pop(session_key, None) + if callable(_bg_cb): + try: + _bg_cb() + except Exception: + pass # else: interrupted — discard the interrupted response ("Operation # interrupted." is just noise; the user already knows they sent a # new message). diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index 1b7829616..4878f2fae 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -1,5 +1,6 @@ """Tests for topic-aware gateway progress updates.""" +import asyncio import importlib import sys import time @@ -415,6 +416,21 @@ class QueuedCommentaryAgent: } +class BackgroundReviewAgent: + def __init__(self, **kwargs): + self.background_review_callback = kwargs.get("background_review_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.background_review_callback: + self.background_review_callback("💾 Skill 'prospect-scanner' created.") + return { + "final_response": "done", + "messages": [], + "api_calls": 1, + } + + class VerboseAgent: """Agent that emits a tool call with args whose JSON exceeds 200 chars.""" LONG_CODE = "x" * 300 @@ -668,6 +684,66 @@ async def test_run_agent_queued_message_does_not_treat_commentary_as_final(monke assert "final response 1" in sent_texts +@pytest.mark.asyncio +async def test_run_agent_defers_background_review_notification_until_release(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + BackgroundReviewAgent, + session_id="sess-bg-review-order", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + assert result["final_response"] == "done" + assert adapter.sent == [] + + +@pytest.mark.asyncio +async def test_base_processing_releases_post_delivery_callback_after_main_send(): + """Post-delivery callbacks on the adapter fire after the main response.""" + adapter = ProgressCaptureAdapter() + + async def _handler(event): + return "done" + + adapter.set_message_handler(_handler) + + released = [] + + def _post_delivery_cb(): + released.append(True) + adapter.sent.append( + { + "chat_id": "bg-review", + "content": "💾 Skill 'prospect-scanner' created.", + "reply_to": None, + "metadata": None, + } + ) + + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", + ) + event = MessageEvent( + text="hello", + message_type=MessageType.TEXT, + source=source, + message_id="msg-1", + ) + session_key = "agent:main:telegram:group:-1001:17585" + adapter._active_sessions[session_key] = asyncio.Event() + adapter._post_delivery_callbacks[session_key] = _post_delivery_cb + + await adapter._process_message_background(event, session_key) + + sent_texts = [call["content"] for call in adapter.sent] + assert sent_texts == ["done", "💾 Skill 'prospect-scanner' created."] + assert released == [True] + + @pytest.mark.asyncio async def test_verbose_mode_does_not_truncate_args_by_default(monkeypatch, tmp_path): """Verbose mode with default tool_preview_length (0) should NOT truncate args.