mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-08 03:01:47 +00:00
281 lines
9.3 KiB
Python
281 lines
9.3 KiB
Python
"""Tests for cli._cprint's bg-thread cooperation with prompt_toolkit.
|
|
|
|
Background: when a prompt_toolkit Application is running, a bg thread that
|
|
calls ``_pt_print`` directly can race with the input-area redraw and the
|
|
printed line can end up visually buried behind the prompt. ``_cprint`` now
|
|
routes cross-thread prints through ``run_in_terminal`` via
|
|
``loop.call_soon_threadsafe`` so the self-improvement background review's
|
|
``💾 Self-improvement review: …`` summary actually surfaces to the user.
|
|
|
|
These tests verify the routing logic without spinning up a real PT app.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import types
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
import cli
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_output_history():
|
|
cli._configure_output_history(False, 200)
|
|
yield
|
|
cli._configure_output_history(True, 200)
|
|
|
|
|
|
def test_cprint_no_app_direct_print(monkeypatch):
|
|
"""No active app → direct _pt_print, no run_in_terminal involvement."""
|
|
calls = []
|
|
monkeypatch.setattr(cli, "_pt_print", lambda x: calls.append(("pt_print", x)))
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: ("ANSI", t))
|
|
|
|
# Patch the prompt_toolkit import the function performs internally.
|
|
fake_pt_app = types.ModuleType("prompt_toolkit.application")
|
|
fake_pt_app.get_app_or_none = lambda: None
|
|
fake_pt_app.run_in_terminal = lambda *a, **kw: calls.append(("run_in_terminal",))
|
|
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
|
|
|
|
cli._cprint("hello")
|
|
|
|
assert calls == [("pt_print", ("ANSI", "hello"))]
|
|
|
|
|
|
def test_cprint_app_not_running_direct_print(monkeypatch):
|
|
"""App exists but not running (e.g. teardown) → direct print."""
|
|
calls = []
|
|
monkeypatch.setattr(cli, "_pt_print", lambda x: calls.append(("pt_print", x)))
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
|
|
|
|
fake_app = SimpleNamespace(_is_running=False, loop=None)
|
|
fake_pt_app = types.ModuleType("prompt_toolkit.application")
|
|
fake_pt_app.get_app_or_none = lambda: fake_app
|
|
fake_pt_app.run_in_terminal = lambda *a, **kw: calls.append(("run_in_terminal",))
|
|
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
|
|
|
|
cli._cprint("x")
|
|
|
|
assert calls == [("pt_print", "x")]
|
|
|
|
|
|
def test_cprint_bg_thread_schedules_on_app_loop(monkeypatch):
|
|
"""App running + different thread → schedules via call_soon_threadsafe."""
|
|
scheduled = []
|
|
direct_prints = []
|
|
|
|
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
|
|
|
|
class FakeLoop:
|
|
def is_running(self):
|
|
return True
|
|
|
|
def call_soon_threadsafe(self, cb, *args):
|
|
scheduled.append(cb)
|
|
|
|
fake_loop = FakeLoop()
|
|
|
|
# Install a fake "current loop" that is NOT the app's loop, so the
|
|
# cross-thread branch is taken.
|
|
fake_current_loop = SimpleNamespace(is_running=lambda: True)
|
|
fake_asyncio = types.ModuleType("asyncio")
|
|
|
|
class _Policy:
|
|
def get_event_loop(self):
|
|
return fake_current_loop
|
|
|
|
fake_asyncio.get_event_loop_policy = lambda: _Policy()
|
|
monkeypatch.setitem(sys.modules, "asyncio", fake_asyncio)
|
|
|
|
fake_app = SimpleNamespace(_is_running=True, loop=fake_loop)
|
|
fake_pt_app = types.ModuleType("prompt_toolkit.application")
|
|
fake_pt_app.get_app_or_none = lambda: fake_app
|
|
|
|
run_in_terminal_calls = []
|
|
|
|
def _fake_run_in_terminal(func, **kw):
|
|
run_in_terminal_calls.append(func)
|
|
# Simulate run_in_terminal actually calling func (as the real PT
|
|
# impl would once the app loop tick picks it up).
|
|
func()
|
|
return None
|
|
|
|
fake_pt_app.run_in_terminal = _fake_run_in_terminal
|
|
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
|
|
|
|
cli._cprint("💾 Self-improvement review: Skill updated")
|
|
|
|
# call_soon_threadsafe must have been called with a scheduling cb.
|
|
assert len(scheduled) == 1
|
|
|
|
# Invoking the scheduled callback should hit run_in_terminal.
|
|
scheduled[0]()
|
|
assert len(run_in_terminal_calls) == 1
|
|
|
|
# And run_in_terminal's inner func should have emitted a pt_print.
|
|
assert direct_prints == ["💾 Self-improvement review: Skill updated"]
|
|
|
|
|
|
def test_cprint_same_thread_as_app_loop_direct_print(monkeypatch):
|
|
"""App running on same thread → direct print (no scheduling)."""
|
|
direct_prints = []
|
|
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
|
|
|
|
class FakeLoop:
|
|
def is_running(self):
|
|
return True
|
|
|
|
def call_soon_threadsafe(self, cb, *args):
|
|
raise AssertionError(
|
|
"call_soon_threadsafe must not be used on the app's own thread"
|
|
)
|
|
|
|
fake_loop = FakeLoop()
|
|
fake_asyncio = types.ModuleType("asyncio")
|
|
|
|
class _Policy:
|
|
def get_event_loop(self):
|
|
return fake_loop # same as app loop
|
|
|
|
fake_asyncio.get_event_loop_policy = lambda: _Policy()
|
|
monkeypatch.setitem(sys.modules, "asyncio", fake_asyncio)
|
|
|
|
fake_app = SimpleNamespace(_is_running=True, loop=fake_loop)
|
|
fake_pt_app = types.ModuleType("prompt_toolkit.application")
|
|
fake_pt_app.get_app_or_none = lambda: fake_app
|
|
fake_pt_app.run_in_terminal = lambda *a, **kw: None
|
|
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
|
|
|
|
cli._cprint("x")
|
|
|
|
assert direct_prints == ["x"]
|
|
|
|
|
|
def test_cprint_swallows_app_loop_attr_error(monkeypatch):
|
|
"""Loop missing on app → fall back to direct print, no crash."""
|
|
direct_prints = []
|
|
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
|
|
|
|
class WeirdApp:
|
|
_is_running = True
|
|
|
|
@property
|
|
def loop(self):
|
|
raise RuntimeError("no loop for you")
|
|
|
|
fake_pt_app = types.ModuleType("prompt_toolkit.application")
|
|
fake_pt_app.get_app_or_none = lambda: WeirdApp()
|
|
fake_pt_app.run_in_terminal = lambda *a, **kw: None
|
|
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
|
|
|
|
cli._cprint("fallback")
|
|
|
|
assert direct_prints == ["fallback"]
|
|
|
|
|
|
def test_cprint_swallows_prompt_toolkit_import_error(monkeypatch):
|
|
"""If prompt_toolkit.application itself fails to import, fall back."""
|
|
direct_prints = []
|
|
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
|
|
|
|
# Drop cached prompt_toolkit.application AND install a meta-path finder
|
|
# that raises ImportError on re-import.
|
|
monkeypatch.delitem(sys.modules, "prompt_toolkit.application", raising=False)
|
|
|
|
class _BlockFinder:
|
|
def find_module(self, name, path=None):
|
|
if name == "prompt_toolkit.application":
|
|
return self
|
|
return None
|
|
|
|
def load_module(self, name):
|
|
raise ImportError("blocked for test")
|
|
|
|
def find_spec(self, name, path=None, target=None):
|
|
if name == "prompt_toolkit.application":
|
|
# Returning a bogus spec that will fail on load works too,
|
|
# but raising here keeps the test simple.
|
|
raise ImportError("blocked for test")
|
|
return None
|
|
|
|
blocker = _BlockFinder()
|
|
sys.meta_path.insert(0, blocker)
|
|
try:
|
|
cli._cprint("fallback2")
|
|
finally:
|
|
sys.meta_path.remove(blocker)
|
|
|
|
assert direct_prints == ["fallback2"]
|
|
|
|
|
|
def test_output_history_strips_ansi_and_keeps_recent_lines():
|
|
cli._configure_output_history(True, 10)
|
|
|
|
for idx in range(12):
|
|
cli._record_output_history(f"\x1b[31mline-{idx}\x1b[0m")
|
|
|
|
assert list(cli._OUTPUT_HISTORY) == [f"line-{idx}" for idx in range(2, 12)]
|
|
|
|
|
|
def test_replay_output_history_does_not_record_replayed_lines(monkeypatch):
|
|
cli._configure_output_history(True, 10)
|
|
cli._record_output_history("visible output")
|
|
printed = []
|
|
|
|
def _fake_print(value):
|
|
printed.append(value)
|
|
cli._record_output_history("duplicated replay")
|
|
|
|
monkeypatch.setattr(cli, "_pt_print", _fake_print)
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda text: text)
|
|
|
|
cli._replay_output_history()
|
|
|
|
assert printed == ["visible output"]
|
|
assert list(cli._OUTPUT_HISTORY) == ["visible output"]
|
|
|
|
|
|
def test_replay_output_history_rerenders_callable_entries(monkeypatch):
|
|
cli._configure_output_history(True, 10)
|
|
widths_seen = []
|
|
printed = []
|
|
|
|
def _render_current_width():
|
|
widths_seen.append("called")
|
|
return ["top border", "body"]
|
|
|
|
cli._record_output_history_entry(_render_current_width)
|
|
monkeypatch.setattr(cli, "_pt_print", lambda value: printed.append(value))
|
|
monkeypatch.setattr(cli, "_PT_ANSI", lambda text: text)
|
|
|
|
cli._replay_output_history()
|
|
|
|
assert widths_seen == ["called"]
|
|
assert printed == ["top border", "body"]
|
|
assert list(cli._OUTPUT_HISTORY) == [_render_current_width]
|
|
|
|
|
|
def test_suspend_output_history_blocks_recording():
|
|
cli._configure_output_history(True, 10)
|
|
|
|
with cli._suspend_output_history():
|
|
cli._record_output_history("hidden")
|
|
cli._record_output_history_entry("also hidden")
|
|
|
|
assert list(cli._OUTPUT_HISTORY) == []
|
|
|
|
|
|
def test_clear_output_history_removes_replayable_lines():
|
|
cli._configure_output_history(True, 10)
|
|
cli._record_output_history("before clear")
|
|
|
|
cli._clear_output_history()
|
|
|
|
assert list(cli._OUTPUT_HISTORY) == []
|