Merge branch 'main' into hermes/delegation-readiness-doctor-clean

This commit is contained in:
NplusM420 2026-04-23 19:46:36 -05:00 committed by GitHub
commit cb855b84a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
56 changed files with 4915 additions and 218 deletions

View file

@ -0,0 +1,254 @@
"""Tests for Moonshot/Kimi flavored-JSON-Schema sanitizer.
Moonshot's tool-parameter validator rejects several shapes that the rest of
the JSON Schema ecosystem accepts:
1. Properties without ``type`` Moonshot requires ``type`` on every node.
2. ``type`` at the parent of ``anyOf`` Moonshot requires it only inside
``anyOf`` children.
These tests cover the repairs applied by ``agent/moonshot_schema.py``.
"""
from __future__ import annotations
import pytest
from agent.moonshot_schema import (
is_moonshot_model,
sanitize_moonshot_tool_parameters,
sanitize_moonshot_tools,
)
class TestMoonshotModelDetection:
"""is_moonshot_model() must match across aggregator prefixes."""
@pytest.mark.parametrize(
"model",
[
"kimi-k2.6",
"kimi-k2-thinking",
"moonshotai/Kimi-K2.6",
"moonshotai/kimi-k2.6",
"nous/moonshotai/kimi-k2.6",
"openrouter/moonshotai/kimi-k2-thinking",
"MOONSHOTAI/KIMI-K2.6",
],
)
def test_positive_matches(self, model):
assert is_moonshot_model(model) is True
@pytest.mark.parametrize(
"model",
[
"",
None,
"anthropic/claude-sonnet-4.6",
"openai/gpt-5.4",
"google/gemini-3-flash-preview",
"deepseek-chat",
],
)
def test_negative_matches(self, model):
assert is_moonshot_model(model) is False
class TestMissingTypeFilled:
"""Rule 1: every property must carry a type."""
def test_property_without_type_gets_string(self):
params = {
"type": "object",
"properties": {"query": {"description": "a bare property"}},
}
out = sanitize_moonshot_tool_parameters(params)
assert out["properties"]["query"]["type"] == "string"
def test_property_with_enum_infers_type_from_first_value(self):
params = {
"type": "object",
"properties": {"flag": {"enum": [True, False]}},
}
out = sanitize_moonshot_tool_parameters(params)
assert out["properties"]["flag"]["type"] == "boolean"
def test_nested_properties_are_repaired(self):
params = {
"type": "object",
"properties": {
"filter": {
"type": "object",
"properties": {
"field": {"description": "no type"},
},
},
},
}
out = sanitize_moonshot_tool_parameters(params)
assert out["properties"]["filter"]["properties"]["field"]["type"] == "string"
def test_array_items_without_type_get_repaired(self):
params = {
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {"description": "tag entry"},
},
},
}
out = sanitize_moonshot_tool_parameters(params)
assert out["properties"]["tags"]["items"]["type"] == "string"
def test_ref_node_is_not_given_synthetic_type(self):
"""$ref nodes should NOT get a synthetic type — the referenced
definition supplies it, and Moonshot would reject the conflict."""
params = {
"type": "object",
"properties": {"payload": {"$ref": "#/$defs/Payload"}},
"$defs": {"Payload": {"type": "object", "properties": {}}},
}
out = sanitize_moonshot_tool_parameters(params)
assert "type" not in out["properties"]["payload"]
assert out["properties"]["payload"]["$ref"] == "#/$defs/Payload"
class TestAnyOfParentType:
"""Rule 2: type must not appear at the anyOf parent level."""
def test_parent_type_stripped_when_anyof_present(self):
params = {
"type": "object",
"properties": {
"from_format": {
"type": "string",
"anyOf": [
{"type": "string"},
{"type": "null"},
],
},
},
}
out = sanitize_moonshot_tool_parameters(params)
from_format = out["properties"]["from_format"]
assert "type" not in from_format
assert "anyOf" in from_format
def test_anyof_children_missing_type_get_filled(self):
params = {
"type": "object",
"properties": {
"value": {
"anyOf": [
{"type": "string"},
{"description": "A typeless option"},
],
},
},
}
out = sanitize_moonshot_tool_parameters(params)
children = out["properties"]["value"]["anyOf"]
assert children[0]["type"] == "string"
assert "type" in children[1]
class TestTopLevelGuarantees:
"""The returned top-level schema is always a well-formed object."""
def test_non_dict_input_returns_empty_object(self):
assert sanitize_moonshot_tool_parameters(None) == {"type": "object", "properties": {}}
assert sanitize_moonshot_tool_parameters("garbage") == {"type": "object", "properties": {}}
assert sanitize_moonshot_tool_parameters([]) == {"type": "object", "properties": {}}
def test_non_object_top_level_coerced(self):
params = {"type": "string"}
out = sanitize_moonshot_tool_parameters(params)
assert out["type"] == "object"
assert "properties" in out
def test_does_not_mutate_input(self):
params = {
"type": "object",
"properties": {"q": {"description": "no type"}},
}
snapshot = {
"type": params["type"],
"properties": {"q": dict(params["properties"]["q"])},
}
sanitize_moonshot_tool_parameters(params)
assert params["type"] == snapshot["type"]
assert "type" not in params["properties"]["q"]
class TestToolListSanitizer:
"""sanitize_moonshot_tools() walks an OpenAI-format tool list."""
def test_applies_per_tool(self):
tools = [
{
"type": "function",
"function": {
"name": "search",
"description": "Search",
"parameters": {
"type": "object",
"properties": {"q": {"description": "query"}},
},
},
},
{
"type": "function",
"function": {
"name": "noop",
"description": "Does nothing",
"parameters": {"type": "object", "properties": {}},
},
},
]
out = sanitize_moonshot_tools(tools)
assert out[0]["function"]["parameters"]["properties"]["q"]["type"] == "string"
# Second tool already clean — should be structurally equivalent
assert out[1]["function"]["parameters"] == {"type": "object", "properties": {}}
def test_empty_list_is_passthrough(self):
assert sanitize_moonshot_tools([]) == []
assert sanitize_moonshot_tools(None) is None
def test_skips_malformed_entries(self):
"""Entries without a function dict are passed through untouched."""
tools = [{"type": "function"}, {"not": "a tool"}]
out = sanitize_moonshot_tools(tools)
assert out == tools
class TestRealWorldMCPShape:
"""End-to-end: a realistic MCP-style schema that used to 400 on Moonshot."""
def test_combined_rewrites(self):
# Shape: missing type on a property, anyOf with parent type, array
# items without type — all in one tool.
params = {
"type": "object",
"properties": {
"query": {"description": "search text"},
"filter": {
"type": "string",
"anyOf": [
{"type": "string"},
{"type": "null"},
],
},
"tags": {
"type": "array",
"items": {"description": "tag"},
},
},
"required": ["query"],
}
out = sanitize_moonshot_tool_parameters(params)
assert out["properties"]["query"]["type"] == "string"
assert "type" not in out["properties"]["filter"]
assert out["properties"]["filter"]["anyOf"][0]["type"] == "string"
assert out["properties"]["tags"]["items"]["type"] == "string"
assert out["required"] == ["query"]

View file

@ -238,6 +238,56 @@ class TestChatCompletionsKimi:
)
assert kw["extra_body"]["thinking"] == {"type": "disabled"}
def test_moonshot_tool_schemas_are_sanitized_by_model_name(self, transport):
"""Aggregator routes (Nous, OpenRouter) hit Moonshot by model name, not base URL."""
tools = [
{
"type": "function",
"function": {
"name": "search",
"description": "Search",
"parameters": {
"type": "object",
"properties": {
"q": {"description": "query"}, # missing type
},
},
},
},
]
kw = transport.build_kwargs(
model="moonshotai/kimi-k2.6",
messages=[{"role": "user", "content": "Hi"}],
tools=tools,
max_tokens_param_fn=lambda n: {"max_tokens": n},
)
assert kw["tools"][0]["function"]["parameters"]["properties"]["q"]["type"] == "string"
def test_non_moonshot_tools_are_not_mutated(self, transport):
"""Other models don't go through the Moonshot sanitizer."""
original_params = {
"type": "object",
"properties": {"q": {"description": "query"}}, # missing type
}
tools = [
{
"type": "function",
"function": {
"name": "search",
"description": "Search",
"parameters": original_params,
},
},
]
kw = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6",
messages=[{"role": "user", "content": "Hi"}],
tools=tools,
max_tokens_param_fn=lambda n: {"max_tokens": n},
)
# The parameters dict is passed through untouched (no synthetic type)
assert "type" not in kw["tools"][0]["function"]["parameters"]["properties"]["q"]
class TestChatCompletionsValidate:

View file

@ -710,7 +710,15 @@ class TestRunJobSessionPersistence:
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["enabled_toolsets"] == ["web", "terminal", "file"]
def test_run_job_enabled_toolsets_none_when_not_set(self, tmp_path):
def test_run_job_enabled_toolsets_resolves_from_platform_config_when_not_set(self, tmp_path):
"""When a job has no explicit enabled_toolsets, the scheduler now
resolves them from ``hermes tools`` platform config for ``cron``
(PR #14xxx — blanket fix for Norbert's surprise ``moa`` run).
The legacy "pass None → AIAgent loads full default" path is still
reachable, but only when ``_get_platform_tools`` raises (safety net
for any unexpected config shape).
"""
job = {
"id": "no-toolset-job",
"name": "test",
@ -725,7 +733,39 @@ class TestRunJobSessionPersistence:
run_job(job)
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["enabled_toolsets"] is None
# Resolution happened — not None, is a list.
assert isinstance(kwargs["enabled_toolsets"], list)
# The cron default is _HERMES_CORE_TOOLS with _DEFAULT_OFF_TOOLSETS
# (``moa``, ``homeassistant``, ``rl``) removed. The most important
# invariant: ``moa`` is NOT in the default cron toolset, so a cron
# run cannot accidentally spin up frontier models.
assert "moa" not in kwargs["enabled_toolsets"]
def test_run_job_per_job_toolsets_win_over_platform_config(self, tmp_path):
"""Per-job enabled_toolsets (via cronjob tool) always take precedence
over the platform-level ``hermes tools`` config."""
job = {
"id": "override-job",
"name": "test",
"prompt": "hello",
"enabled_toolsets": ["terminal"],
}
fake_db, patches = self._make_run_job_patches(tmp_path)
# Even if the user has ``hermes tools`` configured to enable web+file
# for cron, the per-job override wins.
with patches[0], patches[1], patches[2], patches[3], patches[4], \
patch("run_agent.AIAgent") as mock_agent_cls, \
patch(
"hermes_cli.tools_config._get_platform_tools",
return_value={"web", "file"},
):
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
run_job(job)
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["enabled_toolsets"] == ["terminal"]
def test_run_job_empty_response_returns_empty_not_placeholder(self, tmp_path):
"""Empty final_response should stay empty for delivery logic (issue #2234).

View file

@ -1,22 +1,28 @@
"""Regression tests for the TUI gateway's `complete.path` handler.
Reported during the TUI v2 blitz retest: typing `@folder:` (and `@folder`
with no colon yet) still surfaced files alongside directories in the
TUI composer, because the gateway-side completion lives in
`tui_gateway/server.py` and was never touched by the earlier fix to
`hermes_cli/commands.py`.
Reported during the TUI v2 blitz retest:
- typing `@folder:` (and `@folder` with no colon yet) surfaced files
alongside directories the gateway-side completion lives in
`tui_gateway/server.py` and was never touched by the earlier fix to
`hermes_cli/commands.py`.
- typing `@appChrome` required the full `@ui-tui/src/components/app`
path to find the file users expect Cmd-P-style fuzzy basename
matching across the repo, not a strict directory prefix filter.
Covers:
- `@folder:` only yields directories
- `@file:` only yields regular files
- Bare `@folder` / `@file` (no colon) lists cwd directly
- Explicit prefix is preserved in the completion text
- `@<name>` with no slash fuzzy-matches basenames anywhere in the tree
"""
from __future__ import annotations
from pathlib import Path
import pytest
from tui_gateway import server
@ -33,6 +39,15 @@ def _items(word: str):
return [(it["text"], it["display"], it.get("meta", "")) for it in resp["result"]["items"]]
@pytest.fixture(autouse=True)
def _reset_fuzzy_cache(monkeypatch):
# Each test walks a fresh tmp dir; clear the cached listing so prior
# roots can't leak through the TTL window.
server._fuzzy_cache.clear()
yield
server._fuzzy_cache.clear()
def test_at_folder_colon_only_dirs(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
_fixture(tmp_path)
@ -89,3 +104,176 @@ def test_bare_at_still_shows_static_refs(tmp_path, monkeypatch):
for expected in ("@diff", "@staged", "@file:", "@folder:", "@url:", "@git:"):
assert expected in texts, f"missing static ref {expected!r} in {texts!r}"
# ── Fuzzy basename matching ──────────────────────────────────────────────
# Users shouldn't have to know the full path — typing `@appChrome` should
# find `ui-tui/src/components/appChrome.tsx`.
def _nested_fixture(tmp_path: Path):
(tmp_path / "readme.md").write_text("x")
(tmp_path / ".env").write_text("x")
(tmp_path / "ui-tui/src/components").mkdir(parents=True)
(tmp_path / "ui-tui/src/components/appChrome.tsx").write_text("x")
(tmp_path / "ui-tui/src/components/appLayout.tsx").write_text("x")
(tmp_path / "ui-tui/src/components/thinking.tsx").write_text("x")
(tmp_path / "ui-tui/src/hooks").mkdir(parents=True)
(tmp_path / "ui-tui/src/hooks/useCompletion.ts").write_text("x")
(tmp_path / "tui_gateway").mkdir()
(tmp_path / "tui_gateway/server.py").write_text("x")
def test_fuzzy_at_finds_file_without_directory_prefix(tmp_path, monkeypatch):
"""`@appChrome` — with no slash — should surface the nested file."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
entries = _items("@appChrome")
texts = [t for t, _, _ in entries]
assert "@file:ui-tui/src/components/appChrome.tsx" in texts, texts
# Display is the basename, meta is the containing directory, so the
# picker can show `appChrome.tsx ui-tui/src/components` on one row.
row = next(r for r in entries if r[0] == "@file:ui-tui/src/components/appChrome.tsx")
assert row[1] == "appChrome.tsx"
assert row[2] == "ui-tui/src/components"
def test_fuzzy_ranks_exact_before_prefix_before_subseq(tmp_path, monkeypatch):
"""Better matches sort before weaker matches regardless of path depth."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
(tmp_path / "server.py").write_text("x") # exact basename match at root
texts = [t for t, _, _ in _items("@server")]
# Exact `server.py` beats `tui_gateway/server.py` (prefix match) — both
# rank 1 on basename but exact basename wins on the sort key; shorter
# rel path breaks ties.
assert texts[0] == "@file:server.py", texts
assert "@file:tui_gateway/server.py" in texts
def test_fuzzy_camelcase_word_boundary(tmp_path, monkeypatch):
"""Mid-basename camelCase pieces match without substring scanning."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
texts = [t for t, _, _ in _items("@Chrome")]
# `Chrome` starts a camelCase word inside `appChrome.tsx`.
assert "@file:ui-tui/src/components/appChrome.tsx" in texts, texts
def test_fuzzy_subsequence_catches_sparse_queries(tmp_path, monkeypatch):
"""`@uCo` → `useCompletion.ts` via subsequence, last-resort tier."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
texts = [t for t, _, _ in _items("@uCo")]
assert "@file:ui-tui/src/hooks/useCompletion.ts" in texts, texts
def test_fuzzy_at_file_prefix_preserved(tmp_path, monkeypatch):
"""Explicit `@file:` prefix still wins the completion tag."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
texts = [t for t, _, _ in _items("@file:appChrome")]
assert "@file:ui-tui/src/components/appChrome.tsx" in texts, texts
def test_fuzzy_skipped_when_path_has_slash(tmp_path, monkeypatch):
"""Any `/` in the query = user is navigating; keep directory listing."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
texts = [t for t, _, _ in _items("@ui-tui/src/components/app")]
# Directory-listing mode prefixes with `@file:` / `@folder:` per entry.
# It should only surface direct children of the named dir — not the
# nested `useCompletion.ts`.
assert any("appChrome.tsx" in t for t in texts), texts
assert not any("useCompletion.ts" in t for t in texts), texts
def test_fuzzy_skipped_when_folder_tag(tmp_path, monkeypatch):
"""`@folder:<name>` still lists directories — fuzzy scanner only walks
files (git-tracked + untracked), so defer to the dir-listing path."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
texts = [t for t, _, _ in _items("@folder:ui")]
# Root has `ui-tui/` as a directory; the listing branch should surface it.
assert any(t.startswith("@folder:ui-tui") for t in texts), texts
def test_fuzzy_hides_dotfiles_unless_asked(tmp_path, monkeypatch):
"""`.env` doesn't leak into `@env` but does show for `@.env`."""
monkeypatch.chdir(tmp_path)
_nested_fixture(tmp_path)
assert not any(".env" in t for t, _, _ in _items("@env"))
assert any(t.endswith(".env") for t, _, _ in _items("@.env"))
def test_fuzzy_caps_results(tmp_path, monkeypatch):
"""The 30-item cap survives a big tree."""
monkeypatch.chdir(tmp_path)
for i in range(60):
(tmp_path / f"mod_{i:03d}.py").write_text("x")
items = _items("@mod")
assert len(items) == 30
def test_fuzzy_paths_relative_to_cwd_inside_subdir(tmp_path, monkeypatch):
"""When the gateway runs from a subdirectory of a git repo, fuzzy
completion paths must resolve under that cwd not under the repo root.
Without this, `@appChrome` from inside `apps/web/` would suggest
`@file:apps/web/src/foo.tsx` but the agent (resolving from cwd) would
look for `apps/web/apps/web/src/foo.tsx` and fail. We translate every
`git ls-files` result back to a `relpath(root)` and drop anything
outside `root` so the completion contract stays "paths are cwd-relative".
"""
import subprocess
subprocess.run(["git", "init", "-q"], cwd=tmp_path, check=True)
subprocess.run(["git", "config", "user.email", "test@example.com"], cwd=tmp_path, check=True)
subprocess.run(["git", "config", "user.name", "test"], cwd=tmp_path, check=True)
(tmp_path / "apps" / "web" / "src").mkdir(parents=True)
(tmp_path / "apps" / "web" / "src" / "appChrome.tsx").write_text("x")
(tmp_path / "apps" / "api" / "src").mkdir(parents=True)
(tmp_path / "apps" / "api" / "src" / "server.ts").write_text("x")
(tmp_path / "README.md").write_text("x")
subprocess.run(["git", "add", "."], cwd=tmp_path, check=True)
subprocess.run(["git", "commit", "-q", "-m", "init"], cwd=tmp_path, check=True)
# Run from `apps/web/` — completions should be relative to here, and
# files outside this subtree (apps/api, README.md at root) shouldn't
# appear at all.
monkeypatch.chdir(tmp_path / "apps" / "web")
texts = [t for t, _, _ in _items("@appChrome")]
assert "@file:src/appChrome.tsx" in texts, texts
assert not any("apps/web/" in t for t in texts), texts
server._fuzzy_cache.clear()
other_texts = [t for t, _, _ in _items("@server")]
assert not any("server.ts" in t for t in other_texts), other_texts
server._fuzzy_cache.clear()
readme_texts = [t for t, _, _ in _items("@README")]
assert not any("README.md" in t for t in readme_texts), readme_texts

View file

@ -463,7 +463,7 @@ class TestPlatformToolsetConsistency:
gateway_includes = set(TOOLSETS["hermes-gateway"]["includes"])
# Exclude non-messaging platforms from the check
non_messaging = {"cli", "api_server"}
non_messaging = {"cli", "api_server", "cron"}
for platform, meta in PLATFORMS.items():
if platform in non_messaging:
continue

View file

@ -0,0 +1,255 @@
"""Tests for ``hermes_cli.voice`` — the TUI gateway's voice wrapper.
The module is imported *lazily* by ``tui_gateway/server.py`` so that a
box with missing audio deps fails at call time (returning a clean RPC
error) rather than at gateway startup. These tests therefore only
assert the public contract the gateway depends on: the three symbols
exist, ``stop_and_transcribe`` is a no-op when nothing is recording,
and ``speak_text`` tolerates empty input without touching the provider
stack.
"""
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
class TestPublicAPI:
def test_gateway_symbols_importable(self):
"""Match the exact import shape tui_gateway/server.py uses."""
from hermes_cli.voice import (
speak_text,
start_recording,
stop_and_transcribe,
)
assert callable(start_recording)
assert callable(stop_and_transcribe)
assert callable(speak_text)
class TestStopWithoutStart:
def test_returns_none_when_no_recording_active(self, monkeypatch):
"""Idempotent no-op: stop before start must not raise or touch state."""
import hermes_cli.voice as voice
monkeypatch.setattr(voice, "_recorder", None)
assert voice.stop_and_transcribe() is None
class TestSpeakTextGuards:
@pytest.mark.parametrize("text", ["", " ", "\n\t "])
def test_empty_text_is_noop(self, text):
"""Empty / whitespace-only text must return without importing tts_tool
(the gateway spawns a thread per call, so a no-op on empty input
keeps the thread pool from churning on trivial inputs)."""
from hermes_cli.voice import speak_text
# Should simply return None without raising.
assert speak_text(text) is None
class TestContinuousAPI:
"""Continuous (VAD) mode API — CLI-parity loop entry points."""
def test_continuous_exports(self):
from hermes_cli.voice import (
is_continuous_active,
start_continuous,
stop_continuous,
)
assert callable(start_continuous)
assert callable(stop_continuous)
assert callable(is_continuous_active)
def test_not_active_by_default(self, monkeypatch):
import hermes_cli.voice as voice
# Isolate from any state left behind by other tests in the session.
monkeypatch.setattr(voice, "_continuous_active", False)
monkeypatch.setattr(voice, "_continuous_recorder", None)
assert voice.is_continuous_active() is False
def test_stop_continuous_idempotent_when_inactive(self, monkeypatch):
"""stop_continuous must not raise when no loop is active — the
gateway's voice.toggle off path calls it unconditionally."""
import hermes_cli.voice as voice
monkeypatch.setattr(voice, "_continuous_active", False)
monkeypatch.setattr(voice, "_continuous_recorder", None)
# Should return cleanly without exceptions
assert voice.stop_continuous() is None
assert voice.is_continuous_active() is False
def test_double_start_is_idempotent(self, monkeypatch):
"""A second start_continuous while already active is a no-op — prevents
two overlapping capture threads fighting over the microphone when the
UI double-fires (e.g. both /voice on and Ctrl+B within the same tick)."""
import hermes_cli.voice as voice
monkeypatch.setattr(voice, "_continuous_active", True)
called = {"n": 0}
class FakeRecorder:
def start(self, on_silence_stop=None):
called["n"] += 1
def cancel(self):
pass
monkeypatch.setattr(voice, "_continuous_recorder", FakeRecorder())
voice.start_continuous(on_transcript=lambda _t: None)
# The guard inside start_continuous short-circuits before rec.start()
assert called["n"] == 0
class TestContinuousLoopSimulation:
"""End-to-end simulation of the VAD loop with a fake recorder.
Proves auto-restart works: the silence callback must trigger transcribe
on_transcript re-call rec.start(on_silence_stop=same_cb). Also covers
the 3-strikes no-speech halt.
"""
@pytest.fixture
def fake_recorder(self, monkeypatch):
import hermes_cli.voice as voice
# Reset module state between tests.
monkeypatch.setattr(voice, "_continuous_active", False)
monkeypatch.setattr(voice, "_continuous_recorder", None)
monkeypatch.setattr(voice, "_continuous_no_speech_count", 0)
monkeypatch.setattr(voice, "_continuous_on_transcript", None)
monkeypatch.setattr(voice, "_continuous_on_status", None)
monkeypatch.setattr(voice, "_continuous_on_silent_limit", None)
class FakeRecorder:
_silence_threshold = 200
_silence_duration = 3.0
is_recording = False
def __init__(self):
self.start_calls = 0
self.last_callback = None
self.stopped = 0
self.cancelled = 0
# Preset WAV path returned by stop()
self.next_stop_wav = "/tmp/fake.wav"
def start(self, on_silence_stop=None):
self.start_calls += 1
self.last_callback = on_silence_stop
self.is_recording = True
def stop(self):
self.stopped += 1
self.is_recording = False
return self.next_stop_wav
def cancel(self):
self.cancelled += 1
self.is_recording = False
rec = FakeRecorder()
monkeypatch.setattr(voice, "create_audio_recorder", lambda: rec)
# Skip real file ops in the silence callback.
monkeypatch.setattr(voice.os.path, "isfile", lambda _p: False)
return rec
def test_loop_auto_restarts_after_transcript(self, fake_recorder, monkeypatch):
import hermes_cli.voice as voice
monkeypatch.setattr(
voice,
"transcribe_recording",
lambda _p: {"success": True, "transcript": "hello world"},
)
monkeypatch.setattr(voice, "is_whisper_hallucination", lambda _t: False)
transcripts = []
statuses = []
voice.start_continuous(
on_transcript=lambda t: transcripts.append(t),
on_status=lambda s: statuses.append(s),
)
assert fake_recorder.start_calls == 1
assert statuses == ["listening"]
# Simulate AudioRecorder's silence detector firing.
fake_recorder.last_callback()
assert transcripts == ["hello world"]
assert fake_recorder.start_calls == 2 # auto-restarted
assert statuses == ["listening", "transcribing", "listening"]
assert voice.is_continuous_active() is True
voice.stop_continuous()
def test_silent_limit_halts_loop_after_three_strikes(self, fake_recorder, monkeypatch):
import hermes_cli.voice as voice
# Transcription returns no speech — fake_recorder.stop() returns the
# path, but transcribe returns empty text, counting as silence.
monkeypatch.setattr(
voice,
"transcribe_recording",
lambda _p: {"success": True, "transcript": ""},
)
monkeypatch.setattr(voice, "is_whisper_hallucination", lambda _t: False)
transcripts = []
silent_limit_fired = []
voice.start_continuous(
on_transcript=lambda t: transcripts.append(t),
on_silent_limit=lambda: silent_limit_fired.append(True),
)
# Fire silence callback 3 times
for _ in range(3):
fake_recorder.last_callback()
assert transcripts == []
assert silent_limit_fired == [True]
assert voice.is_continuous_active() is False
assert fake_recorder.cancelled >= 1
def test_stop_during_transcription_discards_restart(self, fake_recorder, monkeypatch):
"""User hits Ctrl+B mid-transcription: the in-flight transcript must
still fire (it's a real utterance), but the loop must NOT restart."""
import hermes_cli.voice as voice
stop_triggered = {"flag": False}
def late_transcribe(_p):
# Simulate stop_continuous arriving while we're inside transcribe
voice.stop_continuous()
stop_triggered["flag"] = True
return {"success": True, "transcript": "final word"}
monkeypatch.setattr(voice, "transcribe_recording", late_transcribe)
monkeypatch.setattr(voice, "is_whisper_hallucination", lambda _t: False)
transcripts = []
voice.start_continuous(on_transcript=lambda t: transcripts.append(t))
initial_starts = fake_recorder.start_calls # 1
fake_recorder.last_callback()
assert stop_triggered["flag"] is True
# Loop is stopped — no auto-restart
assert fake_recorder.start_calls == initial_starts
# The in-flight transcript was suppressed because we stopped mid-flight
assert transcripts == []
assert voice.is_continuous_active() is False

View file

@ -1473,3 +1473,207 @@ class TestDiscoverUserThemes:
assert "ok" in names
assert "bad" not in names # malformed YAML
assert len(results) == 1 # only the valid one
class TestNormaliseThemeExtensions:
"""Tests for the extended normaliser fields (assets, customCSS,
componentStyles, layoutVariant) the surfaces themes use to reskin
the dashboard without shipping code."""
def test_layout_variant_defaults_to_standard(self):
from hermes_cli.web_server import _normalise_theme_definition
result = _normalise_theme_definition({"name": "t"})
assert result["layoutVariant"] == "standard"
def test_layout_variant_accepts_known_values(self):
from hermes_cli.web_server import _normalise_theme_definition
for variant in ("standard", "cockpit", "tiled"):
r = _normalise_theme_definition({"name": "t", "layoutVariant": variant})
assert r["layoutVariant"] == variant
def test_layout_variant_rejects_unknown(self):
from hermes_cli.web_server import _normalise_theme_definition
r = _normalise_theme_definition({"name": "t", "layoutVariant": "warship"})
assert r["layoutVariant"] == "standard"
r2 = _normalise_theme_definition({"name": "t", "layoutVariant": 12})
assert r2["layoutVariant"] == "standard"
def test_assets_named_slots_passthrough(self):
from hermes_cli.web_server import _normalise_theme_definition
r = _normalise_theme_definition({
"name": "t",
"assets": {
"bg": "https://example.com/bg.jpg",
"hero": "linear-gradient(180deg, red, blue)",
"crest": "/ds-assets/crest.svg",
"logo": " ", # whitespace-only — dropped
"notAKnownKey": "ignored",
},
})
assert r["assets"]["bg"] == "https://example.com/bg.jpg"
assert r["assets"]["hero"].startswith("linear-gradient")
assert r["assets"]["crest"] == "/ds-assets/crest.svg"
assert "logo" not in r["assets"] # whitespace-only rejected
assert "notAKnownKey" not in r["assets"] # unknown slot ignored
def test_assets_custom_block(self):
from hermes_cli.web_server import _normalise_theme_definition
r = _normalise_theme_definition({
"name": "t",
"assets": {
"custom": {
"scan-lines": "/img/scan.png",
"my_overlay": "/img/ov.png",
"bad key!": "x", # non-alnum key — rejected
"empty": "", # empty value — rejected
},
},
})
assert r["assets"]["custom"] == {
"scan-lines": "/img/scan.png",
"my_overlay": "/img/ov.png",
}
def test_assets_absent_means_no_field(self):
from hermes_cli.web_server import _normalise_theme_definition
r = _normalise_theme_definition({"name": "t"})
assert "assets" not in r
def test_custom_css_passthrough_and_capped(self):
from hermes_cli.web_server import _normalise_theme_definition
# Small CSS passes through verbatim.
r = _normalise_theme_definition({
"name": "t",
"customCSS": "body { color: red; }",
})
assert r["customCSS"] == "body { color: red; }"
# 40 KiB of CSS gets clipped to the 32 KiB cap.
huge = "/* x */ " * (40 * 1024 // 8 + 10)
r2 = _normalise_theme_definition({"name": "t", "customCSS": huge})
assert len(r2["customCSS"]) <= 32 * 1024
def test_custom_css_empty_dropped(self):
from hermes_cli.web_server import _normalise_theme_definition
for val in ("", " \n\t", None):
r = _normalise_theme_definition({"name": "t", "customCSS": val})
assert "customCSS" not in r
def test_component_styles_per_bucket(self):
from hermes_cli.web_server import _normalise_theme_definition
r = _normalise_theme_definition({
"name": "t",
"componentStyles": {
"card": {
"clipPath": "polygon(0 0, 100% 0, 100% 100%, 0 100%)",
"boxShadow": "inset 0 0 0 1px red",
"bad prop!": "ignored", # non-alnum prop rejected
},
"header": {"background": "linear-gradient(red, blue)"},
"rogueBucket": {"foo": "bar"}, # not a known bucket — rejected
},
})
assert r["componentStyles"]["card"] == {
"clipPath": "polygon(0 0, 100% 0, 100% 100%, 0 100%)",
"boxShadow": "inset 0 0 0 1px red",
}
assert r["componentStyles"]["header"]["background"].startswith("linear-gradient")
assert "rogueBucket" not in r["componentStyles"]
def test_component_styles_empty_buckets_dropped(self):
from hermes_cli.web_server import _normalise_theme_definition
r = _normalise_theme_definition({
"name": "t",
"componentStyles": {
"card": {}, # empty — dropped entirely
"header": {"bad prop!": "ignored"}, # all props rejected — bucket dropped
"footer": {"background": "black"},
},
})
assert "card" not in r.get("componentStyles", {})
assert "header" not in r.get("componentStyles", {})
assert r["componentStyles"]["footer"]["background"] == "black"
def test_component_styles_accepts_numeric_values(self):
"""Numeric values (e.g. opacity: 0.8) are coerced to strings."""
from hermes_cli.web_server import _normalise_theme_definition
r = _normalise_theme_definition({
"name": "t",
"componentStyles": {"card": {"opacity": 0.8, "zIndex": 5}},
})
assert r["componentStyles"]["card"] == {"opacity": "0.8", "zIndex": "5"}
class TestDashboardPluginManifestExtensions:
"""Tests for the extended plugin manifest fields (tab.override,
tab.hidden, slots) read by _discover_dashboard_plugins()."""
def _write_plugin(self, tmp_path, name, manifest):
import json
plug_dir = tmp_path / "plugins" / name / "dashboard"
plug_dir.mkdir(parents=True)
(plug_dir / "manifest.json").write_text(json.dumps(manifest))
return plug_dir
def test_override_and_hidden_carried_through(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
self._write_plugin(tmp_path, "skin-home", {
"name": "skin-home",
"label": "Skin Home",
"tab": {"path": "/skin-home", "override": "/", "hidden": True},
"slots": ["sidebar", "header-left"],
"entry": "dist/index.js",
})
from hermes_cli import web_server
# Bust the process-level cache so the test plugin is picked up.
web_server._dashboard_plugins_cache = None
plugins = web_server._get_dashboard_plugins(force_rescan=True)
entry = next(p for p in plugins if p["name"] == "skin-home")
assert entry["tab"]["override"] == "/"
assert entry["tab"]["hidden"] is True
assert entry["slots"] == ["sidebar", "header-left"]
def test_override_requires_leading_slash(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
self._write_plugin(tmp_path, "bad-override", {
"name": "bad-override",
"label": "Bad",
"tab": {"path": "/bad", "override": "no-leading-slash"},
"entry": "dist/index.js",
})
from hermes_cli import web_server
web_server._dashboard_plugins_cache = None
plugins = web_server._get_dashboard_plugins(force_rescan=True)
entry = next(p for p in plugins if p["name"] == "bad-override")
assert "override" not in entry["tab"]
def test_slots_default_empty(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
self._write_plugin(tmp_path, "no-slots", {
"name": "no-slots",
"label": "No Slots",
"tab": {"path": "/no-slots"},
"entry": "dist/index.js",
})
from hermes_cli import web_server
web_server._dashboard_plugins_cache = None
plugins = web_server._get_dashboard_plugins(force_rescan=True)
entry = next(p for p in plugins if p["name"] == "no-slots")
assert entry["slots"] == []
assert "hidden" not in entry["tab"]
assert "override" not in entry["tab"]
def test_slots_filters_non_string_entries(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
self._write_plugin(tmp_path, "mixed-slots", {
"name": "mixed-slots",
"label": "Mixed",
"tab": {"path": "/mixed-slots"},
"slots": ["sidebar", "", 42, None, "header-right"],
"entry": "dist/index.js",
})
from hermes_cli import web_server
web_server._dashboard_plugins_cache = None
plugins = web_server._get_dashboard_plugins(force_rescan=True)
entry = next(p for p in plugins if p["name"] == "mixed-slots")
assert entry["slots"] == ["sidebar", "header-right"]

View file

@ -134,6 +134,31 @@ class TestCoerceValue:
"""A non-numeric string in [number, string] should stay a string."""
assert _coerce_value("hello", ["number", "string"]) == "hello"
def test_array_type_parsed_from_json_string(self):
"""Stringified JSON arrays are parsed into native lists."""
assert _coerce_value('["a", "b"]', "array") == ["a", "b"]
assert _coerce_value("[1, 2, 3]", "array") == [1, 2, 3]
def test_object_type_parsed_from_json_string(self):
"""Stringified JSON objects are parsed into native dicts."""
assert _coerce_value('{"k": "v"}', "object") == {"k": "v"}
assert _coerce_value('{"n": 1}', "object") == {"n": 1}
def test_array_invalid_json_preserved(self):
"""Unparseable strings are returned unchanged."""
assert _coerce_value("not-json", "array") == "not-json"
def test_object_invalid_json_preserved(self):
assert _coerce_value("not-json", "object") == "not-json"
def test_array_type_wrong_shape_preserved(self):
"""A JSON object passed for an 'array' slot is preserved as a string."""
assert _coerce_value('{"k": "v"}', "array") == '{"k": "v"}'
def test_object_type_wrong_shape_preserved(self):
"""A JSON array passed for an 'object' slot is preserved as a string."""
assert _coerce_value('["a"]', "object") == '["a"]'
# ── Full coerce_tool_args with registry ───────────────────────────────────
@ -212,6 +237,32 @@ class TestCoerceToolArgs:
assert result["items"] == [1, 2, 3]
assert result["config"] == {"key": "val"}
def test_coerces_stringified_array_arg(self):
"""Regression for #3947 — MCP servers using z.array() expect lists, not strings."""
schema = self._mock_schema({
"messageIds": {"type": "array", "items": {"type": "string"}},
})
with patch("model_tools.registry.get_schema", return_value=schema):
args = {"messageIds": '["abc", "def"]'}
result = coerce_tool_args("test_tool", args)
assert result["messageIds"] == ["abc", "def"]
def test_coerces_stringified_object_arg(self):
"""Stringified JSON objects get parsed into dicts."""
schema = self._mock_schema({"config": {"type": "object"}})
with patch("model_tools.registry.get_schema", return_value=schema):
args = {"config": '{"max": 50}'}
result = coerce_tool_args("test_tool", args)
assert result["config"] == {"max": 50}
def test_invalid_json_array_preserved_as_string(self):
"""If the string isn't valid JSON, pass it through — let the tool decide."""
schema = self._mock_schema({"items": {"type": "array"}})
with patch("model_tools.registry.get_schema", return_value=schema):
args = {"items": "not-json"}
result = coerce_tool_args("test_tool", args)
assert result["items"] == "not-json"
def test_extra_args_without_schema_left_alone(self):
"""Args not in the schema properties are not touched."""
schema = self._mock_schema({"limit": {"type": "integer"}})

View file

@ -120,6 +120,177 @@ class TestSchemaConversion:
assert schema["parameters"] == {"type": "object", "properties": {}}
def test_definitions_refs_are_rewritten_to_defs(self):
from tools.mcp_tool import _convert_mcp_schema
mcp_tool = _make_mcp_tool(
name="submit",
description="Submit a payload",
input_schema={
"type": "object",
"properties": {
"input": {"$ref": "#/definitions/Payload"},
},
"required": ["input"],
"definitions": {
"Payload": {
"type": "object",
"properties": {
"query": {"type": "string"},
},
"required": ["query"],
}
},
},
)
schema = _convert_mcp_schema("forms", mcp_tool)
assert schema["parameters"]["properties"]["input"]["$ref"] == "#/$defs/Payload"
assert "$defs" in schema["parameters"]
assert "definitions" not in schema["parameters"]
def test_nested_definition_refs_are_rewritten_recursively(self):
from tools.mcp_tool import _convert_mcp_schema
mcp_tool = _make_mcp_tool(
name="nested",
description="Nested schema",
input_schema={
"type": "object",
"properties": {
"items": {
"type": "array",
"items": {"$ref": "#/definitions/Entry"},
},
},
"definitions": {
"Entry": {
"type": "object",
"properties": {
"child": {"$ref": "#/definitions/Child"},
},
},
"Child": {
"type": "object",
"properties": {
"value": {"type": "string"},
},
},
},
},
)
schema = _convert_mcp_schema("forms", mcp_tool)
assert schema["parameters"]["properties"]["items"]["items"]["$ref"] == "#/$defs/Entry"
assert schema["parameters"]["$defs"]["Entry"]["properties"]["child"]["$ref"] == "#/$defs/Child"
def test_missing_type_on_object_is_coerced(self):
"""Schemas that describe an object but omit ``type`` get type='object'."""
from tools.mcp_tool import _normalize_mcp_input_schema
schema = _normalize_mcp_input_schema({
"properties": {"q": {"type": "string"}},
"required": ["q"],
})
assert schema["type"] == "object"
assert schema["properties"]["q"]["type"] == "string"
assert schema["required"] == ["q"]
def test_null_type_on_object_is_coerced(self):
"""type: None should be treated like missing type (common MCP server bug)."""
from tools.mcp_tool import _normalize_mcp_input_schema
schema = _normalize_mcp_input_schema({
"type": None,
"properties": {"x": {"type": "integer"}},
})
assert schema["type"] == "object"
def test_required_pruned_when_property_missing(self):
"""Gemini 400s on required names that don't exist in properties."""
from tools.mcp_tool import _normalize_mcp_input_schema
schema = _normalize_mcp_input_schema({
"type": "object",
"properties": {"a": {"type": "string"}},
"required": ["a", "ghost", "phantom"],
})
assert schema["required"] == ["a"]
def test_required_removed_when_all_names_dangle(self):
from tools.mcp_tool import _normalize_mcp_input_schema
schema = _normalize_mcp_input_schema({
"type": "object",
"properties": {},
"required": ["ghost"],
})
assert "required" not in schema
def test_required_pruning_applies_recursively_inside_nested_objects(self):
"""Nested object schemas also get required pruning."""
from tools.mcp_tool import _normalize_mcp_input_schema
schema = _normalize_mcp_input_schema({
"type": "object",
"properties": {
"filter": {
"type": "object",
"properties": {"field": {"type": "string"}},
"required": ["field", "missing"],
},
},
})
assert schema["properties"]["filter"]["required"] == ["field"]
def test_object_in_array_items_gets_properties_filled(self):
"""Array-item object schemas without properties get an empty dict."""
from tools.mcp_tool import _normalize_mcp_input_schema
schema = _normalize_mcp_input_schema({
"type": "object",
"properties": {
"items": {
"type": "array",
"items": {"type": "object"},
},
},
})
assert schema["properties"]["items"]["items"]["properties"] == {}
def test_convert_mcp_schema_survives_missing_inputschema_attribute(self):
"""A Tool object without .inputSchema must not crash registration."""
import types
from tools.mcp_tool import _convert_mcp_schema
bare_tool = types.SimpleNamespace(name="probe", description="Probe")
schema = _convert_mcp_schema("srv", bare_tool)
assert schema["name"] == "mcp_srv_probe"
assert schema["parameters"] == {"type": "object", "properties": {}}
def test_convert_mcp_schema_with_none_inputschema(self):
"""Tool with inputSchema=None produces a valid empty object schema."""
import types
from tools.mcp_tool import _convert_mcp_schema
# Note: _make_mcp_tool(input_schema=None) falls back to a default —
# build the namespace directly so .inputSchema really is None.
mcp_tool = types.SimpleNamespace(name="probe", description="Probe", inputSchema=None)
schema = _convert_mcp_schema("srv", mcp_tool)
assert schema["parameters"] == {"type": "object", "properties": {}}
def test_tool_name_prefix_format(self):
from tools.mcp_tool import _convert_mcp_schema