fix(process): keep CLI drain dedup after poll goes read-only (#10156)

Follow-up to @de1tydev's poll-read-only fix. Removing the
_completion_consumed.add() from poll() fixes the gateway/tui watcher
suppression (#10156) but reintroduces the CLI duplicate that #8228 fixed:
a notify_on_complete process always enqueues a completion event, and the
CLI idle/post-turn drain would re-inject it as a [SYSTEM: ...] message
even though the agent already saw the exit inline in its poll result.

Add a separate _poll_observed set that poll() populates on an observed
exit. drain_notifications() (CLI only) skips poll-observed sessions; the
gateway/tui watchers keep checking only is_completion_consumed, so a
read-only poll never suppresses their autonomous delivery turn.

- _poll_observed pruned alongside _completion_consumed in _prune_if_needed
- 4 tests: CLI drain dedup after poll, gateway gate untouched, running
  poll doesn't mark observed, wait/log still skip CLI drain
This commit is contained in:
Teknium 2026-06-21 08:31:08 -07:00
parent 6f5f58e34b
commit 41ba90f814
2 changed files with 110 additions and 6 deletions

View file

@ -378,6 +378,72 @@ class TestCompletionConsumed:
assert result["status"] == "running"
assert not registry.is_completion_consumed("proc_running")
def test_poll_marks_poll_observed_for_cli_drain(self, registry):
"""poll() on an exited process records _poll_observed so the CLI drain
dedups (the agent already saw the exit inline) without marking the
session _completion_consumed (which would suppress the gateway watcher)."""
s = _make_session(sid="proc_pobs", notify_on_complete=True, output="done")
s.exited = True
s.exit_code = 0
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s)
# Completion is queued, nothing consumed/observed yet.
assert not registry.completion_queue.empty()
assert "proc_pobs" not in registry._poll_observed
assert not registry.is_completion_consumed("proc_pobs")
# Agent polls inline — read-only, so NOT _completion_consumed, but the
# exit was observed so the CLI drain must skip the queued completion.
assert registry.poll("proc_pobs")["status"] == "exited"
assert "proc_pobs" in registry._poll_observed
assert not registry.is_completion_consumed("proc_pobs")
# CLI drain skips it → no duplicate [SYSTEM: ...] injection (#8228).
drained = registry.drain_notifications()
assert drained == []
def test_poll_observed_does_not_suppress_gateway_watcher(self, registry):
"""The gateway/tui watcher gate (is_completion_consumed) must stay False
after a read-only poll, so the autonomous delivery turn still fires
even though the CLI drain was deduped (#10156)."""
s = _make_session(sid="proc_gw", notify_on_complete=True, output="done")
s.exited = True
s.exit_code = 0
registry._finished[s.id] = s
registry.poll("proc_gw")
# CLI-side dedup signal present...
assert "proc_gw" in registry._poll_observed
# ...but the gateway watcher gate is untouched, so it still delivers.
assert not registry.is_completion_consumed("proc_gw")
def test_running_poll_does_not_mark_poll_observed(self, registry):
"""poll() on a still-running process must not record _poll_observed."""
s = _make_session(sid="proc_run2", notify_on_complete=True, output="partial")
registry._running[s.id] = s
registry.poll("proc_run2")
assert "proc_run2" not in registry._poll_observed
def test_wait_and_log_still_skip_cli_drain(self, registry):
"""wait()/read_log() consume the output, so the CLI drain skips their
completions via _completion_consumed (the original #8228 contract)."""
for sid, action in (("proc_w", "wait"), ("proc_l", "log")):
s = _make_session(sid=sid, notify_on_complete=True, output="done")
s.exited = True
s.exit_code = 0
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s)
if action == "wait":
registry.wait(sid, timeout=1)
else:
registry.read_log(sid)
assert registry.is_completion_consumed(sid)
assert registry.drain_notifications() == []
# ---------------------------------------------------------------------------
# Silent-background-process hint

View file

@ -171,9 +171,21 @@ class ProcessRegistry:
self.completion_queue: _queue_mod.Queue = _queue_mod.Queue()
# Track sessions whose completion was already consumed by the agent
# via wait/poll/log. Drain loops skip notifications for these.
# via wait/log. Drain loops AND gateway/tui watchers skip notifications
# for these — a blocking wait() or a full read_log() means the agent
# has the output in hand and is acting on it this turn.
self._completion_consumed: set = set()
# Track sessions the agent merely *observed* exited via poll(). poll()
# is a read-only status check, so it does NOT mark _completion_consumed
# (that would let a status check suppress the gateway/tui watcher's
# autonomous delivery turn — #10156). But on the CLI the poll result
# is returned inline in the same turn, so the idle/post-turn drain must
# still skip the queued completion to avoid a duplicate [SYSTEM: ...]
# injection (the bug #8228 originally fixed). drain_notifications()
# consults this set; the gateway/tui watchers deliberately do NOT.
self._poll_observed: set = set()
# Global watch-match circuit breaker — across all sessions.
# Prevents sibling processes from collectively flooding the user even
# when each stays under its own per-session cap.
@ -911,11 +923,25 @@ class ProcessRegistry:
"""Check if a completion notification was already consumed via wait/log."""
return session_id in self._completion_consumed
def _drain_should_skip(self, session_id: str) -> bool:
"""Whether the CLI drain should skip a completion event for this session.
Skips when the agent has either truly consumed the output (wait/log
``_completion_consumed``) or observed the exit inline via poll()
(``_poll_observed``). In both cases the CLI agent already has the
result this turn, so injecting a [SYSTEM: ...] completion would be a
duplicate (#8228). The gateway/tui watchers do NOT use this — they
check only ``is_completion_consumed`` so a read-only poll never
suppresses their autonomous delivery turn (#10156).
"""
return session_id in self._completion_consumed or session_id in self._poll_observed
def drain_notifications(self) -> "list[tuple[dict, str]]":
"""Pop all pending notification events and return formatted pairs.
Returns a list of (raw_event, formatted_text) tuples.
Skips completion events that were already consumed via wait/poll/log.
Skips completion events the agent already consumed via wait/log or
observed inline via poll() (see ``_drain_should_skip``).
"""
results = []
while not self.completion_queue.empty():
@ -924,7 +950,7 @@ class ProcessRegistry:
except Exception:
break
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and self.is_completion_consumed(_evt_sid):
if evt.get("type") == "completion" and self._drain_should_skip(_evt_sid):
continue
text = format_process_notification(evt)
if text:
@ -1043,6 +1069,12 @@ class ProcessRegistry:
# represent actual output consumption and do mark it. Marking
# consumed here would let a status check silently suppress the
# notify_on_complete watcher's autonomous delivery turn (#10156).
#
# We DO record it in _poll_observed so the CLI's inline drain still
# dedups (the agent already saw the exit in this turn's poll result)
# without affecting the gateway/tui watchers, which only consult
# _completion_consumed.
self._poll_observed.add(session_id)
if session.detached:
result["detached"] = True
result["note"] = "Process recovered after restart -- output history unavailable"
@ -1398,6 +1430,7 @@ class ProcessRegistry:
for sid in expired:
del self._finished[sid]
self._completion_consumed.discard(sid)
self._poll_observed.discard(sid)
# If still over limit, remove oldest finished
total = len(self._running) + len(self._finished)
@ -1405,14 +1438,19 @@ class ProcessRegistry:
oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at)
del self._finished[oldest_id]
self._completion_consumed.discard(oldest_id)
self._poll_observed.discard(oldest_id)
# Drop any _completion_consumed entries whose sessions are no longer
# tracked at all — belt-and-suspenders against module-lifetime growth
# on process-registry lookup paths that don't reach the dict prunes.
# Drop any _completion_consumed / _poll_observed entries whose sessions
# are no longer tracked at all — belt-and-suspenders against
# module-lifetime growth on registry lookup paths that don't reach the
# dict prunes.
tracked = self._running.keys() | self._finished.keys()
stale = self._completion_consumed - tracked
if stale:
self._completion_consumed -= stale
stale_polls = self._poll_observed - tracked
if stale_polls:
self._poll_observed -= stale_polls
# ----- Checkpoint (crash recovery) -----