diff --git a/tests/tools/test_notify_on_complete.py b/tests/tools/test_notify_on_complete.py index e36b27e44f8..23b3af34184 100644 --- a/tests/tools/test_notify_on_complete.py +++ b/tests/tools/test_notify_on_complete.py @@ -378,6 +378,72 @@ class TestCompletionConsumed: assert result["status"] == "running" assert not registry.is_completion_consumed("proc_running") + def test_poll_marks_poll_observed_for_cli_drain(self, registry): + """poll() on an exited process records _poll_observed so the CLI drain + dedups (the agent already saw the exit inline) without marking the + session _completion_consumed (which would suppress the gateway watcher).""" + s = _make_session(sid="proc_pobs", notify_on_complete=True, output="done") + s.exited = True + s.exit_code = 0 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + + # Completion is queued, nothing consumed/observed yet. + assert not registry.completion_queue.empty() + assert "proc_pobs" not in registry._poll_observed + assert not registry.is_completion_consumed("proc_pobs") + + # Agent polls inline — read-only, so NOT _completion_consumed, but the + # exit was observed so the CLI drain must skip the queued completion. + assert registry.poll("proc_pobs")["status"] == "exited" + assert "proc_pobs" in registry._poll_observed + assert not registry.is_completion_consumed("proc_pobs") + + # CLI drain skips it → no duplicate [SYSTEM: ...] injection (#8228). + drained = registry.drain_notifications() + assert drained == [] + + def test_poll_observed_does_not_suppress_gateway_watcher(self, registry): + """The gateway/tui watcher gate (is_completion_consumed) must stay False + after a read-only poll, so the autonomous delivery turn still fires + even though the CLI drain was deduped (#10156).""" + s = _make_session(sid="proc_gw", notify_on_complete=True, output="done") + s.exited = True + s.exit_code = 0 + registry._finished[s.id] = s + + registry.poll("proc_gw") + # CLI-side dedup signal present... + assert "proc_gw" in registry._poll_observed + # ...but the gateway watcher gate is untouched, so it still delivers. + assert not registry.is_completion_consumed("proc_gw") + + def test_running_poll_does_not_mark_poll_observed(self, registry): + """poll() on a still-running process must not record _poll_observed.""" + s = _make_session(sid="proc_run2", notify_on_complete=True, output="partial") + registry._running[s.id] = s + + registry.poll("proc_run2") + assert "proc_run2" not in registry._poll_observed + + def test_wait_and_log_still_skip_cli_drain(self, registry): + """wait()/read_log() consume the output, so the CLI drain skips their + completions via _completion_consumed (the original #8228 contract).""" + for sid, action in (("proc_w", "wait"), ("proc_l", "log")): + s = _make_session(sid=sid, notify_on_complete=True, output="done") + s.exited = True + s.exit_code = 0 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + if action == "wait": + registry.wait(sid, timeout=1) + else: + registry.read_log(sid) + assert registry.is_completion_consumed(sid) + assert registry.drain_notifications() == [] + # --------------------------------------------------------------------------- # Silent-background-process hint diff --git a/tools/process_registry.py b/tools/process_registry.py index 6b78c3b45b1..a8bd30b083b 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -171,9 +171,21 @@ class ProcessRegistry: self.completion_queue: _queue_mod.Queue = _queue_mod.Queue() # Track sessions whose completion was already consumed by the agent - # via wait/poll/log. Drain loops skip notifications for these. + # via wait/log. Drain loops AND gateway/tui watchers skip notifications + # for these — a blocking wait() or a full read_log() means the agent + # has the output in hand and is acting on it this turn. self._completion_consumed: set = set() + # Track sessions the agent merely *observed* exited via poll(). poll() + # is a read-only status check, so it does NOT mark _completion_consumed + # (that would let a status check suppress the gateway/tui watcher's + # autonomous delivery turn — #10156). But on the CLI the poll result + # is returned inline in the same turn, so the idle/post-turn drain must + # still skip the queued completion to avoid a duplicate [SYSTEM: ...] + # injection (the bug #8228 originally fixed). drain_notifications() + # consults this set; the gateway/tui watchers deliberately do NOT. + self._poll_observed: set = set() + # Global watch-match circuit breaker — across all sessions. # Prevents sibling processes from collectively flooding the user even # when each stays under its own per-session cap. @@ -911,11 +923,25 @@ class ProcessRegistry: """Check if a completion notification was already consumed via wait/log.""" return session_id in self._completion_consumed + def _drain_should_skip(self, session_id: str) -> bool: + """Whether the CLI drain should skip a completion event for this session. + + Skips when the agent has either truly consumed the output (wait/log → + ``_completion_consumed``) or observed the exit inline via poll() + (``_poll_observed``). In both cases the CLI agent already has the + result this turn, so injecting a [SYSTEM: ...] completion would be a + duplicate (#8228). The gateway/tui watchers do NOT use this — they + check only ``is_completion_consumed`` so a read-only poll never + suppresses their autonomous delivery turn (#10156). + """ + return session_id in self._completion_consumed or session_id in self._poll_observed + def drain_notifications(self) -> "list[tuple[dict, str]]": """Pop all pending notification events and return formatted pairs. Returns a list of (raw_event, formatted_text) tuples. - Skips completion events that were already consumed via wait/poll/log. + Skips completion events the agent already consumed via wait/log or + observed inline via poll() (see ``_drain_should_skip``). """ results = [] while not self.completion_queue.empty(): @@ -924,7 +950,7 @@ class ProcessRegistry: except Exception: break _evt_sid = evt.get("session_id", "") - if evt.get("type") == "completion" and self.is_completion_consumed(_evt_sid): + if evt.get("type") == "completion" and self._drain_should_skip(_evt_sid): continue text = format_process_notification(evt) if text: @@ -1043,6 +1069,12 @@ class ProcessRegistry: # represent actual output consumption and do mark it. Marking # consumed here would let a status check silently suppress the # notify_on_complete watcher's autonomous delivery turn (#10156). + # + # We DO record it in _poll_observed so the CLI's inline drain still + # dedups (the agent already saw the exit in this turn's poll result) + # without affecting the gateway/tui watchers, which only consult + # _completion_consumed. + self._poll_observed.add(session_id) if session.detached: result["detached"] = True result["note"] = "Process recovered after restart -- output history unavailable" @@ -1398,6 +1430,7 @@ class ProcessRegistry: for sid in expired: del self._finished[sid] self._completion_consumed.discard(sid) + self._poll_observed.discard(sid) # If still over limit, remove oldest finished total = len(self._running) + len(self._finished) @@ -1405,14 +1438,19 @@ class ProcessRegistry: oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at) del self._finished[oldest_id] self._completion_consumed.discard(oldest_id) + self._poll_observed.discard(oldest_id) - # Drop any _completion_consumed entries whose sessions are no longer - # tracked at all — belt-and-suspenders against module-lifetime growth - # on process-registry lookup paths that don't reach the dict prunes. + # Drop any _completion_consumed / _poll_observed entries whose sessions + # are no longer tracked at all — belt-and-suspenders against + # module-lifetime growth on registry lookup paths that don't reach the + # dict prunes. tracked = self._running.keys() | self._finished.keys() stale = self._completion_consumed - tracked if stale: self._completion_consumed -= stale + stale_polls = self._poll_observed - tracked + if stale_polls: + self._poll_observed -= stale_polls # ----- Checkpoint (crash recovery) -----