From f53a5a7fe1fa17514130b18e13ba220c64d9f609 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 12 Apr 2026 00:36:22 -0700 Subject: [PATCH] fix: suppress duplicate completion notifications when agent already consumed output via wait/poll/log (#8228) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the agent calls process(action='wait') or process(action='poll') (or reads the process log) and gets the exited status, the completion_queue notification is redundant — the agent already has the output from the tool return. Previously, the drain loops in CLI and gateway would still inject the [SYSTEM: Background process completed] message, causing the agent to receive the same information twice. Fix: track session IDs in a _completion_consumed set when wait/poll/log returns an exited process. Drain loops in cli.py and the gateway watcher skip completion events for consumed sessions. Watch pattern events are never suppressed (they have independent semantics). Adds 4 tests covering wait/poll/log marking and the running-process negative case. 
--- cli.py | 15 ++++- gateway/run.py | 4 +- .../test_internal_event_bypass_pairing.py | 4 ++ tests/tools/test_notify_on_complete.py | 59 +++++++++++++++++++ tools/process_registry.py | 15 ++++- 5 files changed, 92 insertions(+), 5 deletions(-) diff --git a/cli.py b/cli.py index 55372bbb0..809be5131 100644 --- a/cli.py +++ b/cli.py @@ -9385,9 +9385,14 @@ class HermesCLI: from tools.process_registry import process_registry if not process_registry.completion_queue.empty(): evt = process_registry.completion_queue.get_nowait() - _synth = _format_process_notification(evt) - if _synth: - self._pending_input.put(_synth) + # Skip if the agent already consumed this via wait/poll/log + _evt_sid = evt.get("session_id", "") + if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid): + pass # already delivered via tool result + else: + _synth = _format_process_notification(evt) + if _synth: + self._pending_input.put(_synth) except Exception: pass continue @@ -9513,6 +9518,10 @@ class HermesCLI: from tools.process_registry import process_registry while not process_registry.completion_queue.empty(): evt = process_registry.completion_queue.get_nowait() + # Skip if the agent already consumed this via wait/poll/log + _evt_sid = evt.get("session_id", "") + if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid): + continue # already delivered via tool result _synth = _format_process_notification(evt) if _synth: self._pending_input.put(_synth) diff --git a/gateway/run.py b/gateway/run.py index 56573d58a..9cf39a160 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7009,7 +7009,9 @@ class GatewayRunner: if session.exited: # --- Agent-triggered completion: inject synthetic message --- - if agent_notify: + # Skip if the agent already consumed the result via wait/poll/log + from tools.process_registry import process_registry as _pr_check + if agent_notify and not _pr_check.is_completion_consumed(session_id): from 
tools.ansi_strip import strip_ansi _out = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else "" synth_text = ( diff --git a/tests/gateway/test_internal_event_bypass_pairing.py b/tests/gateway/test_internal_event_bypass_pairing.py index 46a96e5aa..1c3f9f0c9 100644 --- a/tests/gateway/test_internal_event_bypass_pairing.py +++ b/tests/gateway/test_internal_event_bypass_pairing.py @@ -28,12 +28,16 @@ class _FakeRegistry: def __init__(self, sessions): self._sessions = list(sessions) + self._completion_consumed: set = set() def get(self, session_id): if self._sessions: return self._sessions.pop(0) return None + def is_completion_consumed(self, session_id): + return session_id in self._completion_consumed + def _build_runner(monkeypatch, tmp_path) -> GatewayRunner: """Create a GatewayRunner with notifications set to 'all'.""" diff --git a/tests/tools/test_notify_on_complete.py b/tests/tools/test_notify_on_complete.py index 411f95f7e..64d198970 100644 --- a/tests/tools/test_notify_on_complete.py +++ b/tests/tools/test_notify_on_complete.py @@ -289,3 +289,62 @@ class TestCodeExecutionBlocked: def test_notify_on_complete_blocked_in_sandbox(self): from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS assert "notify_on_complete" in _TERMINAL_BLOCKED_PARAMS + + +# ========================================================================= +# Completion consumed suppression +# ========================================================================= + +class TestCompletionConsumed: + """Test that wait/poll/log suppress redundant completion notifications.""" + + def test_wait_marks_completion_consumed(self, registry): + """wait() returning exited status marks session as consumed.""" + s = _make_session(sid="proc_wait", notify_on_complete=True, output="done") + s.exited = True + s.exit_code = 0 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + + # Notification is in the queue + assert not 
registry.completion_queue.empty() + assert not registry.is_completion_consumed("proc_wait") + + # Agent calls wait() — gets the result directly + result = registry.wait("proc_wait", timeout=1) + assert result["status"] == "exited" + + # Now the completion is marked as consumed + assert registry.is_completion_consumed("proc_wait") + + def test_poll_marks_completion_consumed(self, registry): + """poll() returning exited status marks session as consumed.""" + s = _make_session(sid="proc_poll", notify_on_complete=True, output="done") + s.exited = True + s.exit_code = 0 + registry._finished[s.id] = s + + result = registry.poll("proc_poll") + assert result["status"] == "exited" + assert registry.is_completion_consumed("proc_poll") + + def test_log_marks_completion_consumed(self, registry): + """read_log() on exited session marks as consumed.""" + s = _make_session(sid="proc_log", notify_on_complete=True, output="line1\nline2") + s.exited = True + s.exit_code = 0 + registry._finished[s.id] = s + + result = registry.read_log("proc_log") + assert result["status"] == "exited" + assert registry.is_completion_consumed("proc_log") + + def test_running_process_not_consumed(self, registry): + """poll() on a still-running process does not mark as consumed.""" + s = _make_session(sid="proc_running", notify_on_complete=True, output="partial") + registry._running[s.id] = s + + result = registry.poll("proc_running") + assert result["status"] == "running" + assert not registry.is_completion_consumed("proc_running") diff --git a/tools/process_registry.py b/tools/process_registry.py index 044a4e776..a5dbc3b1b 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -136,6 +136,10 @@ class ProcessRegistry: import queue as _queue_mod self.completion_queue: _queue_mod.Queue = _queue_mod.Queue() + # Track sessions whose completion was already consumed by the agent + # via wait/poll/log. Drain loops skip notifications for these. 
+ self._completion_consumed: set = set() + @staticmethod def _clean_shell_noise(text: str) -> str: """Strip shell startup warnings from the beginning of output.""" @@ -613,6 +617,10 @@ class ProcessRegistry: # ----- Query Methods ----- + def is_completion_consumed(self, session_id: str) -> bool: + """Check if a completion notification was already consumed via wait/poll/log.""" + return session_id in self._completion_consumed + def get(self, session_id: str) -> Optional[ProcessSession]: """Get a session by ID (running or finished).""" with self._lock: @@ -640,6 +648,7 @@ class ProcessRegistry: } if session.exited: result["exit_code"] = session.exit_code + self._completion_consumed.add(session_id) if session.detached: result["detached"] = True result["note"] = "Process recovered after restart -- output history unavailable" @@ -665,13 +674,16 @@ class ProcessRegistry: else: selected = lines[offset:offset + limit] - return { + result = { "session_id": session.id, "status": "exited" if session.exited else "running", "output": "\n".join(selected), "total_lines": total_lines, "showing": f"{len(selected)} lines", } + if session.exited: + self._completion_consumed.add(session_id) + return result def wait(self, session_id: str, timeout: int = None) -> dict: """ @@ -714,6 +726,7 @@ class ProcessRegistry: while time.monotonic() < deadline: session = self._refresh_detached_session(session) if session.exited: + self._completion_consumed.add(session_id) result = { "status": "exited", "exit_code": session.exit_code,