mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
feat: background process monitoring — watch_patterns for real-time output alerts
* feat: add watch_patterns to background processes for output monitoring
Adds a new 'watch_patterns' parameter to terminal(background=true) that
lets the agent specify strings to watch for in process output. When a
matching line appears, a notification is queued and injected as a
synthetic message — triggering a new agent turn, similar to
notify_on_complete but mid-process.
Implementation:
- ProcessSession gets watch_patterns field + rate-limit state
- _check_watch_patterns() in ProcessRegistry scans new output chunks
from all three reader threads (local, PTY, env-poller)
- Rate limited: max 8 notifications per 10s window
- Sustained overload (45s) permanently disables watching for that process
- watch_queue alongside completion_queue, same consumption pattern
- CLI drains watch_queue in both idle loop and post-turn drain
- Gateway drains after agent runs via _inject_watch_notification()
- Checkpoint persistence + crash recovery includes watch_patterns
- Blocked in execute_code sandbox (like other bg params)
- 20 new tests covering matching, rate limiting, overload kill,
checkpoint persistence, schema, and handler passthrough
Usage:
terminal(
command='npm run dev',
background=true,
watch_patterns=['ERROR', 'WARN', 'listening on port']
)
* refactor: merge watch_queue into completion_queue
Unified queue with 'type' field distinguishing 'completion',
'watch_match', and 'watch_disabled' events. Extracted
_format_process_notification() in CLI and gateway to handle
all event types in a single drain loop. Removes duplication
across both CLI drain sites and the gateway.
This commit is contained in:
parent
a2f9f04c06
commit
f459214010
6 changed files with 556 additions and 35 deletions
81
cli.py
81
cli.py
|
|
@ -1171,6 +1171,45 @@ def _resolve_attachment_path(raw_path: str) -> Path | None:
|
|||
return resolved
|
||||
|
||||
|
||||
def _format_process_notification(evt: dict) -> "str | None":
|
||||
"""Format a process notification event into a [SYSTEM: ...] message.
|
||||
|
||||
Handles both completion events (notify_on_complete) and watch pattern
|
||||
match events from the unified completion_queue.
|
||||
"""
|
||||
evt_type = evt.get("type", "completion")
|
||||
_sid = evt.get("session_id", "unknown")
|
||||
_cmd = evt.get("command", "unknown")
|
||||
|
||||
if evt_type == "watch_disabled":
|
||||
return f"[SYSTEM: {evt.get('message', '')}]"
|
||||
|
||||
if evt_type == "watch_match":
|
||||
_pat = evt.get("pattern", "?")
|
||||
_out = evt.get("output", "")
|
||||
_sup = evt.get("suppressed", 0)
|
||||
text = (
|
||||
f"[SYSTEM: Background process {_sid} matched "
|
||||
f"watch pattern \"{_pat}\".\n"
|
||||
f"Command: {_cmd}\n"
|
||||
f"Matched output:\n{_out}"
|
||||
)
|
||||
if _sup:
|
||||
text += f"\n({_sup} earlier matches were suppressed by rate limit)"
|
||||
text += "]"
|
||||
return text
|
||||
|
||||
# Default: completion event
|
||||
_exit = evt.get("exit_code", "?")
|
||||
_out = evt.get("output", "")
|
||||
return (
|
||||
f"[SYSTEM: Background process {_sid} completed "
|
||||
f"(exit code {_exit}).\n"
|
||||
f"Command: {_cmd}\n"
|
||||
f"Output:\n{_out}]"
|
||||
)
|
||||
|
||||
|
||||
def _detect_file_drop(user_input: str) -> "dict | None":
|
||||
"""Detect if *user_input* starts with a real local file path.
|
||||
|
||||
|
|
@ -8870,23 +8909,15 @@ class HermesCLI:
|
|||
# Periodic config watcher — auto-reload MCP on mcp_servers change
|
||||
if not self._agent_running:
|
||||
self._check_config_mcp_changes()
|
||||
# Check for background process completion notifications
|
||||
# while the agent is idle (user hasn't typed anything yet).
|
||||
# Check for background process notifications (completions
|
||||
# and watch pattern matches) while agent is idle.
|
||||
try:
|
||||
from tools.process_registry import process_registry
|
||||
if not process_registry.completion_queue.empty():
|
||||
completion = process_registry.completion_queue.get_nowait()
|
||||
_exit = completion.get("exit_code", "?")
|
||||
_cmd = completion.get("command", "unknown")
|
||||
_sid = completion.get("session_id", "unknown")
|
||||
_out = completion.get("output", "")
|
||||
_synth = (
|
||||
f"[SYSTEM: Background process {_sid} completed "
|
||||
f"(exit code {_exit}).\n"
|
||||
f"Command: {_cmd}\n"
|
||||
f"Output:\n{_out}]"
|
||||
)
|
||||
self._pending_input.put(_synth)
|
||||
evt = process_registry.completion_queue.get_nowait()
|
||||
_synth = _format_process_notification(evt)
|
||||
if _synth:
|
||||
self._pending_input.put(_synth)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
|
@ -9004,25 +9035,15 @@ class HermesCLI:
|
|||
_cprint(f"{_DIM}Voice auto-restart failed: {e}{_RST}")
|
||||
threading.Thread(target=_restart_recording, daemon=True).start()
|
||||
|
||||
# Drain process completion notifications — any background
|
||||
# process that finished with notify_on_complete while the
|
||||
# agent was running (or before) gets auto-injected as a
|
||||
# new user message so the agent can react to it.
|
||||
# Drain process notifications (completions + watch matches)
|
||||
# that arrived while the agent was running.
|
||||
try:
|
||||
from tools.process_registry import process_registry
|
||||
while not process_registry.completion_queue.empty():
|
||||
completion = process_registry.completion_queue.get_nowait()
|
||||
_exit = completion.get("exit_code", "?")
|
||||
_cmd = completion.get("command", "unknown")
|
||||
_sid = completion.get("session_id", "unknown")
|
||||
_out = completion.get("output", "")
|
||||
_synth = (
|
||||
f"[SYSTEM: Background process {_sid} completed "
|
||||
f"(exit code {_exit}).\n"
|
||||
f"Command: {_cmd}\n"
|
||||
f"Output:\n{_out}]"
|
||||
)
|
||||
self._pending_input.put(_synth)
|
||||
evt = process_registry.completion_queue.get_nowait()
|
||||
_synth = _format_process_notification(evt)
|
||||
if _synth:
|
||||
self._pending_input.put(_synth)
|
||||
except Exception:
|
||||
pass # Non-fatal — don't break the main loop
|
||||
|
||||
|
|
|
|||
|
|
@ -476,6 +476,33 @@ def _resolve_hermes_bin() -> Optional[list[str]]:
|
|||
return None
|
||||
|
||||
|
||||
def _format_gateway_process_notification(evt: dict) -> "str | None":
|
||||
"""Format a watch pattern event from completion_queue into a [SYSTEM:] message."""
|
||||
evt_type = evt.get("type", "completion")
|
||||
_sid = evt.get("session_id", "unknown")
|
||||
_cmd = evt.get("command", "unknown")
|
||||
|
||||
if evt_type == "watch_disabled":
|
||||
return f"[SYSTEM: {evt.get('message', '')}]"
|
||||
|
||||
if evt_type == "watch_match":
|
||||
_pat = evt.get("pattern", "?")
|
||||
_out = evt.get("output", "")
|
||||
_sup = evt.get("suppressed", 0)
|
||||
text = (
|
||||
f"[SYSTEM: Background process {_sid} matched "
|
||||
f"watch pattern \"{_pat}\".\n"
|
||||
f"Command: {_cmd}\n"
|
||||
f"Matched output:\n{_out}"
|
||||
)
|
||||
if _sup:
|
||||
text += f"\n({_sup} earlier matches were suppressed by rate limit)"
|
||||
text += "]"
|
||||
return text
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class GatewayRunner:
|
||||
"""
|
||||
Main gateway controller.
|
||||
|
|
@ -3430,6 +3457,29 @@ class GatewayRunner:
|
|||
except Exception as e:
|
||||
logger.error("Process watcher setup error: %s", e)
|
||||
|
||||
# Drain watch pattern notifications that arrived during the agent run.
|
||||
# Watch events and completions share the same queue; completions are
|
||||
# already handled by the per-process watcher task above, so we only
|
||||
# inject watch-type events here.
|
||||
try:
|
||||
from tools.process_registry import process_registry as _pr
|
||||
_watch_events = []
|
||||
while not _pr.completion_queue.empty():
|
||||
evt = _pr.completion_queue.get_nowait()
|
||||
evt_type = evt.get("type", "completion")
|
||||
if evt_type in ("watch_match", "watch_disabled"):
|
||||
_watch_events.append(evt)
|
||||
# else: completion events are handled by the watcher task
|
||||
for evt in _watch_events:
|
||||
synth_text = _format_gateway_process_notification(evt)
|
||||
if synth_text:
|
||||
try:
|
||||
await self._inject_watch_notification(synth_text, event)
|
||||
except Exception as e2:
|
||||
logger.error("Watch notification injection error: %s", e2)
|
||||
except Exception as e:
|
||||
logger.debug("Watch queue drain error: %s", e)
|
||||
|
||||
# NOTE: Dangerous command approvals are now handled inline by the
|
||||
# blocking gateway approval mechanism in tools/approval.py. The agent
|
||||
# thread blocks until the user responds with /approve or /deny, so by
|
||||
|
|
@ -6708,6 +6758,36 @@ class GatewayRunner:
|
|||
return prefix
|
||||
return user_text
|
||||
|
||||
async def _inject_watch_notification(self, synth_text: str, original_event) -> None:
|
||||
"""Inject a watch-pattern notification as a synthetic message event.
|
||||
|
||||
Uses the source from the original user event to route the notification
|
||||
back to the correct chat/adapter.
|
||||
"""
|
||||
source = getattr(original_event, "source", None)
|
||||
if not source:
|
||||
return
|
||||
platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform)
|
||||
adapter = None
|
||||
for p, a in self.adapters.items():
|
||||
if p.value == platform_name:
|
||||
adapter = a
|
||||
break
|
||||
if not adapter:
|
||||
return
|
||||
try:
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
synth_event = MessageEvent(
|
||||
text=synth_text,
|
||||
message_type=MessageType.TEXT,
|
||||
source=source,
|
||||
internal=True,
|
||||
)
|
||||
logger.info("Watch pattern notification — injecting for %s", platform_name)
|
||||
await adapter.handle_message(synth_event)
|
||||
except Exception as e:
|
||||
logger.error("Watch notification injection error: %s", e)
|
||||
|
||||
async def _run_process_watcher(self, watcher: dict) -> None:
|
||||
"""
|
||||
Periodically check a background process and push updates to the user.
|
||||
|
|
|
|||
304
tests/tools/test_watch_patterns.py
Normal file
304
tests/tools/test_watch_patterns.py
Normal file
|
|
@ -0,0 +1,304 @@
|
|||
"""Tests for watch_patterns background process monitoring feature.
|
||||
|
||||
Covers:
|
||||
- ProcessSession.watch_patterns field
|
||||
- ProcessRegistry._check_watch_patterns() matching + notification
|
||||
- Rate limiting (WATCH_MAX_PER_WINDOW) and overload kill switch
|
||||
- watch_queue population
|
||||
- Checkpoint persistence of watch_patterns
|
||||
- Terminal tool schema includes watch_patterns
|
||||
- Terminal tool handler passes watch_patterns through
|
||||
"""
|
||||
|
||||
import json
|
||||
import queue
|
||||
import time
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from tools.process_registry import (
|
||||
ProcessRegistry,
|
||||
ProcessSession,
|
||||
WATCH_MAX_PER_WINDOW,
|
||||
WATCH_WINDOW_SECONDS,
|
||||
WATCH_OVERLOAD_KILL_SECONDS,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
def registry():
    """Create a fresh ProcessRegistry."""
    # A brand-new registry per test keeps completion_queue and the
    # _running map isolated between test cases.
    return ProcessRegistry()
|
||||
|
||||
|
||||
def _make_session(
    sid="proc_test_watch",
    command="tail -f app.log",
    task_id="t1",
    watch_patterns=None,
) -> ProcessSession:
    """Build a ProcessSession test double with sensible defaults.

    ``watch_patterns=None`` is normalized to an empty list so tests can
    omit it without sharing a mutable default.
    """
    return ProcessSession(
        id=sid,
        command=command,
        task_id=task_id,
        started_at=time.time(),
        watch_patterns=watch_patterns or [],
    )
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# ProcessSession field defaults
|
||||
# =========================================================================
|
||||
|
||||
class TestProcessSessionField:
    """Defaults and assignment of ProcessSession watch-pattern state."""

    def test_default_empty(self):
        """A session built without watch_patterns starts fully idle."""
        # Nothing watched, watching enabled, nothing delivered or suppressed.
        s = ProcessSession(id="proc_1", command="echo hi")
        assert s.watch_patterns == []
        assert s._watch_disabled is False
        assert s._watch_hits == 0
        assert s._watch_suppressed == 0

    def test_can_set_patterns(self):
        """Patterns passed at construction are stored verbatim, in order."""
        s = _make_session(watch_patterns=["ERROR", "WARN"])
        assert s.watch_patterns == ["ERROR", "WARN"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Pattern matching + queue population
|
||||
# =========================================================================
|
||||
|
||||
class TestCheckWatchPatterns:
    """Pattern matching and notification queueing in _check_watch_patterns."""

    def test_no_patterns_no_notification(self, registry):
        """No watch_patterns → no notifications."""
        session = _make_session(watch_patterns=[])
        registry._check_watch_patterns(session, "ERROR: something broke\n")
        assert registry.completion_queue.empty()

    def test_no_match_no_notification(self, registry):
        """Output that doesn't match any pattern → no notification."""
        session = _make_session(watch_patterns=["ERROR", "FAIL"])
        registry._check_watch_patterns(session, "INFO: all good\nDEBUG: fine\n")
        assert registry.completion_queue.empty()

    def test_basic_match(self, registry):
        """Single matching line triggers a notification."""
        session = _make_session(watch_patterns=["ERROR"])
        registry._check_watch_patterns(session, "INFO: ok\nERROR: disk full\n")
        assert not registry.completion_queue.empty()
        evt = registry.completion_queue.get_nowait()
        # Event carries type, the matched pattern, the matching line,
        # and the originating session id.
        assert evt["type"] == "watch_match"
        assert evt["pattern"] == "ERROR"
        assert "disk full" in evt["output"]
        assert evt["session_id"] == "proc_test_watch"

    def test_multiple_patterns(self, registry):
        """First matching pattern is reported."""
        session = _make_session(watch_patterns=["WARN", "ERROR"])
        registry._check_watch_patterns(session, "ERROR: bad\nWARN: hmm\n")
        evt = registry.completion_queue.get_nowait()
        # ERROR appears first in the output, and we check patterns in order
        # so "WARN" won't match "ERROR: bad" but "ERROR" will
        assert evt["pattern"] == "ERROR"
        assert "bad" in evt["output"]

    def test_disabled_skips(self, registry):
        """Disabled watch produces no notifications."""
        session = _make_session(watch_patterns=["ERROR"])
        session._watch_disabled = True
        registry._check_watch_patterns(session, "ERROR: boom\n")
        assert registry.completion_queue.empty()

    def test_hit_counter_increments(self, registry):
        """Each delivered notification increments _watch_hits."""
        session = _make_session(watch_patterns=["X"])
        registry._check_watch_patterns(session, "X\n")
        assert session._watch_hits == 1
        registry._check_watch_patterns(session, "X\n")
        assert session._watch_hits == 2

    def test_output_truncation(self, registry):
        """Very long matched output is truncated."""
        session = _make_session(watch_patterns=["X"])
        # Generate 30 matching lines (more than the 20-line cap)
        text = "\n".join(f"X line {i}" for i in range(30)) + "\n"
        registry._check_watch_patterns(session, text)
        evt = registry.completion_queue.get_nowait()
        # Should only have 20 lines max
        assert evt["output"].count("\n") <= 20
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Rate limiting
|
||||
# =========================================================================
|
||||
|
||||
class TestRateLimiting:
    """Per-window rate limiting of watch-pattern notifications."""

    def test_within_window_limit(self, registry):
        """Notifications within the rate limit all get delivered."""
        session = _make_session(watch_patterns=["E"])
        for i in range(WATCH_MAX_PER_WINDOW):
            registry._check_watch_patterns(session, f"E {i}\n")
        assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW

    def test_exceeds_window_limit(self, registry):
        """Notifications beyond the rate limit are suppressed."""
        session = _make_session(watch_patterns=["E"])
        for i in range(WATCH_MAX_PER_WINDOW + 5):
            registry._check_watch_patterns(session, f"E {i}\n")
        # Only WATCH_MAX_PER_WINDOW should be in the queue
        assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW
        assert session._watch_suppressed == 5

    def test_window_resets(self, registry):
        """After the window expires, notifications can flow again."""
        session = _make_session(watch_patterns=["E"])
        # Fill the window
        for i in range(WATCH_MAX_PER_WINDOW):
            registry._check_watch_patterns(session, f"E {i}\n")
        # One more should be suppressed
        registry._check_watch_patterns(session, "E extra\n")
        assert session._watch_suppressed == 1

        # Fast-forward past window (simulated by back-dating the window
        # start rather than sleeping, keeping the test fast)
        session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1
        registry._check_watch_patterns(session, "E after reset\n")
        # Should deliver now (window reset)
        assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW + 1

    def test_suppressed_count_in_next_delivery(self, registry):
        """Suppressed count is reported in the next successful delivery."""
        session = _make_session(watch_patterns=["E"])
        for i in range(WATCH_MAX_PER_WINDOW):
            registry._check_watch_patterns(session, f"E {i}\n")
        # Suppress 3 more
        for i in range(3):
            registry._check_watch_patterns(session, f"E suppressed {i}\n")
        assert session._watch_suppressed == 3

        # Fast-forward past window to allow delivery
        session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1
        registry._check_watch_patterns(session, "E back\n")
        # Drain to the last event — it should carry the suppressed total
        last_evt = None
        while not registry.completion_queue.empty():
            last_evt = registry.completion_queue.get_nowait()
        assert last_evt["suppressed"] == 3
        assert session._watch_suppressed == 0  # reset after delivery
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Overload kill switch
|
||||
# =========================================================================
|
||||
|
||||
class TestOverloadKillSwitch:
    """Permanent disabling of watching under sustained overload."""

    def test_sustained_overload_disables(self, registry):
        """Sustained overload beyond threshold permanently disables watching."""
        session = _make_session(watch_patterns=["E"])
        # Fill the window to trigger rate limit
        for i in range(WATCH_MAX_PER_WINDOW):
            registry._check_watch_patterns(session, f"E {i}\n")

        # Simulate sustained overload: set overload_since to past threshold
        session._watch_overload_since = time.time() - WATCH_OVERLOAD_KILL_SECONDS - 1
        # Force another suppressed hit
        registry._check_watch_patterns(session, "E overload\n")
        registry._check_watch_patterns(session, "E overload2\n")

        assert session._watch_disabled is True
        # Should have a watch_disabled event in the queue — exactly one,
        # even though two overload hits occurred after the threshold.
        disabled_evts = []
        while not registry.completion_queue.empty():
            evt = registry.completion_queue.get_nowait()
            if evt.get("type") == "watch_disabled":
                disabled_evts.append(evt)
        assert len(disabled_evts) == 1
        assert "too many matches" in disabled_evts[0]["message"]

    def test_overload_resets_on_delivery(self, registry):
        """Overload timer resets when a notification gets through."""
        session = _make_session(watch_patterns=["E"])
        # Start overload tracking
        session._watch_overload_since = time.time() - 10
        # But window allows delivery → overload should reset
        registry._check_watch_patterns(session, "E ok\n")
        assert session._watch_overload_since == 0.0
        assert session._watch_disabled is False
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Checkpoint persistence
|
||||
# =========================================================================
|
||||
|
||||
class TestCheckpointPersistence:
    """watch_patterns round-trips through checkpoint write and recovery."""

    def test_watch_patterns_in_checkpoint(self, registry):
        """watch_patterns is included in checkpoint data."""
        session = _make_session(watch_patterns=["ERROR", "FAIL"])
        with registry._lock:
            registry._running[session.id] = session

        # Intercept the atomic write to inspect the serialized entries.
        with patch("utils.atomic_json_write") as mock_write:
            registry._write_checkpoint()
            args = mock_write.call_args
            entries = args[0][1]  # second positional arg
            assert len(entries) == 1
            assert entries[0]["watch_patterns"] == ["ERROR", "FAIL"]

    def test_watch_patterns_recovery(self, registry, tmp_path, monkeypatch):
        """watch_patterns survives checkpoint recovery."""
        import tools.process_registry as pr_mod
        checkpoint = tmp_path / "processes.json"
        checkpoint.write_text(json.dumps([{
            "session_id": "proc_recovered",
            "command": "tail -f log",
            "pid": 99999999,  # non-existent
            "pid_scope": "host",
            "started_at": time.time(),
            "task_id": "",
            "session_key": "",
            "watcher_platform": "",
            "watcher_chat_id": "",
            "watcher_thread_id": "",
            "watcher_interval": 0,
            "notify_on_complete": False,
            "watch_patterns": ["PANIC", "OOM"],
        }]))
        monkeypatch.setattr(pr_mod, "CHECKPOINT_PATH", checkpoint)
        # PID doesn't exist, so nothing will be recovered
        count = registry.recover_from_checkpoint()
        # Won't recover since PID is fake, but verify the code path doesn't crash
        assert count == 0
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Terminal tool schema + handler
|
||||
# =========================================================================
|
||||
|
||||
class TestTerminalToolSchema:
    """Terminal tool schema exposure and handler passthrough of watch_patterns."""

    def test_schema_includes_watch_patterns(self):
        """The published tool schema declares watch_patterns as string array."""
        from tools.terminal_tool import TERMINAL_SCHEMA
        props = TERMINAL_SCHEMA["parameters"]["properties"]
        assert "watch_patterns" in props
        assert props["watch_patterns"]["type"] == "array"
        assert props["watch_patterns"]["items"] == {"type": "string"}

    def test_handler_passes_watch_patterns(self):
        """_handle_terminal passes watch_patterns to terminal_tool."""
        from tools.terminal_tool import _handle_terminal
        with patch("tools.terminal_tool.terminal_tool") as mock_tt:
            mock_tt.return_value = json.dumps({"output": "ok", "exit_code": 0})
            _handle_terminal(
                {"command": "echo hi", "watch_patterns": ["ERR"]},
                task_id="t1",
            )
            # watch_patterns must arrive as a keyword argument unchanged.
            _, kwargs = mock_tt.call_args
            assert kwargs.get("watch_patterns") == ["ERR"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Code execution tool blocked params
|
||||
# =========================================================================
|
||||
|
||||
class TestCodeExecutionBlocked:
    """watch_patterns must be unusable from ephemeral sandbox scripts."""

    def test_watch_patterns_blocked(self):
        # Like the other background-only params, watch_patterns is
        # listed in the sandbox block set.
        from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS
        assert "watch_patterns" in _TERMINAL_BLOCKED_PARAMS
|
||||
|
|
@ -301,7 +301,7 @@ def _call(tool_name, args):
|
|||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Terminal parameters that must not be used from ephemeral sandbox scripts
|
||||
_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty", "notify_on_complete"}
|
||||
_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty", "notify_on_complete", "watch_patterns"}
|
||||
|
||||
|
||||
def _rpc_server_loop(
|
||||
|
|
|
|||
|
|
@ -58,6 +58,11 @@ MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer
|
|||
FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes
|
||||
MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning)
|
||||
|
||||
# Watch pattern rate limiting
|
||||
WATCH_MAX_PER_WINDOW = 8 # Max notifications delivered per window
|
||||
WATCH_WINDOW_SECONDS = 10 # Rolling window length
|
||||
WATCH_OVERLOAD_KILL_SECONDS = 45 # Sustained overload duration before disabling watch
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessSession:
|
||||
|
|
@ -83,6 +88,14 @@ class ProcessSession:
|
|||
watcher_thread_id: str = ""
|
||||
watcher_interval: int = 0 # 0 = no watcher configured
|
||||
notify_on_complete: bool = False # Queue agent notification on exit
|
||||
# Watch patterns — trigger agent notification when output matches any pattern
|
||||
watch_patterns: List[str] = field(default_factory=list)
|
||||
_watch_hits: int = field(default=0, repr=False) # total matches delivered
|
||||
_watch_suppressed: int = field(default=0, repr=False) # matches dropped by rate limit
|
||||
_watch_overload_since: float = field(default=0.0, repr=False) # when sustained overload began
|
||||
_watch_disabled: bool = field(default=False, repr=False) # permanently killed by overload
|
||||
_watch_window_hits: int = field(default=0, repr=False) # hits in current rate window
|
||||
_watch_window_start: float = field(default=0.0, repr=False)
|
||||
_lock: threading.Lock = field(default_factory=threading.Lock)
|
||||
_reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
|
||||
_pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True)
|
||||
|
|
@ -114,9 +127,10 @@ class ProcessRegistry:
|
|||
# Side-channel for check_interval watchers (gateway reads after agent run)
|
||||
self.pending_watchers: List[Dict[str, Any]] = []
|
||||
|
||||
# Completion notifications — processes with notify_on_complete push here
|
||||
# on exit. CLI process_loop and gateway drain this after each agent turn
|
||||
# to auto-trigger a new agent turn with the process results.
|
||||
# Notification queue — unified queue for all background process events.
|
||||
# Completion notifications (notify_on_complete) and watch pattern matches
|
||||
# both land here, distinguished by "type" field. CLI process_loop and
|
||||
# gateway drain this after each agent turn to auto-trigger new turns.
|
||||
import queue as _queue_mod
|
||||
self.completion_queue: _queue_mod.Queue = _queue_mod.Queue()
|
||||
|
||||
|
|
@ -128,6 +142,84 @@ class ProcessRegistry:
|
|||
lines.pop(0)
|
||||
return "\n".join(lines)
|
||||
|
||||
def _check_watch_patterns(self, session: ProcessSession, new_text: str) -> None:
    """Scan new output for watch patterns and queue notifications.

    Called from reader threads with new_text being the freshly-read chunk.
    Rate-limited: max WATCH_MAX_PER_WINDOW notifications per WATCH_WINDOW_SECONDS.
    If sustained overload exceeds WATCH_OVERLOAD_KILL_SECONDS, watching is
    disabled permanently for this process.
    """
    # Cheap pre-checks outside the lock: nothing to watch, or watching
    # was permanently disabled by the overload kill switch.
    if not session.watch_patterns or session._watch_disabled:
        return

    # Scan new text line-by-line for pattern matches
    # NOTE(review): each chunk is scanned independently, so a pattern
    # split across two chunks will not match — confirm this is acceptable
    # for the writers feeding this method.
    matched_lines = []
    matched_pattern = None
    for line in new_text.splitlines():
        for pat in session.watch_patterns:
            if pat in line:
                matched_lines.append(line.rstrip())
                if matched_pattern is None:
                    # Report the first pattern that matched in this chunk;
                    # later matches only contribute output lines.
                    matched_pattern = pat
                break  # one match per line is enough

    if not matched_lines:
        return

    now = time.time()
    with session._lock:
        # Reset window if it's expired
        if now - session._watch_window_start >= WATCH_WINDOW_SECONDS:
            session._watch_window_hits = 0
            session._watch_window_start = now

        # Check rate limit
        if session._watch_window_hits >= WATCH_MAX_PER_WINDOW:
            session._watch_suppressed += len(matched_lines)

            # Track sustained overload for kill switch
            if session._watch_overload_since == 0.0:
                session._watch_overload_since = now
            elif now - session._watch_overload_since > WATCH_OVERLOAD_KILL_SECONDS:
                # Overload persisted past the kill threshold: disable
                # watching for good and tell the agent how to proceed.
                # _watch_disabled is set before queueing, so subsequent
                # calls take the early-return path and only one
                # watch_disabled event is ever emitted.
                session._watch_disabled = True
                self.completion_queue.put({
                    "session_id": session.id,
                    "command": session.command,
                    "type": "watch_disabled",
                    "suppressed": session._watch_suppressed,
                    "message": (
                        f"Watch patterns disabled for process {session.id} — "
                        f"too many matches ({session._watch_suppressed} suppressed). "
                        f"Use process(action='poll') to check output manually."
                    ),
                })
            return

        # Under the rate limit — deliver notification
        session._watch_window_hits += 1
        session._watch_hits += 1
        # Clear overload tracker since we got a delivery through
        session._watch_overload_since = 0.0

        # Include suppressed count if any events were dropped
        suppressed = session._watch_suppressed
        session._watch_suppressed = 0

        # Trim matched output to a reasonable size
        output = "\n".join(matched_lines[:20])
        if len(output) > 2000:
            output = output[:2000] + "\n...(truncated)"

        self.completion_queue.put({
            "session_id": session.id,
            "command": session.command,
            "type": "watch_match",
            "pattern": matched_pattern,
            "output": output,
            "suppressed": suppressed,
        })
|
||||
|
||||
@staticmethod
|
||||
def _is_host_pid_alive(pid: Optional[int]) -> bool:
|
||||
"""Best-effort liveness check for host-visible PIDs."""
|
||||
|
|
@ -394,6 +486,7 @@ class ProcessRegistry:
|
|||
session.output_buffer += chunk
|
||||
if len(session.output_buffer) > session.max_output_chars:
|
||||
session.output_buffer = session.output_buffer[-session.max_output_chars:]
|
||||
self._check_watch_patterns(session, chunk)
|
||||
except Exception as e:
|
||||
logger.debug("Process stdout reader ended: %s", e)
|
||||
finally:
|
||||
|
|
@ -413,6 +506,7 @@ class ProcessRegistry:
|
|||
quoted_log_path = shlex.quote(log_path)
|
||||
quoted_pid_path = shlex.quote(pid_path)
|
||||
quoted_exit_path = shlex.quote(exit_path)
|
||||
prev_output_len = 0 # track delta for watch pattern scanning
|
||||
while not session.exited:
|
||||
time.sleep(2) # Poll every 2 seconds
|
||||
try:
|
||||
|
|
@ -420,10 +514,15 @@ class ProcessRegistry:
|
|||
result = env.execute(f"cat {quoted_log_path} 2>/dev/null", timeout=10)
|
||||
new_output = result.get("output", "")
|
||||
if new_output:
|
||||
# Compute delta for watch pattern scanning
|
||||
delta = new_output[prev_output_len:] if len(new_output) > prev_output_len else ""
|
||||
prev_output_len = len(new_output)
|
||||
with session._lock:
|
||||
session.output_buffer = new_output
|
||||
if len(session.output_buffer) > session.max_output_chars:
|
||||
session.output_buffer = session.output_buffer[-session.max_output_chars:]
|
||||
if delta:
|
||||
self._check_watch_patterns(session, delta)
|
||||
|
||||
# Check if process is still running
|
||||
check = env.execute(
|
||||
|
|
@ -467,6 +566,7 @@ class ProcessRegistry:
|
|||
session.output_buffer += text
|
||||
if len(session.output_buffer) > session.max_output_chars:
|
||||
session.output_buffer = session.output_buffer[-session.max_output_chars:]
|
||||
self._check_watch_patterns(session, text)
|
||||
except EOFError:
|
||||
break
|
||||
except Exception:
|
||||
|
|
@ -502,6 +602,7 @@ class ProcessRegistry:
|
|||
from tools.ansi_strip import strip_ansi
|
||||
output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
|
||||
self.completion_queue.put({
|
||||
"type": "completion",
|
||||
"session_id": session.id,
|
||||
"command": session.command,
|
||||
"exit_code": session.exit_code,
|
||||
|
|
@ -872,6 +973,7 @@ class ProcessRegistry:
|
|||
"watcher_thread_id": s.watcher_thread_id,
|
||||
"watcher_interval": s.watcher_interval,
|
||||
"notify_on_complete": s.notify_on_complete,
|
||||
"watch_patterns": s.watch_patterns,
|
||||
})
|
||||
|
||||
# Atomic write to avoid corruption on crash
|
||||
|
|
@ -932,6 +1034,7 @@ class ProcessRegistry:
|
|||
watcher_thread_id=entry.get("watcher_thread_id", ""),
|
||||
watcher_interval=entry.get("watcher_interval", 0),
|
||||
notify_on_complete=entry.get("notify_on_complete", False),
|
||||
watch_patterns=entry.get("watch_patterns", []),
|
||||
)
|
||||
with self._lock:
|
||||
self._running[session.id] = session
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ import atexit
|
|||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -1140,6 +1140,7 @@ def terminal_tool(
|
|||
check_interval: Optional[int] = None,
|
||||
pty: bool = False,
|
||||
notify_on_complete: bool = False,
|
||||
watch_patterns: Optional[List[str]] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Execute a command in the configured terminal environment.
|
||||
|
|
@ -1154,6 +1155,7 @@ def terminal_tool(
|
|||
check_interval: Seconds between auto-checks for background processes (gateway only, min 30)
|
||||
pty: If True, use pseudo-terminal for interactive CLI tools (local backend only)
|
||||
notify_on_complete: If True and background=True, auto-notify the agent when the process exits
|
||||
watch_patterns: List of strings to watch for in background output; triggers notification on match
|
||||
|
||||
Returns:
|
||||
str: JSON string with output, exit_code, and error fields
|
||||
|
|
@ -1439,6 +1441,11 @@ def terminal_tool(
|
|||
"notify_on_complete": True,
|
||||
})
|
||||
|
||||
# Set watch patterns for output monitoring
|
||||
if watch_patterns and background:
|
||||
proc_session.watch_patterns = list(watch_patterns)
|
||||
result_data["watch_patterns"] = proc_session.watch_patterns
|
||||
|
||||
# Register check_interval watcher (gateway picks this up after agent run)
|
||||
if check_interval and background:
|
||||
effective_interval = max(30, check_interval)
|
||||
|
|
@ -1762,6 +1769,11 @@ TERMINAL_SCHEMA = {
|
|||
"type": "boolean",
|
||||
"description": "When true (and background=true), you'll be automatically notified when the process finishes — no polling needed. Use this for tasks that take a while (tests, builds, deployments) so you can keep working on other things in the meantime.",
|
||||
"default": False
|
||||
},
|
||||
"watch_patterns": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"description": "List of strings to watch for in background process output. When any pattern matches a line of output, you'll be notified with the matching text — like notify_on_complete but triggers mid-process on specific output. Use for monitoring logs, watching for errors, or waiting for specific events (e.g. [\"ERROR\", \"FAIL\", \"listening on port\"])."
|
||||
}
|
||||
},
|
||||
"required": ["command"]
|
||||
|
|
@ -1779,6 +1791,7 @@ def _handle_terminal(args, **kw):
|
|||
check_interval=args.get("check_interval"),
|
||||
pty=args.get("pty", False),
|
||||
notify_on_complete=args.get("notify_on_complete", False),
|
||||
watch_patterns=args.get("watch_patterns"),
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue