fix(tui): autonomous background process completion notifications (#26071) (#26327)

* feat(process-registry): add format_process_notification shared helper

* feat(process-registry): add drain_notifications method

* refactor(cli): use shared drain_notifications and format_process_notification

* feat(tui): add background notification poller for completion_queue

* feat(tui): wire notification poller into session init/finalize

* refactor(tui): add post-turn drain using shared helper as safety net
This commit is contained in:
Siddharth Balyan 2026-05-15 19:31:00 +05:30 committed by GitHub
parent db84a78e61
commit d5416284f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 486 additions and 55 deletions

59
cli.py
View file

@ -1965,43 +1965,7 @@ def _resolve_attachment_path(raw_path: str) -> Path | None:
return resolved
def _format_process_notification(evt: dict) -> "str | None":
"""Format a process notification event into a [IMPORTANT: ...] message.
Handles both completion events (notify_on_complete) and watch pattern
match events from the unified completion_queue.
"""
evt_type = evt.get("type", "completion")
_sid = evt.get("session_id", "unknown")
_cmd = evt.get("command", "unknown")
if evt_type == "watch_disabled":
return f"[IMPORTANT: {evt.get('message', '')}]"
if evt_type == "watch_match":
_pat = evt.get("pattern", "?")
_out = evt.get("output", "")
_sup = evt.get("suppressed", 0)
text = (
f"[IMPORTANT: Background process {_sid} matched "
f"watch pattern \"{_pat}\".\n"
f"Command: {_cmd}\n"
f"Matched output:\n{_out}"
)
if _sup:
text += f"\n({_sup} earlier matches were suppressed by rate limit)"
text += "]"
return text
# Default: completion event
_exit = evt.get("exit_code", "?")
_out = evt.get("output", "")
return (
f"[IMPORTANT: Background process {_sid} completed "
f"(exit code {_exit}).\n"
f"Command: {_cmd}\n"
f"Output:\n{_out}]"
)
def _detect_file_drop(user_input: str) -> "dict | None":
@ -13518,16 +13482,8 @@ class HermesCLI:
# and watch pattern matches) while agent is idle.
try:
from tools.process_registry import process_registry
if not process_registry.completion_queue.empty():
evt = process_registry.completion_queue.get_nowait()
# Skip if the agent already consumed this via wait/poll/log
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
pass # already delivered via tool result
else:
_synth = _format_process_notification(evt)
if _synth:
self._pending_input.put(_synth)
for _evt, _synth in process_registry.drain_notifications():
self._pending_input.put(_synth)
except Exception:
pass
continue
@ -13635,15 +13591,8 @@ class HermesCLI:
# that arrived while the agent was running.
try:
from tools.process_registry import process_registry
while not process_registry.completion_queue.empty():
evt = process_registry.completion_queue.get_nowait()
# Skip if the agent already consumed this via wait/poll/log
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
continue # already delivered via tool result
_synth = _format_process_notification(evt)
if _synth:
self._pending_input.put(_synth)
for _evt, _synth in process_registry.drain_notifications():
self._pending_input.put(_synth)
except Exception:
pass # Non-fatal — don't break the main loop

View file

@ -4649,3 +4649,158 @@ def test_config_show_displays_nested_max_turns(monkeypatch):
)
assert ["Max Turns", "120"] in agent_rows
def test_notification_poller_delivers_completion(monkeypatch):
"""Poller picks up completion events and triggers agent turns."""
from tools.process_registry import process_registry
turns = []
emitted = []
class _Agent:
def run_conversation(self, prompt, conversation_history=None, stream_callback=None):
turns.append(prompt)
return {
"final_response": "ok",
"messages": [{"role": "assistant", "content": "ok"}],
}
class _ImmediateThread:
def __init__(self, target=None, daemon=None):
self._target = target
def start(self):
self._target()
sess = _session(agent=_Agent())
server._sessions["sid_poll"] = sess
monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
monkeypatch.setattr(server, "_emit", lambda *a, **kw: emitted.append(a))
monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
# Clear queue
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
process_registry._completion_consumed.discard("proc_poller_test")
stop = threading.Event()
# Put event on queue, then immediately signal stop so the poller
# runs exactly one iteration.
process_registry.completion_queue.put({
"type": "completion",
"session_id": "proc_poller_test",
"command": "echo hello",
"exit_code": 0,
"output": "hello",
})
stop.set()
try:
server._notification_poller_loop(stop, "sid_poll", sess)
# Should have emitted a status.update with kind=process
status_calls = [a for a in emitted if a[0] == "status.update"]
assert len(status_calls) >= 1
assert status_calls[0][2]["kind"] == "process"
# Should have triggered an agent turn
assert len(turns) == 1
assert "[IMPORTANT: Background process proc_poller_test completed" in turns[0]
finally:
server._sessions.pop("sid_poll", None)
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
def test_notification_poller_skips_consumed(monkeypatch):
"""Already-consumed completions are not dispatched by the poller."""
from tools.process_registry import process_registry
turns = []
class _Agent:
def run_conversation(self, prompt, conversation_history=None, stream_callback=None):
turns.append(prompt)
return {"final_response": "ok", "messages": []}
class _ImmediateThread:
def __init__(self, target=None, daemon=None):
self._target = target
def start(self):
self._target()
sess = _session(agent=_Agent())
server._sessions["sid_skip"] = sess
monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
process_registry._completion_consumed.add("proc_already_done")
process_registry.completion_queue.put({
"type": "completion",
"session_id": "proc_already_done",
"command": "echo x",
"exit_code": 0,
"output": "x",
})
stop = threading.Event()
stop.set()
try:
server._notification_poller_loop(stop, "sid_skip", sess)
assert len(turns) == 0
finally:
server._sessions.pop("sid_skip", None)
process_registry._completion_consumed.discard("proc_already_done")
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
def test_notification_poller_requeues_when_busy(monkeypatch):
"""When the agent is busy, the poller requeues the event."""
from tools.process_registry import process_registry
emitted = []
sess = _session(running=True) # agent is busy
server._sessions["sid_busy"] = sess
monkeypatch.setattr(server, "_emit", lambda *a, **kw: emitted.append(a))
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
process_registry._completion_consumed.discard("proc_busy_test")
evt = {
"type": "completion",
"session_id": "proc_busy_test",
"command": "make build",
"exit_code": 0,
"output": "ok",
}
process_registry.completion_queue.put(evt)
stop = threading.Event()
stop.set()
try:
server._notification_poller_loop(stop, "sid_busy", sess)
# Status update was emitted (user sees it)
status_calls = [a for a in emitted if a[0] == "status.update"]
assert len(status_calls) == 1
# Event was requeued (agent was busy, no turn triggered)
assert not process_registry.completion_queue.empty()
requeued = process_registry.completion_queue.get_nowait()
assert requeued["session_id"] == "proc_busy_test"
finally:
server._sessions.pop("sid_busy", None)
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()

View file

@ -865,3 +865,138 @@ class TestProcessToolHandler:
from tools.process_registry import _handle_process
result = json.loads(_handle_process({"action": "unknown_action"}))
assert "error" in result
# =========================================================================
# format_process_notification + drain_notifications (shared helpers)
# =========================================================================
from tools.process_registry import format_process_notification
def test_format_completion_event():
evt = {
"type": "completion",
"session_id": "proc_abc",
"command": "sleep 5",
"exit_code": 0,
"output": "done",
}
result = format_process_notification(evt)
assert "[IMPORTANT: Background process proc_abc completed" in result
assert "exit code 0" in result
assert "Command: sleep 5" in result
assert "Output:\ndone]" in result
def test_format_watch_match_event():
evt = {
"type": "watch_match",
"session_id": "proc_xyz",
"command": "tail -f log",
"pattern": "ERROR",
"output": "ERROR: disk full",
"suppressed": 0,
}
result = format_process_notification(evt)
assert 'watch pattern "ERROR"' in result
assert "Matched output:\nERROR: disk full" in result
def test_format_watch_match_with_suppressed():
evt = {
"type": "watch_match",
"session_id": "proc_xyz",
"command": "tail -f log",
"pattern": "WARN",
"output": "WARN: low mem",
"suppressed": 3,
}
result = format_process_notification(evt)
assert "3 earlier matches were suppressed" in result
def test_format_watch_disabled_event():
evt = {
"type": "watch_disabled",
"message": "Watch disabled for proc_xyz: too many matches",
}
result = format_process_notification(evt)
assert "[IMPORTANT: Watch disabled for proc_xyz" in result
def test_format_returns_none_for_empty_event():
evt = {}
result = format_process_notification(evt)
assert result is not None
assert "unknown" in result
def test_drain_notifications_returns_pending_events():
from tools.process_registry import process_registry
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
process_registry.completion_queue.put({
"type": "completion",
"session_id": "proc_drain1",
"command": "echo hi",
"exit_code": 0,
"output": "hi",
})
process_registry.completion_queue.put({
"type": "watch_match",
"session_id": "proc_drain2",
"command": "tail -f x",
"pattern": "ERR",
"output": "ERR found",
"suppressed": 0,
})
try:
results = process_registry.drain_notifications()
assert len(results) == 2
assert results[0][0]["session_id"] == "proc_drain1"
assert "proc_drain1 completed" in results[0][1]
assert results[1][0]["session_id"] == "proc_drain2"
assert "watch pattern" in results[1][1]
finally:
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
process_registry._completion_consumed.discard("proc_drain1")
process_registry._completion_consumed.discard("proc_drain2")
def test_drain_notifications_skips_consumed():
from tools.process_registry import process_registry
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
process_registry._completion_consumed.add("proc_consumed")
process_registry.completion_queue.put({
"type": "completion",
"session_id": "proc_consumed",
"command": "echo done",
"exit_code": 0,
"output": "done",
})
try:
results = process_registry.drain_notifications()
assert len(results) == 0
finally:
process_registry._completion_consumed.discard("proc_consumed")
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
def test_drain_notifications_empty_queue():
from tools.process_registry import process_registry
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
results = process_registry.drain_notifications()
assert results == []

View file

@ -826,6 +826,26 @@ class ProcessRegistry:
"""Check if a completion notification was already consumed via wait/poll/log."""
return session_id in self._completion_consumed
def drain_notifications(self) -> "list[tuple[dict, str]]":
"""Pop all pending notification events and return formatted pairs.
Returns a list of (raw_event, formatted_text) tuples.
Skips completion events that were already consumed via wait/poll/log.
"""
results = []
while not self.completion_queue.empty():
try:
evt = self.completion_queue.get_nowait()
except Exception:
break
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and self.is_completion_consumed(_evt_sid):
continue
text = format_process_notification(evt)
if text:
results.append((evt, text))
return results
def get(self, session_id: str) -> Optional[ProcessSession]:
"""Get a session by ID (running or finished)."""
with self._lock:
@ -1388,6 +1408,44 @@ class ProcessRegistry:
process_registry = ProcessRegistry()
def format_process_notification(evt: dict) -> "str | None":
"""Format a process notification event into a [IMPORTANT: ...] message.
Handles completion events (notify_on_complete), watch pattern matches,
and watch disabled events from the unified completion_queue.
"""
evt_type = evt.get("type", "completion")
_sid = evt.get("session_id", "unknown")
_cmd = evt.get("command", "unknown")
if evt_type == "watch_disabled":
return f"[IMPORTANT: {evt.get('message', '')}]"
if evt_type == "watch_match":
_pat = evt.get("pattern", "?")
_out = evt.get("output", "")
_sup = evt.get("suppressed", 0)
text = (
f"[IMPORTANT: Background process {_sid} matched "
f"watch pattern \"{_pat}\".\n"
f"Command: {_cmd}\n"
f"Matched output:\n{_out}"
)
if _sup:
text += f"\n({_sup} earlier matches were suppressed by rate limit)"
text += "]"
return text
_exit = evt.get("exit_code", "?")
_out = evt.get("output", "")
return (
f"[IMPORTANT: Background process {_sid} completed "
f"(exit code {_exit}).\n"
f"Command: {_cmd}\n"
f"Output:\n{_out}]"
)
# ---------------------------------------------------------------------------
# Registry -- the "process" tool schema + handler
# ---------------------------------------------------------------------------

View file

@ -287,6 +287,9 @@ def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> No
if not session or session.get("_finalized"):
return
session["_finalized"] = True
stop_event = session.get("_notif_stop")
if stop_event is not None:
stop_event.set()
agent = session.get("agent")
lock = session.get("history_lock")
@ -579,6 +582,7 @@ def _start_agent_build(sid: str, session: dict) -> None:
pass
_wire_callbacks(sid)
_sessions[sid]["_notif_stop"] = _start_notification_poller(sid, _sessions[sid])
_notify_session_boundary("on_session_reset", key)
info = _session_info(agent)
@ -1955,6 +1959,7 @@ def _init_session(sid: str, key: str, agent, history: list, cols: int = 80):
# session startup resilient).
pass
_wire_callbacks(sid)
_sessions[sid]["_notif_stop"] = _start_notification_poller(sid, _sessions[sid])
_notify_session_boundary("on_session_reset", key)
_emit("session.info", sid, _session_info(agent))
@ -3027,6 +3032,105 @@ def _(rid, params: dict) -> dict:
return _ok(rid, {"status": "streaming"})
def _notification_poller_loop(
stop_event: threading.Event, sid: str, session: dict
) -> None:
"""Poll completion_queue and dispatch notifications autonomously.
Runs in a daemon thread started by _init_session(). Emits a
status.update (kind=process) for user visibility, then chains an
agent turn via _run_prompt_submit if the session is idle.
NOTE: The completion_queue is global (one per process). If multiple
TUI sessions coexist, whichever poller wakes first grabs the event,
even if the process was started by a different session. This matches
CLI/gateway behavior (single session per process).
"""
from tools.process_registry import process_registry, format_process_notification
while not stop_event.is_set() and not session.get("_finalized"):
try:
evt = process_registry.completion_queue.get(timeout=0.5)
except Exception:
continue
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
continue
text = format_process_notification(evt)
if not text:
continue
_emit("status.update", sid, {"kind": "process", "text": text})
with session["history_lock"]:
if session.get("running"):
process_registry.completion_queue.put(evt)
continue
session["running"] = True
rid = f"__notif__{int(time.time() * 1000)}"
try:
_emit("message.start", sid)
_run_prompt_submit(rid, sid, session, text)
except Exception as exc:
print(
f"[tui_gateway] notification poller dispatch failed: "
f"{type(exc).__name__}: {exc}",
file=sys.stderr,
)
with session["history_lock"]:
session["running"] = False
# Drain any remaining events after stop signal (process all pending
# before exiting so nothing is lost on shutdown).
while not process_registry.completion_queue.empty():
try:
evt = process_registry.completion_queue.get_nowait()
except Exception:
break
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
continue
text = format_process_notification(evt)
if not text:
continue
_emit("status.update", sid, {"kind": "process", "text": text})
with session["history_lock"]:
if session.get("running"):
process_registry.completion_queue.put(evt)
break
session["running"] = True
rid = f"__notif__{int(time.time() * 1000)}"
try:
_emit("message.start", sid)
_run_prompt_submit(rid, sid, session, text)
except Exception as exc:
print(
f"[tui_gateway] notification poller dispatch failed: "
f"{type(exc).__name__}: {exc}",
file=sys.stderr,
)
with session["history_lock"]:
session["running"] = False
def _start_notification_poller(sid: str, session: dict) -> threading.Event:
"""Start the background notification poller for a TUI session."""
stop = threading.Event()
t = threading.Thread(
target=_notification_poller_loop,
args=(stop, sid, session),
daemon=True,
)
t.start()
return stop
def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None:
with session["history_lock"]:
history = list(session["history"])
@ -3385,6 +3489,36 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None:
with session["history_lock"]:
session["running"] = False
# Drain completion notifications that arrived during this turn.
# The background poller handles between-turn delivery; this is
# the safety net for events that arrived mid-turn.
try:
from tools.process_registry import process_registry
for _evt, synth in process_registry.drain_notifications():
with session["history_lock"]:
if session.get("running"):
process_registry.completion_queue.put(_evt)
break
session["running"] = True
try:
_emit("message.start", sid)
_run_prompt_submit(rid, sid, session, synth)
except Exception as _n_exc:
print(
f"[tui_gateway] completion notification dispatch failed: "
f"{type(_n_exc).__name__}: {_n_exc}",
file=sys.stderr,
)
with session["history_lock"]:
session["running"] = False
except Exception as _drain_exc:
print(
f"[tui_gateway] completion queue drain failed: "
f"{type(_drain_exc).__name__}: {_drain_exc}",
file=sys.stderr,
)
threading.Thread(target=run, daemon=True).start()