hermes-agent/tests/tui_gateway/test_protocol.py

233 lines
6.9 KiB
Python

"""Tests for tui_gateway JSON-RPC protocol plumbing."""
import io
import json
import sys
import threading
from unittest.mock import MagicMock, patch
import pytest
_original_stdout = sys.stdout
@pytest.fixture(autouse=True)
def _restore_stdout():
yield
sys.stdout = _original_stdout
@pytest.fixture()
def server():
with patch.dict("sys.modules", {
"hermes_constants": MagicMock(get_hermes_home=MagicMock(return_value="/tmp/hermes_test")),
"hermes_cli.env_loader": MagicMock(),
"hermes_cli.banner": MagicMock(),
"hermes_state": MagicMock(),
}):
import importlib
mod = importlib.import_module("tui_gateway.server")
yield mod
mod._sessions.clear()
mod._pending.clear()
mod._answers.clear()
mod._methods.clear()
importlib.reload(mod)
@pytest.fixture()
def capture(server):
"""Redirect server's real stdout to a StringIO and return (server, buf)."""
buf = io.StringIO()
server._real_stdout = buf
return server, buf
# ── JSON-RPC envelope ────────────────────────────────────────────────
def test_unknown_method(server):
resp = server.handle_request({"id": "1", "method": "bogus"})
assert resp["error"]["code"] == -32601
def test_ok_envelope(server):
assert server._ok("r1", {"x": 1}) == {
"jsonrpc": "2.0", "id": "r1", "result": {"x": 1},
}
def test_err_envelope(server):
assert server._err("r2", 4001, "nope") == {
"jsonrpc": "2.0", "id": "r2", "error": {"code": 4001, "message": "nope"},
}
# ── write_json ───────────────────────────────────────────────────────
def test_write_json(capture):
server, buf = capture
assert server.write_json({"test": True})
assert json.loads(buf.getvalue()) == {"test": True}
def test_write_json_broken_pipe(server):
class _Broken:
def write(self, _): raise BrokenPipeError
def flush(self): raise BrokenPipeError
server._real_stdout = _Broken()
assert server.write_json({"x": 1}) is False
# ── _emit ────────────────────────────────────────────────────────────
def test_emit_with_payload(capture):
server, buf = capture
server._emit("test.event", "s1", {"key": "val"})
msg = json.loads(buf.getvalue())
assert msg["method"] == "event"
assert msg["params"]["type"] == "test.event"
assert msg["params"]["session_id"] == "s1"
assert msg["params"]["payload"]["key"] == "val"
def test_emit_without_payload(capture):
server, buf = capture
server._emit("ping", "s2")
assert "payload" not in json.loads(buf.getvalue())["params"]
# ── Blocking prompt round-trip ───────────────────────────────────────
def test_block_and_respond(capture):
server, _ = capture
result = [None]
threading.Thread(
target=lambda: result.__setitem__(0, server._block("test.prompt", "s1", {"q": "?"}, timeout=5)),
).start()
for _ in range(100):
if server._pending:
break
threading.Event().wait(0.01)
rid = next(iter(server._pending))
server._answers[rid] = "my_answer"
server._pending[rid].set()
threading.Event().wait(0.1)
assert result[0] == "my_answer"
def test_clear_pending(server):
ev = threading.Event()
server._pending["r1"] = ev
server._clear_pending()
assert ev.is_set()
assert server._answers["r1"] == ""
# ── Session lookup ───────────────────────────────────────────────────
def test_sess_missing(server):
_, err = server._sess({"session_id": "nope"}, "r1")
assert err["error"]["code"] == 4001
def test_sess_found(server):
server._sessions["abc"] = {"agent": MagicMock()}
s, err = server._sess({"session_id": "abc"}, "r1")
assert s is not None
assert err is None
# ── session.resume payload ────────────────────────────────────────────
def test_session_resume_returns_hydrated_messages(server, monkeypatch):
class _DB:
def get_session(self, _sid):
return {"id": "20260409_010101_abc123"}
def get_session_by_title(self, _title):
return None
def reopen_session(self, _sid):
return None
def get_messages(self, _sid):
return [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "yo"},
{"role": "tool", "content": "searched"},
{"role": "assistant", "content": " "},
{"role": "assistant", "content": None},
{"role": "narrator", "content": "skip"},
]
monkeypatch.setattr(server, "_get_db", lambda: _DB())
monkeypatch.setattr(server, "_make_agent", lambda sid, key, session_id=None: object())
monkeypatch.setattr(server, "_init_session", lambda sid, key, agent, history, cols=80: None)
monkeypatch.setattr(server, "_session_info", lambda _agent: {"model": "test/model"})
resp = server.handle_request(
{
"id": "r1",
"method": "session.resume",
"params": {"session_id": "20260409_010101_abc123", "cols": 100},
}
)
assert "error" not in resp
assert resp["result"]["message_count"] == 3
assert resp["result"]["messages"] == [
{"role": "user", "text": "hello"},
{"role": "assistant", "text": "yo"},
{"role": "tool", "text": "searched"},
]
# ── Config I/O ───────────────────────────────────────────────────────
def test_config_load_missing(server, tmp_path):
server._hermes_home = tmp_path
assert server._load_cfg() == {}
def test_config_roundtrip(server, tmp_path):
server._hermes_home = tmp_path
server._save_cfg({"model": "test/model"})
assert server._load_cfg()["model"] == "test/model"
# ── _cli_exec_blocked ────────────────────────────────────────────────
@pytest.mark.parametrize("argv", [
[],
["setup"],
["gateway"],
["sessions", "browse"],
["config", "edit"],
])
def test_cli_exec_blocked(server, argv):
assert server._cli_exec_blocked(argv) is not None
@pytest.mark.parametrize("argv", [
["version"],
["sessions", "list"],
])
def test_cli_exec_allowed(server, argv):
assert server._cli_exec_blocked(argv) is None