fix: keep poll read-only for notify_on_complete watcher

This commit is contained in:
Liao Shiwu 2026-04-29 19:34:35 +08:00 committed by Teknium
parent 9078b4bbdf
commit 6f5f58e34b
4 changed files with 54 additions and 7 deletions

View file

@ -13136,7 +13136,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if session.exited:
# --- Agent-triggered completion: inject synthetic message ---
# Skip if the agent already consumed the result via wait/poll/log
# Skip if the agent already consumed the result via wait/log.
# poll() is read-only and intentionally does NOT mark consumed
# (#10156) — a status check must not suppress this delivery turn.
from tools.process_registry import format_process_notification, process_registry as _pr_check
if agent_notify and not _pr_check.is_completion_consumed(session_id):
from tools.ansi_strip import strip_ansi

View file

@ -17,6 +17,7 @@ from gateway.config import GatewayConfig, Platform
from gateway.platforms.base import MessageEvent
from gateway.run import GatewayRunner
from gateway.session import SessionSource
from tools.process_registry import ProcessRegistry, ProcessSession
# ---------------------------------------------------------------------------
@ -99,6 +100,46 @@ async def test_notify_on_complete_sets_internal_flag(monkeypatch, tmp_path):
assert event.internal is True, "Synthetic completion event must be marked internal"
@pytest.mark.asyncio
async def test_poll_does_not_suppress_notify_on_complete_watcher(monkeypatch, tmp_path):
"""Regression: polling an exited process must not suppress watcher injection."""
import tools.process_registry as pr_module
registry = ProcessRegistry()
session = ProcessSession(
id="proc_polled_completion",
command="echo done",
output_buffer="done\n",
exited=True,
exit_code=0,
notify_on_complete=True,
)
registry._finished[session.id] = session
poll_result = registry.poll(session.id)
assert poll_result["status"] == "exited"
assert not registry.is_completion_consumed(session.id)
monkeypatch.setattr(pr_module, "process_registry", registry)
async def _instant_sleep(*_a, **_kw):
pass
monkeypatch.setattr(asyncio, "sleep", _instant_sleep)
runner = _build_runner(monkeypatch, tmp_path)
adapter = runner.adapters[Platform.DISCORD]
watcher = _watcher_dict_with_notify()
watcher["session_id"] = session.id
await runner._run_process_watcher(watcher)
assert adapter.handle_message.await_count == 1
event = adapter.handle_message.await_args.args[0]
assert session.id in event.text
assert event.internal is True
@pytest.mark.asyncio
async def test_internal_event_bypasses_authorization(monkeypatch, tmp_path):
"""An internal event should skip _is_user_authorized entirely."""

View file

@ -325,7 +325,7 @@ class TestCodeExecutionBlocked:
# =========================================================================
class TestCompletionConsumed:
"""Test that wait/poll/log suppress redundant completion notifications."""
"""Test that wait/log consume completion notifications while poll stays read-only."""
def test_wait_marks_completion_consumed(self, registry):
"""wait() returning exited status marks session as consumed."""
@ -347,8 +347,8 @@ class TestCompletionConsumed:
# Now the completion is marked as consumed
assert registry.is_completion_consumed("proc_wait")
def test_poll_marks_completion_consumed(self, registry):
"""poll() returning exited status marks session as consumed."""
def test_poll_does_not_mark_completion_consumed(self, registry):
"""poll() is a read-only status check and must not suppress notify_on_complete."""
s = _make_session(sid="proc_poll", notify_on_complete=True, output="done")
s.exited = True
s.exit_code = 0
@ -356,7 +356,7 @@ class TestCompletionConsumed:
result = registry.poll("proc_poll")
assert result["status"] == "exited"
assert registry.is_completion_consumed("proc_poll")
assert not registry.is_completion_consumed("proc_poll")
def test_log_marks_completion_consumed(self, registry):
"""read_log() on exited session marks as consumed."""

View file

@ -908,7 +908,7 @@ class ProcessRegistry:
# ----- Query Methods -----
def is_completion_consumed(self, session_id: str) -> bool:
"""Check if a completion notification was already consumed via wait/poll/log."""
"""Check if a completion notification was already consumed via wait/log."""
return session_id in self._completion_consumed
def drain_notifications(self) -> "list[tuple[dict, str]]":
@ -1038,7 +1038,11 @@ class ProcessRegistry:
result["exit_code"] = session.exit_code
result["completion_reason"] = session.completion_reason
result["termination_source"] = session.termination_source
self._completion_consumed.add(session_id)
# NOTE: poll() is a read-only status query and deliberately does
# NOT mark the session _completion_consumed. wait()/read_log()
# represent actual output consumption and do mark it. Marking
# consumed here would let a status check silently suppress the
# notify_on_complete watcher's autonomous delivery turn (#10156).
if session.detached:
result["detached"] = True
result["note"] = "Process recovered after restart -- output history unavailable"