mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: suppress duplicate completion notifications when agent already consumed output via wait/poll/log (#8228)
When the agent calls process(action='wait') or process(action='poll') and gets the exited status, the completion_queue notification is redundant — the agent already has the output from the tool return. Previously, the drain loops in CLI and gateway would still inject the [SYSTEM: Background process completed] message, causing the agent to receive the same information twice. Fix: track session IDs in _completion_consumed set when wait/poll/log returns an exited process. Drain loops in cli.py and gateway watcher skip completion events for consumed sessions. Watch pattern events are never suppressed (they have independent semantics). Adds 4 tests covering wait/poll/log marking and running-process negative case.
This commit is contained in:
parent
fdf55e0fe9
commit
f53a5a7fe1
5 changed files with 92 additions and 5 deletions
15
cli.py
15
cli.py
|
|
@ -9385,9 +9385,14 @@ class HermesCLI:
|
|||
from tools.process_registry import process_registry
|
||||
if not process_registry.completion_queue.empty():
|
||||
evt = process_registry.completion_queue.get_nowait()
|
||||
_synth = _format_process_notification(evt)
|
||||
if _synth:
|
||||
self._pending_input.put(_synth)
|
||||
# Skip if the agent already consumed this via wait/poll/log
|
||||
_evt_sid = evt.get("session_id", "")
|
||||
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
|
||||
pass # already delivered via tool result
|
||||
else:
|
||||
_synth = _format_process_notification(evt)
|
||||
if _synth:
|
||||
self._pending_input.put(_synth)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
|
@ -9513,6 +9518,10 @@ class HermesCLI:
|
|||
from tools.process_registry import process_registry
|
||||
while not process_registry.completion_queue.empty():
|
||||
evt = process_registry.completion_queue.get_nowait()
|
||||
# Skip if the agent already consumed this via wait/poll/log
|
||||
_evt_sid = evt.get("session_id", "")
|
||||
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
|
||||
continue # already delivered via tool result
|
||||
_synth = _format_process_notification(evt)
|
||||
if _synth:
|
||||
self._pending_input.put(_synth)
|
||||
|
|
|
|||
|
|
@ -7009,7 +7009,9 @@ class GatewayRunner:
|
|||
|
||||
if session.exited:
|
||||
# --- Agent-triggered completion: inject synthetic message ---
|
||||
if agent_notify:
|
||||
# Skip if the agent already consumed the result via wait/poll/log
|
||||
from tools.process_registry import process_registry as _pr_check
|
||||
if agent_notify and not _pr_check.is_completion_consumed(session_id):
|
||||
from tools.ansi_strip import strip_ansi
|
||||
_out = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
|
||||
synth_text = (
|
||||
|
|
|
|||
|
|
@ -28,12 +28,16 @@ class _FakeRegistry:
|
|||
|
||||
def __init__(self, sessions):
|
||||
self._sessions = list(sessions)
|
||||
self._completion_consumed: set = set()
|
||||
|
||||
def get(self, session_id):
|
||||
if self._sessions:
|
||||
return self._sessions.pop(0)
|
||||
return None
|
||||
|
||||
def is_completion_consumed(self, session_id):
|
||||
return session_id in self._completion_consumed
|
||||
|
||||
|
||||
def _build_runner(monkeypatch, tmp_path) -> GatewayRunner:
|
||||
"""Create a GatewayRunner with notifications set to 'all'."""
|
||||
|
|
|
|||
|
|
@ -289,3 +289,62 @@ class TestCodeExecutionBlocked:
|
|||
def test_notify_on_complete_blocked_in_sandbox(self):
|
||||
from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS
|
||||
assert "notify_on_complete" in _TERMINAL_BLOCKED_PARAMS
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Completion consumed suppression
|
||||
# =========================================================================
|
||||
|
||||
class TestCompletionConsumed:
|
||||
"""Test that wait/poll/log suppress redundant completion notifications."""
|
||||
|
||||
def test_wait_marks_completion_consumed(self, registry):
|
||||
"""wait() returning exited status marks session as consumed."""
|
||||
s = _make_session(sid="proc_wait", notify_on_complete=True, output="done")
|
||||
s.exited = True
|
||||
s.exit_code = 0
|
||||
registry._running[s.id] = s
|
||||
with patch.object(registry, "_write_checkpoint"):
|
||||
registry._move_to_finished(s)
|
||||
|
||||
# Notification is in the queue
|
||||
assert not registry.completion_queue.empty()
|
||||
assert not registry.is_completion_consumed("proc_wait")
|
||||
|
||||
# Agent calls wait() — gets the result directly
|
||||
result = registry.wait("proc_wait", timeout=1)
|
||||
assert result["status"] == "exited"
|
||||
|
||||
# Now the completion is marked as consumed
|
||||
assert registry.is_completion_consumed("proc_wait")
|
||||
|
||||
def test_poll_marks_completion_consumed(self, registry):
|
||||
"""poll() returning exited status marks session as consumed."""
|
||||
s = _make_session(sid="proc_poll", notify_on_complete=True, output="done")
|
||||
s.exited = True
|
||||
s.exit_code = 0
|
||||
registry._finished[s.id] = s
|
||||
|
||||
result = registry.poll("proc_poll")
|
||||
assert result["status"] == "exited"
|
||||
assert registry.is_completion_consumed("proc_poll")
|
||||
|
||||
def test_log_marks_completion_consumed(self, registry):
|
||||
"""read_log() on exited session marks as consumed."""
|
||||
s = _make_session(sid="proc_log", notify_on_complete=True, output="line1\nline2")
|
||||
s.exited = True
|
||||
s.exit_code = 0
|
||||
registry._finished[s.id] = s
|
||||
|
||||
result = registry.read_log("proc_log")
|
||||
assert result["status"] == "exited"
|
||||
assert registry.is_completion_consumed("proc_log")
|
||||
|
||||
def test_running_process_not_consumed(self, registry):
|
||||
"""poll() on a still-running process does not mark as consumed."""
|
||||
s = _make_session(sid="proc_running", notify_on_complete=True, output="partial")
|
||||
registry._running[s.id] = s
|
||||
|
||||
result = registry.poll("proc_running")
|
||||
assert result["status"] == "running"
|
||||
assert not registry.is_completion_consumed("proc_running")
|
||||
|
|
|
|||
|
|
@ -136,6 +136,10 @@ class ProcessRegistry:
|
|||
import queue as _queue_mod
|
||||
self.completion_queue: _queue_mod.Queue = _queue_mod.Queue()
|
||||
|
||||
# Track sessions whose completion was already consumed by the agent
|
||||
# via wait/poll/log. Drain loops skip notifications for these.
|
||||
self._completion_consumed: set = set()
|
||||
|
||||
@staticmethod
|
||||
def _clean_shell_noise(text: str) -> str:
|
||||
"""Strip shell startup warnings from the beginning of output."""
|
||||
|
|
@ -613,6 +617,10 @@ class ProcessRegistry:
|
|||
|
||||
# ----- Query Methods -----
|
||||
|
||||
def is_completion_consumed(self, session_id: str) -> bool:
|
||||
"""Check if a completion notification was already consumed via wait/poll/log."""
|
||||
return session_id in self._completion_consumed
|
||||
|
||||
def get(self, session_id: str) -> Optional[ProcessSession]:
|
||||
"""Get a session by ID (running or finished)."""
|
||||
with self._lock:
|
||||
|
|
@ -640,6 +648,7 @@ class ProcessRegistry:
|
|||
}
|
||||
if session.exited:
|
||||
result["exit_code"] = session.exit_code
|
||||
self._completion_consumed.add(session_id)
|
||||
if session.detached:
|
||||
result["detached"] = True
|
||||
result["note"] = "Process recovered after restart -- output history unavailable"
|
||||
|
|
@ -665,13 +674,16 @@ class ProcessRegistry:
|
|||
else:
|
||||
selected = lines[offset:offset + limit]
|
||||
|
||||
return {
|
||||
result = {
|
||||
"session_id": session.id,
|
||||
"status": "exited" if session.exited else "running",
|
||||
"output": "\n".join(selected),
|
||||
"total_lines": total_lines,
|
||||
"showing": f"{len(selected)} lines",
|
||||
}
|
||||
if session.exited:
|
||||
self._completion_consumed.add(session_id)
|
||||
return result
|
||||
|
||||
def wait(self, session_id: str, timeout: int = None) -> dict:
|
||||
"""
|
||||
|
|
@ -714,6 +726,7 @@ class ProcessRegistry:
|
|||
while time.monotonic() < deadline:
|
||||
session = self._refresh_detached_session(session)
|
||||
if session.exited:
|
||||
self._completion_consumed.add(session_id)
|
||||
result = {
|
||||
"status": "exited",
|
||||
"exit_code": session.exit_code,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue