mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-15 09:21:36 +00:00
perf(process): wake waiters on background completion (#45831)
This commit is contained in:
parent
6b76284c77
commit
1106879147
2 changed files with 34 additions and 1 deletions
|
|
@ -5,6 +5,7 @@ import os
|
|||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
|
@ -266,6 +267,31 @@ class TestOrphanedPipeReconciliation:
|
|||
except (ProcessLookupError, PermissionError):
|
||||
pass
|
||||
|
||||
def test_wait_wakes_when_session_moves_to_finished(self, registry):
|
||||
"""wait() should not sleep for the old 1s polling tick after exit."""
|
||||
s = _make_session(sid="proc_wait_event", output="done")
|
||||
registry._running[s.id] = s
|
||||
|
||||
def finish_later():
|
||||
time.sleep(0.05)
|
||||
s.exited = True
|
||||
s.exit_code = 0
|
||||
with patch.object(registry, "_write_checkpoint"):
|
||||
registry._move_to_finished(s)
|
||||
|
||||
t = threading.Thread(target=finish_later)
|
||||
t.start()
|
||||
start = time.monotonic()
|
||||
try:
|
||||
result = registry.wait(s.id, timeout=5)
|
||||
finally:
|
||||
t.join(timeout=1)
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
assert result["status"] == "exited", result
|
||||
assert result["exit_code"] == 0
|
||||
assert elapsed < 0.3, f"wait() should wake on completion; took {elapsed:.3f}s"
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Read log
|
||||
|
|
|
|||
|
|
@ -129,6 +129,7 @@ class ProcessSession:
|
|||
_watch_cooldown_until: float = field(default=0.0, repr=False)
|
||||
_watch_strike_candidate: bool = field(default=False, repr=False)
|
||||
_watch_consecutive_strikes: int = field(default=0, repr=False)
|
||||
_completion_event: threading.Event = field(default_factory=threading.Event, repr=False)
|
||||
_lock: threading.Lock = field(default_factory=threading.Lock)
|
||||
_reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
|
||||
_pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True)
|
||||
|
|
@ -870,6 +871,7 @@ class ProcessRegistry:
|
|||
with self._lock:
|
||||
was_running = self._running.pop(session.id, None) is not None
|
||||
self._finished[session.id] = session
|
||||
session._completion_event.set()
|
||||
self._write_checkpoint()
|
||||
|
||||
# Only enqueue completion notification on the FIRST move. Without
|
||||
|
|
@ -1093,6 +1095,8 @@ class ProcessRegistry:
|
|||
|
||||
while time.monotonic() < deadline:
|
||||
session = self._refresh_detached_session(session)
|
||||
if session is None:
|
||||
return {"status": "not_found", "error": f"No process with ID {session_id}"}
|
||||
# Reconcile against real child state — guards against orphaned-
|
||||
# pipe reader hangs where the reader is blocked but the direct
|
||||
# child has already exited (issue #17327).
|
||||
|
|
@ -1118,7 +1122,10 @@ class ProcessRegistry:
|
|||
result["timeout_note"] = timeout_note
|
||||
return result
|
||||
|
||||
time.sleep(1)
|
||||
remaining = deadline - time.monotonic()
|
||||
if remaining <= 0:
|
||||
break
|
||||
session._completion_event.wait(timeout=min(1.0, remaining))
|
||||
|
||||
result = {
|
||||
"status": "timeout",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue