feat(gateway): opt-in cleanup of temporary progress bubbles (#21186)

When display.cleanup_progress (or display.platforms.<plat>.cleanup_progress)
is true, the gateway deletes tool-progress bubbles, long-running '⏳ Still
working...' notices, and status-callback messages after the final response
is delivered successfully. Currently effective on adapters that implement
delete_message (Telegram); silently no-ops elsewhere. Off by default.
Failed runs skip cleanup so bubbles stay as breadcrumbs.

Minimal plumbing: base.py's existing post_delivery_callback slot now chains
new registrations onto any existing callback (with per-callback exception
isolation) rather than clobbering. Stale-generation registrations are
rejected so they can't step on a fresher run's callbacks. This lets the
cleanup callback coexist with the background-review release hook already
registered on the same slot.

Co-authored-by: mrcharlesiv <Mrcharlesiv@gmail.com>
This commit is contained in:
Teknium 2026-05-07 05:04:37 -07:00 committed by GitHub
parent 7c0766e06a
commit bf843adf05
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 700 additions and 4 deletions

View file

@ -333,3 +333,64 @@ class TestStreamingPerPlatform:
}
}
assert resolve_display_setting(config, "email", "streaming") is True
# ---------------------------------------------------------------------------
# cleanup_progress — opt-in deletion of temporary progress bubbles
# ---------------------------------------------------------------------------
class TestCleanupProgress:
    """Resolution rules for the opt-in ``cleanup_progress`` display setting.

    Covers the default (off), the global ``display.cleanup_progress`` switch,
    per-platform overrides, and normalisation of string values.
    """

    def test_default_off_for_all_platforms(self):
        """An empty config resolves ``cleanup_progress`` to False everywhere."""
        from gateway.display_config import resolve_display_setting

        for platform_name in ("telegram", "discord", "slack", "email"):
            resolved = resolve_display_setting({}, platform_name, "cleanup_progress")
            assert resolved is False

    def test_global_true_applies_to_all_platforms(self):
        """A global ``display.cleanup_progress`` of True opts every platform in."""
        from gateway.display_config import resolve_display_setting

        globally_on = {"display": {"cleanup_progress": True}}
        for platform_name in ("telegram", "discord"):
            resolved = resolve_display_setting(
                globally_on, platform_name, "cleanup_progress"
            )
            assert resolved is True

    def test_per_platform_override_wins(self):
        """``display.platforms.<plat>.cleanup_progress`` beats the global value."""
        from gateway.display_config import resolve_display_setting

        overrides = {"telegram": {"cleanup_progress": True}}
        config = {"display": {"cleanup_progress": False, "platforms": overrides}}

        # Telegram carries its own override; Discord falls back to the global.
        telegram_value = resolve_display_setting(config, "telegram", "cleanup_progress")
        discord_value = resolve_display_setting(config, "discord", "cleanup_progress")
        assert telegram_value is True
        assert discord_value is False

    def test_yaml_off_string_normalises_to_false(self):
        """A per-platform string value of 'off' resolves to False."""
        from gateway.display_config import resolve_display_setting

        telegram_settings = {"cleanup_progress": "off"}
        config = {"display": {"platforms": {"telegram": telegram_settings}}}
        resolved = resolve_display_setting(config, "telegram", "cleanup_progress")
        assert resolved is False

    def test_yaml_true_string_normalises_to_true(self):
        """The strings 'true', 'yes', 'on' and '1' all resolve to True."""
        from gateway.display_config import resolve_display_setting

        for val in ("true", "yes", "on", "1"):
            telegram_settings = {"cleanup_progress": val}
            config = {"display": {"platforms": {"telegram": telegram_settings}}}
            resolved = resolve_display_setting(config, "telegram", "cleanup_progress")
            # The failing value is attached to the assertion for diagnosis.
            assert resolved is True, val

View file

@ -0,0 +1,113 @@
"""Tests for ``BasePlatformAdapter.register_post_delivery_callback`` chaining.
When two features want to run after the final response lands on the same
session (e.g. background-review release + temporary-progress cleanup), the
registration API chains them rather than clobbering. Per-callback
exceptions are swallowed so one bad callback can't sabotage the others.
Stale-generation registrations are rejected.
"""
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
class _MinAdapter(BasePlatformAdapter):
    """Minimal concrete adapter whose four methods return canned values."""

    async def connect(self) -> bool:
        """Report a successful connection without doing any I/O."""
        return True

    async def disconnect(self) -> None:
        """Do nothing; there is no connection to tear down."""
        return None

    async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
        """Pretend the message was delivered and report message id ``"1"``."""
        canned = SendResult(success=True, message_id="1")
        return canned

    async def get_chat_info(self, chat_id):
        """Return a one-key mapping that echoes ``chat_id`` back."""
        return dict(id=chat_id)
@pytest.fixture
def adapter():
    """Provide a fresh ``_MinAdapter`` bound to the Telegram platform."""
    enabled_config = PlatformConfig(enabled=True)
    return _MinAdapter(enabled_config, Platform.TELEGRAM)
class TestPostDeliveryCallbackChaining:
    """Chaining, exception isolation and generation rules for the
    post-delivery callback slot."""

    def test_single_callback_fires(self, adapter):
        """A lone registration is handed back by pop and runs when invoked."""
        calls = []

        def record_a():
            calls.append("A")

        adapter.register_post_delivery_callback("s", record_a)
        chained = adapter.pop_post_delivery_callback("s")
        chained()
        assert calls == ["A"]

    def test_two_callbacks_chain_in_order(self, adapter):
        """Two registrations on one session run in registration order."""
        calls = []

        def record_a():
            calls.append("A")

        def record_b():
            calls.append("B")

        for callback in (record_a, record_b):
            adapter.register_post_delivery_callback("s", callback)
        chained = adapter.pop_post_delivery_callback("s")
        chained()
        assert calls == ["A", "B"]

    def test_three_callbacks_chain_in_order(self, adapter):
        """Chain composes over an already-chained callback."""
        calls = []

        def make_recorder(tag):
            # A factory gives each callback its own binding of ``tag``.
            return lambda: calls.append(tag)

        for tag in ("A", "B", "C"):
            adapter.register_post_delivery_callback("s", make_recorder(tag))
        chained = adapter.pop_post_delivery_callback("s")
        chained()
        assert calls == ["A", "B", "C"]

    def test_exception_in_one_callback_does_not_block_next(self, adapter):
        """A raising callback does not stop the one registered after it."""
        calls = []

        def boom():
            raise ValueError("boom")

        def survivor():
            calls.append("survived")

        adapter.register_post_delivery_callback("s", boom)
        adapter.register_post_delivery_callback("s", survivor)
        chained = adapter.pop_post_delivery_callback("s")
        chained()
        assert calls == ["survived"]

    def test_same_generation_chains(self, adapter):
        """Registrations sharing a generation chain like unversioned ones."""
        calls = []
        adapter.register_post_delivery_callback(
            "s", lambda: calls.append("A"), generation=5
        )
        adapter.register_post_delivery_callback(
            "s", lambda: calls.append("B"), generation=5
        )
        chained = adapter.pop_post_delivery_callback("s", generation=5)
        chained()
        assert calls == ["A", "B"]

    def test_stale_generation_registration_rejected(self, adapter):
        """A registration with an older generation than the existing
        entry is rejected — it doesn't clobber the newer run's slot."""
        calls = []

        def newer():
            calls.append("gen7")

        def stale():
            calls.append("stale_gen3")

        adapter.register_post_delivery_callback("s", newer, generation=7)
        adapter.register_post_delivery_callback("s", stale, generation=3)
        chained = adapter.pop_post_delivery_callback("s", generation=7)
        chained()
        assert calls == ["gen7"]

    def test_pop_at_wrong_generation_returns_none(self, adapter):
        """Popping with a mismatched generation yields None and keeps the entry."""
        adapter.register_post_delivery_callback("s", lambda: None, generation=5)

        mismatched = adapter.pop_post_delivery_callback("s", generation=99)
        assert mismatched is None
        # Correct generation still finds it.
        matched = adapter.pop_post_delivery_callback("s", generation=5)
        assert matched is not None

    def test_empty_session_key_is_noop(self, adapter):
        """Registering under an empty session key stores nothing."""
        adapter.register_post_delivery_callback("", lambda: None)
        stored = adapter._post_delivery_callbacks
        assert stored == {}

    def test_non_callable_is_noop(self, adapter):
        """Registering a non-callable value stores nothing."""
        adapter.register_post_delivery_callback("s", "not-callable")  # type: ignore[arg-type]
        stored = adapter._post_delivery_callbacks
        assert stored == {}

View file

@ -0,0 +1,367 @@
"""Tests for opt-in cleanup of temporary progress bubbles.
When ``display.platforms.<plat>.cleanup_progress: true`` is set for a
platform whose adapter supports message deletion (e.g. Telegram), the
tool-progress bubble, "⏳ Still working..." notices, and status-callback
messages sent during a run are deleted after the final response is
delivered.
Failed runs skip cleanup so the bubbles remain as breadcrumbs.
Adapters without ``delete_message`` silently no-op.
"""
import asyncio
import importlib
import sys
import time
import types
from types import SimpleNamespace
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.session import SessionSource
# ---------------------------------------------------------------------------
# Test fakes — mirror those in test_run_progress_topics.py but add a
# delete_message implementation that records ids instead of hitting a bot.
# ---------------------------------------------------------------------------
class CleanupCaptureAdapter(BasePlatformAdapter):
    """Fake adapter that records sends, edits and deletions in lists.

    ``sent``, ``edits`` and ``deleted`` each collect one dict per call so the
    tests can inspect what was requested of the adapter.
    """

    # Class-level counter: message ids are unique across every instance.
    _next_mid = 100

    def __init__(self, platform=Platform.TELEGRAM):
        config = PlatformConfig(enabled=True, token="***")
        super().__init__(config, platform)
        self.sent, self.edits, self.deleted = [], [], []

    async def connect(self) -> bool:
        """Report a successful connection without doing any I/O."""
        return True

    async def disconnect(self) -> None:
        """Do nothing; there is no connection to tear down."""
        return None

    def _mint_id(self) -> str:
        """Advance the shared counter and return the new id as a string."""
        minted = CleanupCaptureAdapter._next_mid + 1
        CleanupCaptureAdapter._next_mid = minted
        return str(minted)

    async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
        """Record the outgoing message and return a fresh message id."""
        mid = self._mint_id()
        record = dict(chat_id=chat_id, content=content, message_id=mid, metadata=metadata)
        self.sent.append(record)
        return SendResult(success=True, message_id=mid)

    async def edit_message(self, chat_id, message_id, content) -> SendResult:
        """Record the edit and echo the edited message's id back."""
        record = dict(chat_id=chat_id, message_id=message_id, content=content)
        self.edits.append(record)
        return SendResult(success=True, message_id=message_id)

    async def delete_message(self, chat_id, message_id) -> bool:
        """Record the deletion (id coerced to ``str``) and report success."""
        record = dict(chat_id=chat_id, message_id=str(message_id))
        self.deleted.append(record)
        return True

    async def send_typing(self, chat_id, metadata=None) -> None:
        """Typing indicators are ignored by this fake."""
        return None

    async def stop_typing(self, chat_id) -> None:
        """Typing indicators are ignored by this fake."""
        return None

    async def get_chat_info(self, chat_id: str):
        """Return a one-key mapping that echoes ``chat_id`` back."""
        return dict(id=chat_id)
class NoDeleteAdapter(CleanupCaptureAdapter):
    """Capture adapter whose ``delete_message`` is the base-class method.

    Used to prove the cleanup path skips adapters without deletion support.
    """

    # gateway/run.py checks
    # ``type(adapter).delete_message is BasePlatformAdapter.delete_message``
    # to detect adapters without deletion support, so the attribute must be
    # the very same function object as the base class's — an override that
    # merely behaves like it would not be recognised.
    delete_message = BasePlatformAdapter.delete_message  # type: ignore[assignment]
class ProgressAgent:
    """Fake agent that emits two tool-progress events, then succeeds."""

    def __init__(self, **kwargs):
        # Only the progress callback is used; other kwargs are ignored.
        self.tool_progress_callback = kwargs.get("tool_progress_callback")
        self.tools = []

    def run_conversation(self, message, conversation_history=None, task_id=None):
        """Report ``pwd`` then ``ls`` as started terminal tools and return
        a normal final response."""
        notify = self.tool_progress_callback
        if notify is not None:
            for command in ("pwd", "ls"):
                notify("tool.started", "terminal", command, {})
                # Pause after each event before continuing the run.
                time.sleep(0.25)
        return dict(final_response="done", messages=[], api_calls=1)
class FailingAgent:
    """Fake agent that emits one tool-progress event, then reports failure."""

    def __init__(self, **kwargs):
        # Only the progress callback is used; other kwargs are ignored.
        self.tool_progress_callback = kwargs.get("tool_progress_callback")
        self.tools = []

    def run_conversation(self, message, conversation_history=None, task_id=None):
        """Report ``pwd`` as a started terminal tool and return a failed result."""
        notify = self.tool_progress_callback
        if notify is not None:
            notify("tool.started", "terminal", "pwd", {})
            time.sleep(0.25)
        # Empty final_response + failed=True is the shape the gateway
        # actually returns on provider errors (see gateway/run.py where
        # failed keys are only propagated when final_response is empty).
        failure = dict(
            final_response="",
            messages=[],
            api_calls=1,
            failed=True,
            error="simulated provider failure",
        )
        return failure
def _make_runner(adapter):
    """Build a ``GatewayRunner`` without running its ``__init__``.

    The instance is allocated with ``object.__new__`` and the attributes
    listed below are then assigned by hand.

    Args:
        adapter: Adapter registered in ``runner.adapters`` under its
            ``platform``.

    Returns:
        The partially initialised runner.
    """
    runner_cls = importlib.import_module("gateway.run").GatewayRunner
    runner = object.__new__(runner_cls)

    initial_state = {
        "adapters": {adapter.platform: adapter},
        "_voice_mode": {},
        "_prefill_messages": [],
        "_ephemeral_system_prompt": "",
        "_reasoning_config": None,
        "_provider_routing": {},
        "_fallback_model": None,
        "_session_db": None,
        "_running_agents": {},
        "_session_run_generation": {},
        "hooks": SimpleNamespace(loaded_hooks=False),
        "config": SimpleNamespace(
            thread_sessions_per_user=False,
            group_sessions_per_user=False,
            stt_enabled=False,
        ),
    }
    for attr_name, value in initial_state.items():
        setattr(runner, attr_name, value)
    return runner
def _install_fakes(monkeypatch, agent_cls, *, cleanup_on: bool):
    """Wire up the module stubs every ``_run_agent`` test needs.

    Args:
        monkeypatch: pytest's ``monkeypatch`` fixture; all patches go
            through it.
        agent_cls: Class exposed as ``run_agent.AIAgent`` in the stub module.
        cleanup_on: When true, the patched config loader returns a config
            with Telegram's ``cleanup_progress`` set to True; otherwise it
            returns an empty dict.

    Returns:
        The imported ``gateway.run`` module, for further patching.
    """
    monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all")

    def _noop_load_dotenv(*_args, **_kwargs):
        return None

    # Stub modules are placed in sys.modules before the imports below run.
    dotenv_stub = types.ModuleType("dotenv")
    dotenv_stub.load_dotenv = _noop_load_dotenv
    monkeypatch.setitem(sys.modules, "dotenv", dotenv_stub)

    run_agent_stub = types.ModuleType("run_agent")
    run_agent_stub.AIAgent = agent_cls
    monkeypatch.setitem(sys.modules, "run_agent", run_agent_stub)

    import tools.terminal_tool  # noqa: F401 — register tool emoji

    gateway_run = importlib.import_module("gateway.run")

    def _fake_runtime_kwargs():
        return {"api_key": "fake"}

    monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", _fake_runtime_kwargs)

    # Wire the per-platform cleanup_progress flag via the config loader the
    # gateway actually reads (``_load_gateway_config`` returns user config).
    if cleanup_on:
        user_config = {
            "display": {"platforms": {"telegram": {"cleanup_progress": True}}}
        }
    else:
        user_config = {}

    def _fake_load_config():
        return user_config

    monkeypatch.setattr(gateway_run, "_load_gateway_config", _fake_load_config)
    return gateway_run
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cleanup_off_by_default_leaves_bubbles(monkeypatch, tmp_path):
    """Without ``cleanup_progress: true``, firing whatever callback is
    registered never reaches delete_message."""
    adapter = CleanupCaptureAdapter()
    runner = _make_runner(adapter)
    gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=False)
    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)

    session_key = "agent:main:telegram:group:-1001"
    run_kwargs = {
        "message": "hello",
        "context_prompt": "",
        "history": [],
        "source": SessionSource(platform=Platform.TELEGRAM, chat_id="-1001"),
        "session_id": "sess-1",
        "session_key": session_key,
    }
    outcome = await runner._run_agent(**run_kwargs)
    assert outcome["final_response"] == "done"

    # Even if an unrelated callback got registered (background-review
    # release lives in the same slot) firing it should never cause any
    # delete_message calls when cleanup is off.
    pending = adapter.pop_post_delivery_callback(session_key)
    if pending is not None:
        pending()
    # Yield to the event loop a few times so any scheduled work can run.
    for _tick in range(10):
        await asyncio.sleep(0.01)
    assert adapter.deleted == []
@pytest.mark.asyncio
async def test_cleanup_registers_callback_and_deletes_on_success(monkeypatch, tmp_path):
    """With the flag on, the cleanup callback deletes the progress bubble."""
    adapter = CleanupCaptureAdapter()
    runner = _make_runner(adapter)
    gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=True)
    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)

    session_key = "agent:main:telegram:group:-1001"
    run_kwargs = {
        "message": "hello",
        "context_prompt": "",
        "history": [],
        "source": SessionSource(platform=Platform.TELEGRAM, chat_id="-1001"),
        "session_id": "sess-1",
        "session_key": session_key,
    }
    outcome = await runner._run_agent(**run_kwargs)
    assert outcome["final_response"] == "done"

    # The cleanup callback should be registered for this session.
    cleanup = adapter.pop_post_delivery_callback(session_key)
    assert callable(cleanup)

    # Fire it (base.py does this in _process_message_background's finally)
    # and let the scheduled coroutine run to completion.
    cleanup()
    # delete_message is scheduled via run_coroutine_threadsafe → give the
    # loop a couple of ticks to drain, stopping once a deletion shows up.
    for _tick in range(20):
        await asyncio.sleep(0.01)
        if adapter.deleted:
            break

    # At least the first tool-progress bubble should have been deleted.
    assert len(adapter.deleted) >= 1, f"deleted={adapter.deleted} sent={adapter.sent}"
    # Every deletion must target the chat the run was started from.
    assert all(entry["chat_id"] == "-1001" for entry in adapter.deleted)
@pytest.mark.asyncio
async def test_cleanup_skipped_on_failed_run(monkeypatch, tmp_path):
    """Failed runs skip cleanup registration — breadcrumbs stay."""
    adapter = CleanupCaptureAdapter()
    runner = _make_runner(adapter)
    gateway_run = _install_fakes(monkeypatch, FailingAgent, cleanup_on=True)
    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)

    session_key = "agent:main:telegram:group:-1001"
    run_kwargs = {
        "message": "hello",
        "context_prompt": "",
        "history": [],
        "source": SessionSource(platform=Platform.TELEGRAM, chat_id="-1001"),
        "session_id": "sess-1",
        "session_key": session_key,
    }
    outcome = await runner._run_agent(**run_kwargs)
    assert outcome.get("failed") is True

    # Whatever callback is registered should not trigger any deletion —
    # the cleanup callback is skipped on failed runs.
    pending = adapter.pop_post_delivery_callback(session_key)
    if pending is not None:
        pending()
    # Yield to the event loop a few times so any scheduled work can run.
    for _tick in range(10):
        await asyncio.sleep(0.01)
    assert adapter.deleted == []
@pytest.mark.asyncio
async def test_cleanup_noop_on_adapter_without_delete_support(monkeypatch, tmp_path):
    """Adapters that inherit the base-class delete_message no-op are
    detected up front — the cleanup path never registers its callback, so
    a stray bg-review callback (if present) can fire harmlessly."""
    adapter = NoDeleteAdapter()
    runner = _make_runner(adapter)
    gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=True)
    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)

    session_key = "agent:main:telegram:group:-1001"
    run_kwargs = {
        "message": "hello",
        "context_prompt": "",
        "history": [],
        "source": SessionSource(platform=Platform.TELEGRAM, chat_id="-1001"),
        "session_id": "sess-1",
        "session_key": session_key,
    }
    outcome = await runner._run_agent(**run_kwargs)
    assert outcome["final_response"] == "done"

    # No deletion attempts on an adapter without delete_message support.
    # (NoDeleteAdapter.delete_message is rebound to the base-class method,
    # so the recording implementation that fills ``adapter.deleted`` is
    # never the one in use here.)
    assert adapter.deleted == []
@pytest.mark.asyncio
async def test_cleanup_chains_with_existing_callback(monkeypatch, tmp_path):
    """When a bg-review-style callback is already registered, the cleanup
    callback chains with it — both fire, neither clobbers the other."""
    adapter = CleanupCaptureAdapter()
    runner = _make_runner(adapter)
    gateway_run = _install_fakes(monkeypatch, ProgressAgent, cleanup_on=True)
    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)

    session_key = "agent:main:telegram:group:-1001"
    source = SessionSource(platform=Platform.TELEGRAM, chat_id="-1001")
    earlier_calls = []

    def _preexisting_callback() -> None:
        earlier_calls.append(True)

    # Pre-register a callback with the same generation the run will use
    # (run_generation=None in this test path — matches the default slot).
    adapter.register_post_delivery_callback(session_key, _preexisting_callback)

    run_kwargs = {
        "message": "hello",
        "context_prompt": "",
        "history": [],
        "source": source,
        "session_id": "sess-1",
        "session_key": session_key,
    }
    outcome = await runner._run_agent(**run_kwargs)
    assert outcome["final_response"] == "done"

    chained = adapter.pop_post_delivery_callback(session_key)
    assert callable(chained)
    chained()
    # Give the loop a few ticks to drain, stopping once a deletion shows up.
    for _tick in range(20):
        await asyncio.sleep(0.01)
        if adapter.deleted:
            break

    # Both effects land: the pre-existing callback fires AND the cleanup
    # deletes at least one progress bubble.
    assert earlier_calls == [True]
    assert len(adapter.deleted) >= 1