diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 963268d4ba..871f452902 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -875,6 +875,22 @@ display: # Toggle at runtime with /verbose in the CLI tool_progress: all + # Auto-cleanup of temporary progress bubbles after the final response lands. + # On platforms that support message deletion (currently Telegram), this + # removes the tool-progress bubble, "⏳ Still working..." notices, and + # context-pressure status messages once the final reply has been delivered — + # keeping long-running turns visible live, then tidy afterward. Failed runs + # leave the bubbles in place as breadcrumbs. Off by default. + # Per-platform override: display.platforms.telegram.cleanup_progress + # true: Delete tracked progress/status bubbles on successful turn + # false: Leave everything in place (default) + # Example: + # display: + # platforms: + # telegram: + # cleanup_progress: true + cleanup_progress: false + # Gateway-only natural mid-turn assistant updates. # When true, completed assistant status messages are sent as separate chat # messages. This is independent of tool_progress and gateway streaming. diff --git a/gateway/display_config.py b/gateway/display_config.py index 832f5cb2f2..55cc344677 100644 --- a/gateway/display_config.py +++ b/gateway/display_config.py @@ -35,6 +35,12 @@ _GLOBAL_DEFAULTS: dict[str, Any] = { "show_reasoning": False, "tool_preview_length": 0, "streaming": None, # None = follow top-level streaming config + # When true, delete tool-progress / "Still working..." / status bubbles + # after the final response lands on platforms that support message + # deletion (e.g. Telegram). Off by default — progress is still shown + # live, just cleaned up after success so the chat doesn't fill up with + # stale breadcrumbs. Failed runs leave bubbles in place as breadcrumbs. 
+ "cleanup_progress": False, } # --------------------------------------------------------------------------- @@ -188,6 +194,10 @@ def _normalise(setting: str, value: Any) -> Any: if isinstance(value, str): return value.lower() in ("true", "1", "yes", "on") return bool(value) + if setting == "cleanup_progress": + if isinstance(value, str): + return value.lower() in ("true", "1", "yes", "on") + return bool(value) if setting == "tool_preview_length": try: return int(value) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 5c2bbf96aa..5abbef808d 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -2096,9 +2096,52 @@ class BasePlatformAdapter(ABC): ``generation`` lets callers tie the callback to a specific gateway run generation so stale runs cannot clear callbacks owned by a fresher run. + + If a callback for the same ``session_key`` (and generation, when set) + is already registered, the new callback is chained — both fire, in + registration order, with per-callback exception isolation. This lets + independent features (background-review release + temporary-bubble + cleanup) coexist without clobbering each other. Stale-generation + callers never overwrite a fresher generation's slot. """ if not session_key or not callable(callback): return + + existing = self._post_delivery_callbacks.get(session_key) + if existing is not None: + if isinstance(existing, tuple) and len(existing) == 2: + existing_gen, existing_cb = existing + else: + existing_gen, existing_cb = None, existing + # Stale-generation registrations never overwrite a fresher slot. + if ( + existing_gen is not None + and generation is not None + and int(generation) < int(existing_gen) + ): + return + # Same-or-newer generation: chain with the existing callback so + # both fire in registration order. 
+ if callable(existing_cb) and ( + existing_gen is None + or generation is None + or int(existing_gen) == int(generation) + ): + _prev = existing_cb + _new = callback + + def _chained() -> None: + try: + _prev() + except Exception: + logger.debug("Post-delivery callback failed", exc_info=True) + try: + _new() + except Exception: + logger.debug("Post-delivery callback failed", exc_info=True) + + callback = _chained + if generation is None: self._post_delivery_callbacks[session_key] = callback else: diff --git a/gateway/run.py b/gateway/run.py index 4f58aeee97..219b564eb8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -12845,6 +12845,24 @@ class GatewayRunner: last_tool = [None] # Mutable container for tracking in closure last_progress_msg = [None] # Track last message for dedup repeat_count = [0] # How many times the same message repeated + + # Auto-cleanup of temporary progress bubbles (Telegram + any adapter + # that implements ``delete_message``). When enabled via + # ``display.platforms.{platform}.cleanup_progress: true``, message IDs + # from the tool-progress / "Still working..." / status-callback bubbles + # are collected here and deleted after the final response lands. + # Failed runs skip cleanup so the bubbles remain as breadcrumbs. + _cleanup_progress = bool( + resolve_display_setting(user_config, platform_key, "cleanup_progress") + ) + _cleanup_adapter = self.adapters.get(source.platform) if _cleanup_progress else None + if _cleanup_adapter is not None and ( + type(_cleanup_adapter).delete_message is BasePlatformAdapter.delete_message + ): + # Adapter doesn't support deletion — silently disable. + _cleanup_progress = False + _cleanup_adapter = None + _cleanup_msg_ids: List[str] = [] # First-touch onboarding latch: fires at most once per run, even if # several tools exceed the threshold. 
long_tool_hint_fired = [False] @@ -13093,12 +13111,18 @@ class GatewayRunner: adapter.name, ) can_edit = False - await adapter.send( + _flood_result = await adapter.send( chat_id=source.chat_id, content=msg, reply_to=_progress_reply_to, metadata=_progress_metadata, ) + if ( + _cleanup_progress + and getattr(_flood_result, "success", False) + and getattr(_flood_result, "message_id", None) + ): + _cleanup_msg_ids.append(str(_flood_result.message_id)) else: if can_edit: # First tool: send all accumulated text as new message @@ -13119,6 +13143,8 @@ class GatewayRunner: ) if result.success and result.message_id: progress_msg_id = result.message_id + if _cleanup_progress: + _cleanup_msg_ids.append(str(result.message_id)) _last_edit_ts = time.monotonic() @@ -13232,7 +13258,7 @@ class GatewayRunner: if not _status_adapter or not _run_still_current(): return try: - asyncio.run_coroutine_threadsafe( + _fut = asyncio.run_coroutine_threadsafe( _status_adapter.send( _status_chat_id, message, @@ -13240,6 +13266,16 @@ class GatewayRunner: ), _loop_for_step, ) + if _cleanup_progress: + def _track_status_id(fut) -> None: + try: + res = fut.result() + except Exception: + return + mid = getattr(res, "message_id", None) + if getattr(res, "success", False) and mid: + _cleanup_msg_ids.append(str(mid)) + _fut.add_done_callback(_track_status_id) except Exception as _e: logger.debug("status_callback error (%s): %s", event_type, _e) @@ -14100,11 +14136,17 @@ class GatewayRunner: except Exception: pass try: - await _notify_adapter.send( + _notify_res = await _notify_adapter.send( source.chat_id, f"⏳ Still working... 
({_elapsed_mins} min elapsed{_status_detail})", metadata=_status_thread_metadata, ) + if ( + _cleanup_progress + and getattr(_notify_res, "success", False) + and getattr(_notify_res, "message_id", None) + ): + _cleanup_msg_ids.append(str(_notify_res.message_id)) except Exception as _ne: logger.debug("Long-running notification error: %s", _ne) @@ -14578,7 +14620,49 @@ class GatewayRunner: _previewed, ) response["already_sent"] = True - + + # Schedule deletion of tracked temporary progress bubbles after the + # final response lands. Failed runs skip this so bubbles remain as + # breadcrumbs for the user to see what work happened. Only fires on + # adapters that support ``delete_message`` (see init above); failures + # are swallowed — deletion is best-effort. + if ( + _cleanup_progress + and _cleanup_adapter is not None + and _cleanup_msg_ids + and session_key + and isinstance(response, dict) + and not response.get("failed") + and hasattr(_cleanup_adapter, "register_post_delivery_callback") + ): + _ids_snapshot = list(_cleanup_msg_ids) + _chat_id_snapshot = source.chat_id + _adapter_snapshot = _cleanup_adapter + _loop_snapshot = asyncio.get_running_loop() + + def _cleanup_temp_bubbles() -> None: + async def _delete_all() -> None: + for _mid in _ids_snapshot: + try: + await _adapter_snapshot.delete_message( + _chat_id_snapshot, _mid + ) + except Exception: + pass + try: + asyncio.run_coroutine_threadsafe(_delete_all(), _loop_snapshot) + except Exception: + pass + + try: + _cleanup_adapter.register_post_delivery_callback( + session_key, + _cleanup_temp_bubbles, + generation=run_generation, + ) + except Exception as _rpe: + logger.debug("Post-delivery cleanup registration failed: %s", _rpe) + return response diff --git a/scripts/release.py b/scripts/release.py index 19d744782e..0771d3f6dc 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -118,6 +118,8 @@ AUTHOR_MAP = { "heathley@Heathley-MacBook-Air.local": "heathley", "vlad19@gmail.com": "dandaka", 
"adamrummer@gmail.com": "cyclingwithelephants", + # Temporary tool-progress cleanup salvage (May 2026) + "Mrcharlesiv@gmail.com": "mrcharlesiv", "nbot@liizfq.top": "liizfq", "274096618+hermes-agent-dhabibi@users.noreply.github.com": "dhabibi", "dejie.guo@gmail.com": "JayGwod", diff --git a/tests/gateway/test_display_config.py b/tests/gateway/test_display_config.py index 07d5c82a5f..c702d3121d 100644 --- a/tests/gateway/test_display_config.py +++ b/tests/gateway/test_display_config.py @@ -333,3 +333,64 @@ class TestStreamingPerPlatform: } } assert resolve_display_setting(config, "email", "streaming") is True + + +# --------------------------------------------------------------------------- +# cleanup_progress — opt-in deletion of temporary progress bubbles +# --------------------------------------------------------------------------- + +class TestCleanupProgress: + """``cleanup_progress`` is off by default and resolvable per-platform.""" + + def test_default_off_for_all_platforms(self): + """No config set → cleanup_progress resolves to False everywhere.""" + from gateway.display_config import resolve_display_setting + + for plat in ("telegram", "discord", "slack", "email"): + assert resolve_display_setting({}, plat, "cleanup_progress") is False + + def test_global_true_applies_to_all_platforms(self): + """display.cleanup_progress=true opts in globally.""" + from gateway.display_config import resolve_display_setting + + config = {"display": {"cleanup_progress": True}} + assert resolve_display_setting(config, "telegram", "cleanup_progress") is True + assert resolve_display_setting(config, "discord", "cleanup_progress") is True + + def test_per_platform_override_wins(self): + """display.platforms..cleanup_progress beats the global value.""" + from gateway.display_config import resolve_display_setting + + config = { + "display": { + "cleanup_progress": False, + "platforms": { + "telegram": {"cleanup_progress": True}, + }, + } + } + assert resolve_display_setting(config, 
"telegram", "cleanup_progress") is True + assert resolve_display_setting(config, "discord", "cleanup_progress") is False + + def test_yaml_off_string_normalises_to_false(self): + """YAML 1.1 bare ``off`` becomes string 'off' — treat as False.""" + from gateway.display_config import resolve_display_setting + + config = { + "display": { + "platforms": {"telegram": {"cleanup_progress": "off"}}, + } + } + assert resolve_display_setting(config, "telegram", "cleanup_progress") is False + + def test_yaml_true_string_normalises_to_true(self): + """String 'true'/'yes'/'on' all resolve to True.""" + from gateway.display_config import resolve_display_setting + + for val in ("true", "yes", "on", "1"): + config = { + "display": { + "platforms": {"telegram": {"cleanup_progress": val}}, + } + } + assert resolve_display_setting(config, "telegram", "cleanup_progress") is True, val diff --git a/tests/gateway/test_post_delivery_callback_chaining.py b/tests/gateway/test_post_delivery_callback_chaining.py new file mode 100644 index 0000000000..38c1978f0f --- /dev/null +++ b/tests/gateway/test_post_delivery_callback_chaining.py @@ -0,0 +1,113 @@ +"""Tests for ``BasePlatformAdapter.register_post_delivery_callback`` chaining. + +When two features want to run after the final response lands on the same +session (e.g. background-review release + temporary-progress cleanup), the +registration API chains them rather than clobbering. Per-callback +exceptions are swallowed so one bad callback can't sabotage the others. +Stale-generation registrations are rejected. 
+""" +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import BasePlatformAdapter, SendResult + + +class _MinAdapter(BasePlatformAdapter): + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + return None + + async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult: + return SendResult(success=True, message_id="1") + + async def get_chat_info(self, chat_id): + return {"id": chat_id} + + +@pytest.fixture +def adapter(): + return _MinAdapter(PlatformConfig(enabled=True), Platform.TELEGRAM) + + +class TestPostDeliveryCallbackChaining: + def test_single_callback_fires(self, adapter): + fired = [] + adapter.register_post_delivery_callback("s", lambda: fired.append("A")) + cb = adapter.pop_post_delivery_callback("s") + cb() + assert fired == ["A"] + + def test_two_callbacks_chain_in_order(self, adapter): + fired = [] + adapter.register_post_delivery_callback("s", lambda: fired.append("A")) + adapter.register_post_delivery_callback("s", lambda: fired.append("B")) + cb = adapter.pop_post_delivery_callback("s") + cb() + assert fired == ["A", "B"] + + def test_three_callbacks_chain_in_order(self, adapter): + """Chain composes over an already-chained callback.""" + fired = [] + for label in ("A", "B", "C"): + adapter.register_post_delivery_callback( + "s", lambda x=label: fired.append(x) + ) + cb = adapter.pop_post_delivery_callback("s") + cb() + assert fired == ["A", "B", "C"] + + def test_exception_in_one_callback_does_not_block_next(self, adapter): + fired = [] + + def boom(): + raise ValueError("boom") + + adapter.register_post_delivery_callback("s", boom) + adapter.register_post_delivery_callback("s", lambda: fired.append("survived")) + cb = adapter.pop_post_delivery_callback("s") + cb() + assert fired == ["survived"] + + def test_same_generation_chains(self, adapter): + fired = [] + adapter.register_post_delivery_callback( + "s", lambda: fired.append("A"), 
generation=5 + ) + adapter.register_post_delivery_callback( + "s", lambda: fired.append("B"), generation=5 + ) + cb = adapter.pop_post_delivery_callback("s", generation=5) + cb() + assert fired == ["A", "B"] + + def test_stale_generation_registration_rejected(self, adapter): + """A registration with an older generation than the existing + entry is rejected — it doesn't clobber the newer run's slot.""" + fired = [] + adapter.register_post_delivery_callback( + "s", lambda: fired.append("gen7"), generation=7 + ) + adapter.register_post_delivery_callback( + "s", lambda: fired.append("stale_gen3"), generation=3 + ) + cb = adapter.pop_post_delivery_callback("s", generation=7) + cb() + assert fired == ["gen7"] + + def test_pop_at_wrong_generation_returns_none(self, adapter): + adapter.register_post_delivery_callback( + "s", lambda: None, generation=5 + ) + assert adapter.pop_post_delivery_callback("s", generation=99) is None + # Correct generation still finds it. + assert adapter.pop_post_delivery_callback("s", generation=5) is not None + + def test_empty_session_key_is_noop(self, adapter): + adapter.register_post_delivery_callback("", lambda: None) + assert adapter._post_delivery_callbacks == {} + + def test_non_callable_is_noop(self, adapter): + adapter.register_post_delivery_callback("s", "not-callable") # type: ignore[arg-type] + assert adapter._post_delivery_callbacks == {} diff --git a/tests/gateway/test_run_cleanup_progress.py b/tests/gateway/test_run_cleanup_progress.py new file mode 100644 index 0000000000..3e1439cc0d --- /dev/null +++ b/tests/gateway/test_run_cleanup_progress.py @@ -0,0 +1,367 @@ +"""Tests for opt-in cleanup of temporary progress bubbles. + +When ``display.platforms.{platform}.cleanup_progress: true`` is set for a +platform whose adapter supports message deletion (e.g. Telegram), the +tool-progress bubble, "⏳ Still working..." notices, and status-callback +messages sent during a run are deleted after the final response is +delivered. 
+ +Failed runs skip cleanup so the bubbles remain as breadcrumbs. +Adapters without ``delete_message`` silently no-op. +""" + +import asyncio +import importlib +import sys +import time +import types +from types import SimpleNamespace + +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import BasePlatformAdapter, SendResult +from gateway.session import SessionSource + + +# --------------------------------------------------------------------------- +# Test fakes — mirror those in test_run_progress_topics.py but add a +# delete_message implementation that records ids instead of hitting a bot. +# --------------------------------------------------------------------------- + + +class CleanupCaptureAdapter(BasePlatformAdapter): + """Adapter that records every delete_message call for inspection.""" + + _next_mid = 100 + + def __init__(self, platform=Platform.TELEGRAM): + super().__init__(PlatformConfig(enabled=True, token="***"), platform) + self.sent = [] + self.edits = [] + self.deleted = [] + + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + return None + + def _mint_id(self) -> str: + CleanupCaptureAdapter._next_mid += 1 + return str(CleanupCaptureAdapter._next_mid) + + async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult: + mid = self._mint_id() + self.sent.append( + {"chat_id": chat_id, "content": content, "message_id": mid, "metadata": metadata} + ) + return SendResult(success=True, message_id=mid) + + async def edit_message(self, chat_id, message_id, content) -> SendResult: + self.edits.append({"chat_id": chat_id, "message_id": message_id, "content": content}) + return SendResult(success=True, message_id=message_id) + + async def delete_message(self, chat_id, message_id) -> bool: + self.deleted.append({"chat_id": chat_id, "message_id": str(message_id)}) + return True + + async def send_typing(self, chat_id, metadata=None) -> None: + return None + 
+ async def stop_typing(self, chat_id) -> None: + return None + + async def get_chat_info(self, chat_id: str): + return {"id": chat_id} + + +class NoDeleteAdapter(CleanupCaptureAdapter): + """Adapter that inherits the base no-op delete_message (used to prove + the cleanup path skips adapters without deletion support).""" + + async def delete_message(self, chat_id, message_id) -> bool: # type: ignore[override] + # Pretend to be an adapter whose platform doesn't support deletion: + # match the base class behavior exactly. gateway/run.py checks + # ``type(adapter).delete_message is BasePlatformAdapter.delete_message`` + # to detect this, so we re-assign at class body level below. + raise AssertionError("should not be called — cleanup must skip this adapter") + + +# Re-bind so the class's delete_message identity equals the base's. +NoDeleteAdapter.delete_message = BasePlatformAdapter.delete_message + + +class ProgressAgent: + """Emits two tool-progress events and returns a normal final response.""" + + def __init__(self, **kwargs): + self.tool_progress_callback = kwargs.get("tool_progress_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + cb = self.tool_progress_callback + if cb is not None: + cb("tool.started", "terminal", "pwd", {}) + time.sleep(0.25) + cb("tool.started", "terminal", "ls", {}) + time.sleep(0.25) + return {"final_response": "done", "messages": [], "api_calls": 1} + + +class FailingAgent: + def __init__(self, **kwargs): + self.tool_progress_callback = kwargs.get("tool_progress_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + cb = self.tool_progress_callback + if cb is not None: + cb("tool.started", "terminal", "pwd", {}) + time.sleep(0.25) + # Empty final_response + failed=True is the shape the gateway + # actually returns on provider errors (see gateway/run.py where + # failed keys are only propagated when final_response is 
empty). + return { + "final_response": "", + "messages": [], + "api_calls": 1, + "failed": True, + "error": "simulated provider failure", + } + + +def _make_runner(adapter): + gateway_run = importlib.import_module("gateway.run") + GatewayRunner = gateway_run.GatewayRunner + runner = object.__new__(GatewayRunner) + runner.adapters = {adapter.platform: adapter} + runner._voice_mode = {} + runner._prefill_messages = [] + runner._ephemeral_system_prompt = "" + runner._reasoning_config = None + runner._provider_routing = {} + runner._fallback_model = None + runner._session_db = None + runner._running_agents = {} + runner._session_run_generation = {} + runner.hooks = SimpleNamespace(loaded_hooks=False) + runner.config = SimpleNamespace( + thread_sessions_per_user=False, + group_sessions_per_user=False, + stt_enabled=False, + ) + return runner + + +def _install_fakes(monkeypatch, agent_cls, *, cleanup_on: bool): + """Wire up the module stubs every _run_agent test needs.""" + monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *a, **k: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = agent_cls + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + import tools.terminal_tool # noqa: F401 — register tool emoji + + gateway_run = importlib.import_module("gateway.run") + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}) + + # Wire the per-platform cleanup_progress flag via the config loader the + # gateway actually reads (``_load_gateway_config`` returns user config). 
+ cfg = { + "display": { + "platforms": { + "telegram": {"cleanup_progress": True}, + } + } + } if cleanup_on else {} + monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: cfg) + return gateway_run + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_cleanup_off_by_default_leaves_bubbles(monkeypatch, tmp_path): + """Without ``cleanup_progress: true``, firing whatever callback is + registered never reaches delete_message.""" + adapter = CleanupCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=False) + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + source = SessionSource(platform=Platform.TELEGRAM, chat_id="-1001") + session_key = "agent:main:telegram:group:-1001" + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-1", + session_key=session_key, + ) + + assert result["final_response"] == "done" + # Even if an unrelated callback got registered (background-review + # release lives in the same slot) firing it should never cause any + # delete_message calls when cleanup is off. 
+ cb = adapter.pop_post_delivery_callback(session_key) + if cb is not None: + cb() + for _ in range(10): + await asyncio.sleep(0.01) + assert adapter.deleted == [] + + +@pytest.mark.asyncio +async def test_cleanup_registers_callback_and_deletes_on_success(monkeypatch, tmp_path): + """With the flag on, the cleanup callback deletes the progress bubble.""" + adapter = CleanupCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=True) + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + source = SessionSource(platform=Platform.TELEGRAM, chat_id="-1001") + session_key = "agent:main:telegram:group:-1001" + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-1", + session_key=session_key, + ) + + assert result["final_response"] == "done" + # The cleanup callback should be registered for this session. + cb = adapter.pop_post_delivery_callback(session_key) + assert callable(cb) + + # Fire it (base.py does this in _process_message_background's finally) + # and let the scheduled coroutine run to completion. + cb() + # delete_message is scheduled via run_coroutine_threadsafe → give the + # loop a couple of ticks to drain. + for _ in range(20): + await asyncio.sleep(0.01) + if adapter.deleted: + break + + # At least the first tool-progress bubble should have been deleted. 
+ assert len(adapter.deleted) >= 1, f"deleted={adapter.deleted} sent={adapter.sent}" + for entry in adapter.deleted: + assert entry["chat_id"] == "-1001" + + +@pytest.mark.asyncio +async def test_cleanup_skipped_on_failed_run(monkeypatch, tmp_path): + """Failed runs skip cleanup registration — breadcrumbs stay.""" + adapter = CleanupCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = _install_fakes(monkeypatch, FailingAgent, cleanup_on=True) + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + source = SessionSource(platform=Platform.TELEGRAM, chat_id="-1001") + session_key = "agent:main:telegram:group:-1001" + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-1", + session_key=session_key, + ) + + assert result.get("failed") is True + # Whatever callback is registered should not trigger any deletion — + # the cleanup callback is skipped on failed runs. + cb = adapter.pop_post_delivery_callback(session_key) + if cb is not None: + cb() + for _ in range(10): + await asyncio.sleep(0.01) + assert adapter.deleted == [] + + +@pytest.mark.asyncio +async def test_cleanup_noop_on_adapter_without_delete_support(monkeypatch, tmp_path): + """Adapters that inherit the base-class delete_message no-op are + detected up front — the cleanup path never registers its callback so + a stray bg-review callback (if present) can fire harmlessly.""" + adapter = NoDeleteAdapter() + runner = _make_runner(adapter) + gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=True) + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + source = SessionSource(platform=Platform.TELEGRAM, chat_id="-1001") + session_key = "agent:main:telegram:group:-1001" + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-1", + session_key=session_key, + ) + + assert result["final_response"] == "done" + # No deletion 
attempts on an adapter without delete_message support. + # (NoDeleteAdapter.delete_message is rebound to the base-class method, + # so it cannot raise; this only checks that nothing was recorded.) + assert adapter.deleted == [] + + +@pytest.mark.asyncio +async def test_cleanup_chains_with_existing_callback(monkeypatch, tmp_path): + """When a bg-review-style callback is already registered, the cleanup + callback chains with it — both fire, neither clobbers the other.""" + adapter = CleanupCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=True) + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + source = SessionSource(platform=Platform.TELEGRAM, chat_id="-1001") + session_key = "agent:main:telegram:group:-1001" + + pre_existing_fired = [] + + def _preexisting_callback() -> None: + pre_existing_fired.append(True) + + # Pre-register a callback with the same generation the run will use + # (run_generation=None in this test path — matches the default slot). + adapter.register_post_delivery_callback(session_key, _preexisting_callback) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-1", + session_key=session_key, + ) + + assert result["final_response"] == "done" + cb = adapter.pop_post_delivery_callback(session_key) + assert callable(cb) + cb() + for _ in range(20): + await asyncio.sleep(0.01) + if adapter.deleted: + break + + # Both effects land: the pre-existing callback fires AND the cleanup + # deletes at least one progress bubble. + assert pre_existing_fired == [True] + assert len(adapter.deleted) >= 1