feat(lsp): semantic diagnostics from real language servers in write_file/patch (#24168)

* feat(lsp): semantic diagnostics from real language servers in write_file/patch

Wire ~26 language servers (pyright, gopls, rust-analyzer, typescript-language-server,
clangd, bash-language-server, ...) into the post-write lint check used by write_file
and patch. The model now sees type errors, undefined names, missing imports, and
project-wide semantic issues introduced by its edits, not just syntax errors.

LSP is gated on git workspace detection: when the agent's cwd or the file being
edited is inside a git worktree, LSP runs against that workspace; otherwise the
existing in-process syntax checks are the only tier. This keeps users on
user-home cwds (Telegram/Discord gateway chats) from spawning daemons.

The post-write check is layered: in-process syntax check first (microseconds),
then LSP semantic diagnostics second when syntax is clean. Diagnostics are
delta-filtered against a baseline captured at write start, so the agent only
sees errors its edit introduced. A flaky/missing language server can never
break a write -- every LSP failure path falls back silently to the syntax-only
result.
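
The delta filter described above can be sketched roughly as follows. This is an illustration only, not the code in agent/lsp/manager.py: the names ``diag_key`` and ``delta_filter`` and the choice of identity fields are assumptions.

```python
from typing import Any

Diagnostic = dict[str, Any]


def diag_key(d: Diagnostic) -> tuple:
    """Identity of a diagnostic for baseline comparison (assumed key fields)."""
    start = d.get("range", {}).get("start", {})
    return (
        d.get("severity"),
        d.get("code"),
        d.get("source"),
        d.get("message"),
        start.get("line"),
        start.get("character"),
    )


def delta_filter(baseline: list[Diagnostic], current: list[Diagnostic]) -> list[Diagnostic]:
    """Return only the diagnostics that were not present before the write."""
    seen = {diag_key(d) for d in baseline}
    return [d for d in current if diag_key(d) not in seen]


# Hypothetical diagnostics: one that existed before the edit, one introduced by it.
old = {"severity": 1, "code": "E1", "source": "demo", "message": "pre-existing",
       "range": {"start": {"line": 3, "character": 0}}}
new = {"severity": 1, "code": "E2", "source": "demo", "message": "introduced by edit",
       "range": {"start": {"line": 7, "character": 4}}}

introduced = delta_filter([old], [old, new])
print([d["message"] for d in introduced])
```

Keying on the start position is the simplest choice, but an edit that shifts lines would make a pre-existing error look new; a real implementation may key on fewer fields for that reason.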

New module agent/lsp/ split into:

- protocol.py: Content-Length JSON-RPC framer + envelope helpers
- client.py: async LSPClient (spawn, initialize, didOpen/didChange,
  ContentModified retry, push/pull diagnostic stores)
- workspace.py: git worktree walk-up + per-server NearestRoot resolver
- servers.py: registry of 26 language servers (extension match,
  root resolver, spawn builder per language)
- install.py: auto-install dispatch (npm install --prefix, go install
  with GOBIN, pip install --target) into HERMES_HOME/lsp/bin/
- manager.py: LSPService (per-(server_id, root) client registry, lazy
  spawn, broken-set, in-flight dedupe, sync facade for tools layer)
- reporter.py: <diagnostics> block formatter (severity-1-only, 20-per-file)
- cli.py: hermes lsp {status,list,install,install-all,restart,which}

Wired into tools/file_operations.py:

- write_file/patch_replace now call _snapshot_lsp_baseline before write
- _check_lint_delta gains a third tier: LSP semantic diagnostics when
  syntax is clean
- All LSP code paths swallow exceptions; write_file's contract unchanged

Config: 'lsp' section in DEFAULT_CONFIG with enabled (default true),
wait_mode, wait_timeout, install_strategy (default 'auto'), and per-server
overrides (disabled, command, env, initialization_options).

Tests: tests/agent/lsp/ -- 49 tests covering protocol framing (encode and
read_message round-trip, EOF/truncation/missing Content-Length), workspace
gate (git walk-up, exclude markers, fallback to file location), reporter
(severity filter, max-per-file cap, truncation), service-level delta filter,
and an in-process mock LSP server that exercises the full client lifecycle
including didChange version bumps, dedup, crash recovery, and idempotent
teardown.

Live E2E verified through ShellFileOperations: pyright
auto-installed via npm into HERMES_HOME, baseline captured, type error
introduced, single delta diagnostic surfaced with correct line/column/code/
source, then patch fix removes the diagnostic from the output.

Docs: new website/docs/user-guide/features/lsp.md page covering supported
languages, configuration knobs, performance characteristics, and
troubleshooting; cli-commands.md updated with the 'hermes lsp' reference;
sidebar updated.

* feat(lsp): structured logging, backend gate, defensive walk caps

Cherry-picks the substantive ideas from #24155 (different scope, same
problem space) onto our PR.

agent/lsp/eventlog.py (new): dedicated structured logger
``hermes.lint.lsp`` with steady-state silence. Module-level dedup sets
keep a 1000-write session at exactly ONE INFO line ("active for
<root>") at the default INFO threshold; clean writes log at DEBUG so
they never reach agent.log under normal config. State transitions
(server starts, no project root for a file, server unavailable) fire
at INFO/WARNING once per (server_id, key); novel events (timeouts,
unexpected errors) fire WARNING per call. Grep recipe: ``rg 'lsp\\['``.
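
A minimal sketch of the once-per-key pattern, assuming a module-level set like the one described above. ``log_once`` and ``_ListHandler`` are hypothetical names; ``reset_announce_caches`` is the helper the tests call, though its body here is a guess.

```python
import logging

logger = logging.getLogger("hermes.lint.lsp")
_announced: set[tuple[str, str]] = set()


def log_once(level: int, server_id: str, key: str, message: str) -> bool:
    """Emit message at most once per (server_id, key); return True if emitted."""
    token = (server_id, key)
    if token in _announced:
        return False
    _announced.add(token)
    logger.log(level, "lsp[%s] %s", server_id, message)
    return True


def reset_announce_caches() -> None:
    """Forget what has been announced (useful between tests)."""
    _announced.clear()


class _ListHandler(logging.Handler):
    """Collects records in memory so the demo can count them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = _ListHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

for _ in range(1000):
    log_once(logging.INFO, "pyright", "active:/repo", "active for /repo")
    logger.debug("lsp[pyright] clean write")  # below the INFO threshold, dropped

info_lines = [r for r in handler.records if r.levelno == logging.INFO]
print(len(info_lines))  # prints 1
```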

agent/lsp/manager.py: wire the eventlog into _get_or_spawn and
get_diagnostics_sync so users can answer "did LSP fire on this edit?"
with a single grep, plus surface "binary not on PATH" warnings once
instead of silently retrying every write.

tools/file_operations.py: backend-type gate. ``_lsp_local_only()``
returns False for non-local backends (Docker / Modal / SSH /
Daytona); ``_snapshot_lsp_baseline`` and ``_maybe_lsp_diagnostics``
now skip entirely on remote envs. The host-side language server
can't see files inside a sandbox, so this prevents pretending to
lint a file the host process can't open.

agent/lsp/protocol.py: 8 KiB cap on the header block in
``read_message``. A pathological server that streams headers
without ever emitting CRLF-CRLF would have looped forever consuming
bytes; now raises ``LSPProtocolError`` instead.
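
The header cap can be illustrated with a synchronous reader over a byte stream. This is a sketch under assumptions: the real ``read_message`` presumably reads from an async stream, and the ``encode`` helper and error messages below are illustrative.

```python
import io
import json
from typing import BinaryIO, Optional

MAX_HEADER_BYTES = 8 * 1024  # 8 KiB cap on the header block


class LSPProtocolError(Exception):
    """Raised when the byte stream is not valid Content-Length framing."""


def encode(obj: dict) -> bytes:
    body = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    return f"Content-Length: {len(body)}\r\n\r\n".encode("ascii") + body


def read_message(stream: BinaryIO) -> Optional[dict]:
    header = bytearray()
    while not header.endswith(b"\r\n\r\n"):
        chunk = stream.read(1)
        if not chunk:
            if header:
                raise LSPProtocolError("EOF inside header block")
            return None  # clean EOF between messages
        header += chunk
        if len(header) > MAX_HEADER_BYTES:
            # Without this check a server that never sends CRLF-CRLF
            # would keep this loop consuming bytes indefinitely.
            raise LSPProtocolError("header block exceeds 8 KiB")
    length = None
    for line in header.decode("ascii", errors="replace").split("\r\n"):
        name, _, value = line.partition(":")
        if name.strip().lower() == "content-length":
            length = int(value.strip())
    if length is None:
        raise LSPProtocolError("missing Content-Length")
    body = stream.read(length)
    if len(body) < length:
        raise LSPProtocolError("truncated body")
    return json.loads(body.decode("utf-8"))


round_trip = read_message(io.BytesIO(encode({"jsonrpc": "2.0", "id": 1})))
print(round_trip)
```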

agent/lsp/workspace.py: 64-step cap on ``find_git_worktree`` and
``nearest_root`` upward walks, plus try/except containment around
``Path(...).resolve()`` and child ``.exists()`` calls. Defensive
against pathological inputs (symlink loops, encoding errors,
permission failures mid-walk) — the lint hook is hot-path code and
must never raise.
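
A capped upward walk of this kind could look like the sketch below. It assumes a ``.git`` entry (directory or file) marks the worktree root and that every failure should return ``None`` rather than raise; the real ``find_git_worktree`` may differ in signature, caching, and exclude markers.

```python
import tempfile
from pathlib import Path
from typing import Optional

MAX_WALK_STEPS = 64  # cap on upward steps, as described above


def find_git_worktree(start: str) -> Optional[Path]:
    """Walk up from start looking for a .git entry; never raise."""
    try:
        current = Path(start).resolve()
        if not current.is_dir():
            # The file may not exist yet; begin from its parent directory.
            current = current.parent
    except (OSError, RuntimeError, ValueError):
        return None
    for _ in range(MAX_WALK_STEPS):
        try:
            if (current / ".git").exists():
                return current
        except OSError:
            return None
        parent = current.parent
        if parent == current:  # reached the filesystem root
            return None
        current = parent
    return None


with tempfile.TemporaryDirectory() as tmp:
    repo = Path(tmp) / "repo"
    (repo / ".git").mkdir(parents=True)
    nested = repo / "a" / "b"
    nested.mkdir(parents=True)
    found = find_git_worktree(str(nested / "x.py"))
    found_matches = found == repo.resolve()
    print(found_matches)
```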

Tests:
- tests/agent/lsp/test_eventlog.py: 18 tests covering steady-state
  silence (clean writes stay DEBUG), state-transition INFO-once
  semantics (active for, no project root), action-required
  WARNING-once (server unavailable), per-call WARNING (timeouts,
  spawn failures), and the "1000 clean writes => 1 INFO" contract.
- tests/agent/lsp/test_backend_gate.py: 5 tests verifying
  _lsp_local_only / snapshot_baseline / maybe_lsp_diagnostics skip
  the LSP layer for non-local backends and route correctly for
  LocalEnvironment.
- tests/agent/lsp/test_protocol.py: new test_read_message_rejects_runaway_header
  exercising the 8 KiB cap.

Validation:
- 73/73 LSP tests pass (49 original + 18 eventlog + 5 backend-gate + 1 framer cap)
- 198/198 pass when run alongside existing file_operations tests
- Live E2E re-run with pyright still surfaces "ERROR [2:12] Type
  ... reportReturnType (Pyright)" through the full path, then patch
  fix removes it on the next call.

* feat(lsp): atexit cleanup + separate lsp_diagnostics JSON field

Two improvements salvaged from #24414's plugin-form alternative,
keeping our core-integrated design:

1. atexit cleanup of spawned language servers
   ----------------------------------------------------------------
   ``agent/lsp/__init__.get_service`` now registers an ``atexit``
   handler on first creation that tears down the LSPService on
   Python exit.  Without this, every ``hermes chat`` exit was
   leaking pyright/gopls/etc. processes for a few seconds while
   their stdout buffers drained -- they got reaped by the kernel
   eventually but a watchful ``ps aux`` would catch them.

   The handler runs once per process (gated by
   ``_atexit_registered``); idempotent ``shutdown_service``
   ensures double-fire is a no-op.  Errors during shutdown are
   swallowed at debug level since by the time atexit fires the
   user has already seen the agent's final response.
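
   A sketch of the register-once pattern, assuming module-level state as described. ``DemoService`` is a hypothetical stand-in for LSPService; the function and flag names are taken from the text above, but the bodies are guesses.

```python
import atexit

_service = None
_atexit_registered = False


class DemoService:
    """Hypothetical stand-in for LSPService; counts shutdown calls."""

    def __init__(self):
        self.shutdown_calls = 0

    def shutdown(self):
        self.shutdown_calls += 1


def shutdown_service():
    """Tear down the singleton; safe to call repeatedly or before creation."""
    global _service
    svc, _service = _service, None
    if svc is None:
        return
    try:
        svc.shutdown()
    except Exception:
        pass  # nothing useful to report this late in the process


def get_service():
    global _service, _atexit_registered
    if _service is None:
        _service = DemoService()
        if not _atexit_registered:
            atexit.register(shutdown_service)
            _atexit_registered = True
    return _service


svc = get_service()
same = get_service() is svc
shutdown_service()
shutdown_service()  # second call finds no service and does nothing
print(same, svc.shutdown_calls)
```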

2. Separate ``lsp_diagnostics`` field on WriteResult / PatchResult
   ----------------------------------------------------------------
   Previously the LSP layer folded its diagnostic block into the
   ``lint.output`` string, conflating the syntax-check tier with
   the semantic tier.  The agent (and any downstream parsers) now
   read syntax errors and semantic errors as independent signals:

       {
         "bytes_written": 42,
         "lint": {"status": "ok", "output": ""},
         "lsp_diagnostics": "<diagnostics file=...>\nERROR [2:12] ..."
       }

   ``_check_lint_delta`` returns to its original two-tier shape
   (syntax check + delta filter); ``write_file`` and
   ``patch_replace`` independently fetch LSP diagnostics via
   ``_maybe_lsp_diagnostics`` and pass them into the new field.
   ``patch_replace`` propagates the inner write_file's
   ``lsp_diagnostics`` so the outer PatchResult carries the patch's
   delta correctly.
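
   The include/omit behaviour of the new field can be sketched with a small dataclass. ``WriteResultSketch`` is a hypothetical stand-in; the real WriteResult has more fields and its ``to_dict`` may treat ``lint`` differently.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class WriteResultSketch:
    """Hypothetical stand-in for WriteResult with only the relevant fields."""

    bytes_written: int = 0
    lint: Optional[dict[str, Any]] = None
    lsp_diagnostics: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        out: dict[str, Any] = {"bytes_written": self.bytes_written}
        if self.lint is not None:
            out["lint"] = self.lint
        if self.lsp_diagnostics:  # None and "" are both omitted
            out["lsp_diagnostics"] = self.lsp_diagnostics
        return out


clean = WriteResultSketch(bytes_written=42, lint={"status": "ok", "output": ""})
typed = WriteResultSketch(
    bytes_written=42,
    lint={"status": "ok", "output": ""},
    lsp_diagnostics="<diagnostics file=...>\nERROR [2:12] ...",
)
print(sorted(clean.to_dict()))
print(sorted(typed.to_dict()))
```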

Tests: 19 new
- tests/agent/lsp/test_lifecycle.py (8 tests): atexit registration
  fires once and only once across N get_service calls; the
  registered callable is our internal shutdown wrapper;
  shutdown_service is idempotent and safe when never started;
  exceptions during shutdown are swallowed; inactive service is
  cached so we don't rebuild on every check.
- tests/agent/lsp/test_diagnostics_field.py (11 tests): WriteResult
  / PatchResult dataclass shape, to_dict include/omit semantics,
  channel separation (lint and lsp_diagnostics carry independent
  signals), write_file populates the field via
  _maybe_lsp_diagnostics only when the syntax tier is clean,
  patch_replace propagates the field forward from its internal
  write_file.

Validation:
- 92/92 LSP tests pass (73 prior + 8 lifecycle + 11 diagnostics field)
- 217/217 pass with file_operations + LSP combined
- Live E2E reverified: clean writes -> both fields empty/none; type
  error introduced -> lint clean (parses), lsp_diagnostics carries
  the pyright reportReturnType block; patch fix -> both fields
  clean again.

* fix(lsp): broken-set short-circuit so a wedged server isn't paid every write

Discovered while auditing failure paths: a language server binary that
hangs (sleep forever, no LSP traffic on stdin/stdout) caused EVERY
subsequent write to re-pay the 8s snapshot_baseline timeout. Five
writes = ~64s of dead time (5 x ~12.85s measured per write).

The bug: ``_get_or_spawn`` adds the (server_id, root) pair to
``_broken`` inside its inner exception handler, but when the OUTER
``_loop.run`` timeout fires, it cancels the inner task before that
handler runs. The pair never makes it to broken-set, so the next
write re-enters the spawn path and re-pays the timeout.

Fix:

- New ``_mark_broken_for_file`` helper at the service layer marks
  the (server_id, workspace_root) pair broken from the OUTSIDE when
  the outer timeout fires. Called from the except branches in
  ``snapshot_baseline`` and ``get_diagnostics_sync`` (asyncio.TimeoutError
  + generic Exception). Also kills any orphan client process that
  survived the cancelled future, fire-and-forget with a 1s ceiling.

- ``enabled_for`` now consults the broken-set BEFORE returning True.
  Files in already-broken (server_id, root) pairs short-circuit to
  False, so the file_operations layer skips the LSP path entirely
  with no spawn cost. The pair stays broken until the service is
  restarted (``hermes lsp restart``) or the process exits.

- A single eventlog WARNING is emitted on first mark-broken so the
  user knows which server gave up. Subsequent edits in the same
  project stay silent.
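
The failure mode and the fix can be reproduced in miniature. This sketch assumes the inner handler catches ``Exception``, which does not include ``asyncio.CancelledError`` on Python 3.8+, and uses ``asyncio.wait_for`` to stand in for the outer ``_loop.run`` timeout. All names below are illustrative.

```python
import asyncio

broken: set[tuple[str, str]] = set()


async def spawn_wedged(key):
    try:
        # Stands in for a server that never answers ``initialize``.
        await asyncio.sleep(3600)
    except Exception:
        # Not reached on cancellation: CancelledError is a BaseException.
        broken.add(key)
        raise


def enabled_for(key) -> bool:
    return key not in broken


async def snapshot(key, timeout: float) -> str:
    if not enabled_for(key):
        return "skipped"  # short-circuit, no spawn cost
    try:
        await asyncio.wait_for(spawn_wedged(key), timeout=timeout)
        return "ok"
    except asyncio.TimeoutError:
        inner_marked = key in broken
        broken.add(key)  # mark broken from the outside
        return f"timeout (inner handler ran: {inner_marked})"


async def main():
    key = ("pyright", "/repo")
    out = []
    for _ in range(3):
        out.append(await snapshot(key, 0.05))
    return out


results = asyncio.run(main())
print(results)
```

Only the first call pays the timeout; later calls return immediately because the key is already in the set.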

Tests: 7 new in tests/agent/lsp/test_broken_set.py — covers the
key shape (server_id, per_server_root), enabled_for short-circuit,
sibling-file skip in same project, project isolation (broken in
A doesn't affect B), graceful no-op for missing-server / no-workspace,
and an end-to-end test that snapshots after a failure and verifies
the next ``enabled_for`` returns False.

Validation:

- Live retest of the wedged-binary scenario: 5 sequential writes,
  first 8.88s (the one snapshot timeout), subsequent four ~0.84s
  (no LSP cost). Down from 5x12.85s = 64s before this fix.
- 99/99 LSP tests pass (92 prior + 7 broken-set)
- 224/224 pass with file_operations + LSP combined
- Happy path E2E reverified — clean write, type error introduced,
  patch fix all behave correctly with the new broken-set logic.

Note: the FIRST write to a wedged binary still pays 8s (the
snapshot_baseline timeout). We could shorten that, but pyright/
tsserver normally take 2-3s and slow CI rust-analyzer can need
5+ seconds, so 8s is the conservative ceiling. Subsequent writes
are instant.
Teknium 2026-05-12 16:31:54 -07:00 committed by GitHub
parent d89553c2d6
commit 83b93898c2
GPG key ID: B5690EEEBB952194
28 changed files with 6144 additions and 17 deletions

@ -0,0 +1 @@
"""Pytest helpers for LSP-related tests."""

@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""A minimal in-process LSP server used by tests.

Speaks just enough LSP to drive :class:`agent.lsp.client.LSPClient`
through a full lifecycle: ``initialize``, ``initialized``,
``textDocument/didOpen``, ``textDocument/didChange``, then a
``textDocument/publishDiagnostics`` notification followed by
``shutdown`` + ``exit``.

Behaviour (all behaviours selectable via env var ``MOCK_LSP_SCRIPT``):

- ``"clean"`` -- initialize, accept didOpen/didChange, push empty
  diagnostics on every open/change, exit cleanly on shutdown.
- ``"errors"`` -- same as ``clean`` but the published diagnostics
  carry one severity-1 entry pointing at line 0:0.
- ``"crash"`` -- exit immediately after responding to ``initialize``
  (simulates a crashing server).
- ``"slow"`` -- same as ``clean`` but sleeps 1s before responding to
  ``initialize`` (lets us test timeout behaviour).

The script writes JSON-RPC framed messages to stdout and reads from
stdin.  No third-party dependencies -- uses only stdlib so it runs
under whatever Python the test process picks up.
"""
from __future__ import annotations

import json
import os
import sys
import time


def read_message():
    """Read one Content-Length framed JSON-RPC message from stdin."""
    headers = {}
    while True:
        line = sys.stdin.buffer.readline()
        if not line:
            return None
        line = line.rstrip(b"\r\n")
        if not line:
            break
        k, _, v = line.decode("ascii").partition(":")
        headers[k.strip().lower()] = v.strip()
    n = int(headers["content-length"])
    body = sys.stdin.buffer.read(n)
    return json.loads(body.decode("utf-8"))


def write_message(obj):
    body = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    sys.stdout.buffer.write(f"Content-Length: {len(body)}\r\n\r\n".encode("ascii"))
    sys.stdout.buffer.write(body)
    sys.stdout.buffer.flush()


def main():
    script = os.environ.get("MOCK_LSP_SCRIPT", "clean")
    while True:
        msg = read_message()
        if msg is None:
            return 0

        if "id" in msg and msg.get("method") == "initialize":
            if script == "slow":
                time.sleep(1.0)
            write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg["id"],
                    "result": {
                        "capabilities": {
                            "textDocumentSync": 1,  # Full
                            "diagnosticProvider": {"interFileDependencies": False, "workspaceDiagnostics": False},
                        },
                        "serverInfo": {"name": "mock-lsp", "version": "0.1"},
                    },
                }
            )
            if script == "crash":
                return 0
            continue

        if msg.get("method") == "initialized":
            continue

        if msg.get("method") == "workspace/didChangeConfiguration":
            continue

        if msg.get("method") == "workspace/didChangeWatchedFiles":
            continue

        if msg.get("method") in ("textDocument/didOpen", "textDocument/didChange"):
            params = msg.get("params") or {}
            td = params.get("textDocument") or {}
            uri = td.get("uri", "")
            version = td.get("version", 0)
            diagnostics = []
            if script == "errors":
                diagnostics = [
                    {
                        "range": {
                            "start": {"line": 0, "character": 0},
                            "end": {"line": 0, "character": 5},
                        },
                        "severity": 1,
                        "code": "MOCK001",
                        "source": "mock-lsp",
                        "message": "synthetic error from mock-lsp",
                    }
                ]
            write_message(
                {
                    "jsonrpc": "2.0",
                    "method": "textDocument/publishDiagnostics",
                    "params": {
                        "uri": uri,
                        "version": version,
                        "diagnostics": diagnostics,
                    },
                }
            )
            continue

        if msg.get("method") == "textDocument/diagnostic":
            # Pull endpoint: return empty.
            write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg["id"],
                    "result": {"kind": "full", "items": []},
                }
            )
            continue

        if msg.get("method") == "textDocument/didSave":
            continue

        if msg.get("method") == "shutdown":
            write_message({"jsonrpc": "2.0", "id": msg["id"], "result": None})
            continue

        if msg.get("method") == "exit":
            return 0

        # Unknown request: respond with method-not-found.
        if "id" in msg:
            write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg["id"],
                    "error": {"code": -32601, "message": f"method not found: {msg.get('method')}"},
                }
            )


if __name__ == "__main__":
    sys.exit(main())

@ -0,0 +1,108 @@
"""Integration test: LSP layer is skipped on non-local backends.

The host-side LSP server can't see files inside a Docker/Modal/SSH
sandbox.  When the agent's terminal env isn't ``LocalEnvironment``,
the file_operations layer must skip both ``snapshot_baseline`` and
``get_diagnostics_sync`` calls -- falling back to the in-process
syntax check exactly as if LSP were disabled.
"""
from __future__ import annotations

import os
import sys
from unittest.mock import MagicMock

import pytest

from agent.lsp import eventlog


@pytest.fixture(autouse=True)
def _reset():
    eventlog.reset_announce_caches()


def test_local_only_helper_returns_true_for_local_env():
    from tools.environments.local import LocalEnvironment
    from tools.file_operations import ShellFileOperations

    fops = ShellFileOperations(LocalEnvironment(cwd="/tmp"))
    assert fops._lsp_local_only() is True


def test_local_only_helper_returns_false_for_non_local_env():
    """A mocked non-local env (Docker/Modal/SSH stand-in) returns False."""
    from tools.file_operations import ShellFileOperations

    # Build something that's NOT a LocalEnvironment.  We use a bare
    # MagicMock -- isinstance() against LocalEnvironment is False.
    fake_env = MagicMock()
    fake_env.execute = MagicMock(return_value=MagicMock(exit_code=0, stdout=""))
    fake_env.cwd = "/sandbox"
    fops = ShellFileOperations(fake_env)
    assert fops._lsp_local_only() is False


def test_snapshot_baseline_skipped_for_non_local(monkeypatch):
    """Verify the LSP service's snapshot_baseline is NOT called when
    the backend isn't local."""
    from tools.file_operations import ShellFileOperations

    fake_env = MagicMock()
    fake_env.execute = MagicMock(return_value=MagicMock(exit_code=0, stdout=""))
    fake_env.cwd = "/sandbox"
    fops = ShellFileOperations(fake_env)

    snapshot_called = []

    class FakeService:
        def snapshot_baseline(self, path):
            snapshot_called.append(path)

    monkeypatch.setattr("agent.lsp.get_service", lambda: FakeService())
    fops._snapshot_lsp_baseline("/sandbox/x.py")
    assert snapshot_called == [], "snapshot must be skipped for non-local backends"


def test_maybe_lsp_diagnostics_returns_empty_for_non_local(monkeypatch):
    from tools.file_operations import ShellFileOperations

    fake_env = MagicMock()
    fake_env.execute = MagicMock(return_value=MagicMock(exit_code=0, stdout=""))
    fake_env.cwd = "/sandbox"
    fops = ShellFileOperations(fake_env)

    called = []

    class FakeService:
        def enabled_for(self, path):
            called.append(("enabled_for", path))
            return True

        def get_diagnostics_sync(self, path, **kw):
            called.append(("get_diagnostics_sync", path))
            return [{"severity": 1, "message": "should not see this"}]

    monkeypatch.setattr("agent.lsp.get_service", lambda: FakeService())
    result = fops._maybe_lsp_diagnostics("/sandbox/x.py")
    assert result == ""
    assert called == [], "service must not be queried for non-local backends"


def test_snapshot_baseline_called_for_local_env(tmp_path, monkeypatch):
    from tools.environments.local import LocalEnvironment
    from tools.file_operations import ShellFileOperations

    fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))

    snapshot_called = []

    class FakeService:
        def snapshot_baseline(self, path):
            snapshot_called.append(path)

    monkeypatch.setattr("agent.lsp.get_service", lambda: FakeService())
    fops._snapshot_lsp_baseline(str(tmp_path / "x.py"))
    assert snapshot_called == [str(tmp_path / "x.py")]

@ -0,0 +1,213 @@
"""Tests for the broken-set short-circuit added to handle outer-timeout failures.

When ``snapshot_baseline`` or ``get_diagnostics_sync`` time out from the
service layer (because a language server hangs during initialize, or
the binary is wedged), the inner spawn task is cancelled but the
inner exception handler that adds to ``_broken`` never runs.  Without
the service-layer fallback added in this module, every subsequent
edit re-pays the full timeout cost until the process exits.

This module verifies:

- ``_mark_broken_for_file`` adds the right key
- ``enabled_for`` short-circuits on broken keys
- a missing binary is broken-set'd after one snapshot attempt
"""
from __future__ import annotations

import os
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from agent.lsp.manager import LSPService
from agent.lsp.servers import SERVERS, ServerContext, ServerDef, SpawnSpec
from agent.lsp.workspace import clear_cache


@pytest.fixture(autouse=True)
def _clear_workspace_cache():
    clear_cache()
    yield
    clear_cache()


def _make_git_workspace(tmp_path: Path) -> Path:
    """Build a minimal git repo with a pyproject so pyright's root resolver fires."""
    repo = tmp_path / "repo"
    repo.mkdir()
    (repo / ".git").mkdir()
    (repo / "pyproject.toml").write_text("[project]\nname='t'\n")
    return repo


def test_mark_broken_for_file_adds_correct_key(tmp_path, monkeypatch):
    """``_mark_broken_for_file`` keys the broken-set on
    (server_id, per_server_root) so subsequent ``enabled_for`` calls
    for files in the same project skip immediately."""
    repo = _make_git_workspace(tmp_path)
    monkeypatch.chdir(str(repo))
    src = repo / "x.py"
    src.write_text("")

    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    try:
        svc._mark_broken_for_file(str(src), RuntimeError("simulated"))
        # The pyright server resolves to the repo root via pyproject.toml.
        assert ("pyright", str(repo)) in svc._broken
    finally:
        svc.shutdown()


def test_enabled_for_returns_false_after_broken(tmp_path, monkeypatch):
    """Once a (server_id, root) pair is in the broken-set,
    ``enabled_for`` returns False so the file_operations layer skips
    the LSP path entirely."""
    repo = _make_git_workspace(tmp_path)
    monkeypatch.chdir(str(repo))
    src = repo / "x.py"
    src.write_text("")

    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    try:
        # Initially enabled.
        assert svc.enabled_for(str(src)) is True
        # Mark broken.
        svc._mark_broken_for_file(str(src), RuntimeError("simulated"))
        # Now disabled -- the broken-set short-circuits.
        assert svc.enabled_for(str(src)) is False
    finally:
        svc.shutdown()


def test_enabled_for_other_file_in_same_project_also_skipped(tmp_path, monkeypatch):
    """The broken key is (server_id, root), so ALL files routed through
    the same server in the same project are skipped -- not just the one
    that triggered the failure."""
    repo = _make_git_workspace(tmp_path)
    monkeypatch.chdir(str(repo))
    a = repo / "a.py"
    a.write_text("")
    b = repo / "b.py"
    b.write_text("")

    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    try:
        svc._mark_broken_for_file(str(a), RuntimeError("simulated"))
        # Both files in the same project skip pyright now.
        assert svc.enabled_for(str(a)) is False
        assert svc.enabled_for(str(b)) is False
    finally:
        svc.shutdown()


def test_unrelated_project_not_affected_by_broken(tmp_path, monkeypatch):
    """Marking pyright broken for project A must NOT affect project B."""
    repo_a = _make_git_workspace(tmp_path)
    repo_b = tmp_path / "repo-b"
    repo_b.mkdir()
    (repo_b / ".git").mkdir()
    (repo_b / "pyproject.toml").write_text("[project]\nname='b'\n")

    a_src = repo_a / "x.py"
    a_src.write_text("")
    b_src = repo_b / "x.py"
    b_src.write_text("")

    monkeypatch.chdir(str(repo_a))
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    try:
        svc._mark_broken_for_file(str(a_src), RuntimeError("simulated"))
        # Project A skipped.
        assert svc.enabled_for(str(a_src)) is False
        # Project B still enabled -- the broken key is per-project.
        monkeypatch.chdir(str(repo_b))
        assert svc.enabled_for(str(b_src)) is True
    finally:
        svc.shutdown()


def test_mark_broken_handles_missing_server_silently(tmp_path):
    """If the file extension doesn't match any registered server,
    ``_mark_broken_for_file`` no-ops -- nothing to mark."""
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    try:
        # No registered server for .xyz; must not raise.
        svc._mark_broken_for_file(str(tmp_path / "weird.xyz"), RuntimeError("x"))
        assert len(svc._broken) == 0
    finally:
        svc.shutdown()


def test_mark_broken_handles_no_workspace_silently(tmp_path):
    """File outside any git worktree → no workspace → no key to add."""
    src = tmp_path / "orphan.py"
    src.write_text("")

    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    try:
        svc._mark_broken_for_file(str(src), RuntimeError("x"))
        assert len(svc._broken) == 0
    finally:
        svc.shutdown()


def test_snapshot_failure_marks_broken_via_outer_timeout(tmp_path, monkeypatch):
    """End-to-end: ``snapshot_baseline``'s outer ``_loop.run`` timeout
    triggers ``_mark_broken_for_file``, so a second call to
    ``enabled_for`` returns False."""
    repo = _make_git_workspace(tmp_path)
    monkeypatch.chdir(str(repo))
    src = repo / "x.py"
    src.write_text("")

    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    try:
        # Force the inner snapshot coroutine to raise.
        async def boom(_path):
            raise RuntimeError("outer-timeout simulated")

        with patch.object(svc, "_snapshot_async", boom):
            assert svc.enabled_for(str(src)) is True
            svc.snapshot_baseline(str(src))

        # After the failure, the file's pair is in the broken-set and
        # ``enabled_for`` skips it.
        assert ("pyright", str(repo)) in svc._broken
        assert svc.enabled_for(str(src)) is False
    finally:
        svc.shutdown()

@ -0,0 +1,143 @@
"""End-to-end client tests against the in-process mock LSP server.

Spins up :file:`_mock_lsp_server.py` as an actual subprocess, drives
it through real LSP traffic, and asserts diagnostic flow.  This is
the closest thing we have to integration coverage without requiring
pyright/gopls/etc. to be installed in CI.
"""
from __future__ import annotations

import asyncio
import os
import sys
from pathlib import Path

import pytest

from agent.lsp.client import LSPClient

MOCK_SERVER = str(Path(__file__).parent / "_mock_lsp_server.py")


def _client(workspace: Path, script: str = "clean") -> LSPClient:
    env = {"MOCK_LSP_SCRIPT": script, "PYTHONPATH": os.environ.get("PYTHONPATH", "")}
    return LSPClient(
        server_id=f"mock-{script}",
        workspace_root=str(workspace),
        command=[sys.executable, MOCK_SERVER],
        env=env,
        cwd=str(workspace),
    )


@pytest.mark.asyncio
async def test_client_lifecycle_clean(tmp_path: Path):
    """Full lifecycle: spawn, initialize, open, get clean diagnostics, shutdown."""
    f = tmp_path / "x.py"
    f.write_text("print('hi')\n")
    client = _client(tmp_path, "clean")
    await client.start()
    try:
        assert client.is_running
        version = await client.open_file(str(f), language_id="python")
        assert version == 0
        await client.wait_for_diagnostics(str(f), version, mode="document")
        diags = client.diagnostics_for(str(f))
        assert diags == []
    finally:
        await client.shutdown()
    assert not client.is_running


@pytest.mark.asyncio
async def test_client_receives_published_errors(tmp_path: Path):
    f = tmp_path / "x.py"
    f.write_text("print('hi')\n")
    client = _client(tmp_path, "errors")
    await client.start()
    try:
        version = await client.open_file(str(f), language_id="python")
        await client.wait_for_diagnostics(str(f), version, mode="document")
        diags = client.diagnostics_for(str(f))
        assert len(diags) == 1
        d = diags[0]
        assert d["severity"] == 1
        assert d["code"] == "MOCK001"
        assert d["source"] == "mock-lsp"
        assert "synthetic error" in d["message"]
    finally:
        await client.shutdown()


@pytest.mark.asyncio
async def test_client_didchange_bumps_version(tmp_path: Path):
    f = tmp_path / "x.py"
    f.write_text("print('hi')\n")
    client = _client(tmp_path, "errors")
    await client.start()
    try:
        v0 = await client.open_file(str(f), language_id="python")
        f.write_text("print('hi 2')\n")
        v1 = await client.open_file(str(f), language_id="python")  # re-open path = didChange
        assert v1 == v0 + 1
        await client.wait_for_diagnostics(str(f), v1, mode="document")
        # Mock pushed a diagnostic for both events; merged view has one
        # entry (push store keyed by file path).
        diags = client.diagnostics_for(str(f))
        assert len(diags) == 1
    finally:
        await client.shutdown()


@pytest.mark.asyncio
async def test_client_handles_crashing_server(tmp_path: Path):
    """When the server exits right after initialize, subsequent requests
    fail gracefully (not hang)."""
    f = tmp_path / "x.py"
    f.write_text("")
    client = _client(tmp_path, "crash")
    await client.start()  # should succeed (mock answers initialize before crashing)
    # Give the OS a moment to deliver the EOF.
    await asyncio.sleep(0.2)
    # The reader loop should detect EOF and mark pending requests as failed.
    try:
        await asyncio.wait_for(
            client.open_file(str(f), language_id="python"), timeout=2.0
        )
    except Exception:
        pass  # any exception is acceptable; the contract is "doesn't hang"
    await client.shutdown()


@pytest.mark.asyncio
async def test_client_shutdown_idempotent(tmp_path: Path):
    """Calling shutdown twice must be safe."""
    f = tmp_path / "x.py"
    f.write_text("")
    client = _client(tmp_path, "clean")
    await client.start()
    await client.shutdown()
    await client.shutdown()  # must not raise


@pytest.mark.asyncio
async def test_client_diagnostics_are_deduped(tmp_path: Path):
    """Repeated identical pushes must not produce duplicate diagnostics."""
    f = tmp_path / "x.py"
    f.write_text("")
    client = _client(tmp_path, "errors")
    await client.start()
    try:
        for _ in range(3):
            v = await client.open_file(str(f), language_id="python")
            await client.wait_for_diagnostics(str(f), v, mode="document")
        diags = client.diagnostics_for(str(f))
        # Push store overwrites on every notification, so there should be 1.
        assert len(diags) == 1
    finally:
        await client.shutdown()

@ -0,0 +1,146 @@
"""Tests for the ``lsp_diagnostics`` field on WriteResult / PatchResult.
The field exists so the agent can read syntax errors (``lint``) and
semantic errors (``lsp_diagnostics``) as separate signals rather than
having LSP output prepended to the lint string.
"""
from __future__ import annotations
import os
import sys
import tempfile
from unittest.mock import MagicMock, patch
import pytest
from tools.environments.local import LocalEnvironment
from tools.file_operations import (
PatchResult,
ShellFileOperations,
WriteResult,
)
# ---------------------------------------------------------------------------
# Dataclass shape
# ---------------------------------------------------------------------------
def test_writeresult_lsp_diagnostics_optional():
r = WriteResult()
assert r.lsp_diagnostics is None
def test_writeresult_to_dict_omits_field_when_none():
r = WriteResult(bytes_written=10)
assert "lsp_diagnostics" not in r.to_dict()
def test_writeresult_to_dict_includes_field_when_set():
r = WriteResult(bytes_written=10, lsp_diagnostics="<diagnostics>...</diagnostics>")
d = r.to_dict()
assert d["lsp_diagnostics"] == "<diagnostics>...</diagnostics>"
def test_patchresult_to_dict_includes_field_when_set():
r = PatchResult(success=True, lsp_diagnostics="ERROR [1:1] thing")
d = r.to_dict()
assert d["lsp_diagnostics"] == "ERROR [1:1] thing"
def test_patchresult_to_dict_omits_field_when_none():
r = PatchResult(success=True)
assert "lsp_diagnostics" not in r.to_dict()
def test_patchresult_to_dict_omits_field_when_empty_string():
"""Empty string counts as falsy — agent shouldn't see an empty field."""
r = PatchResult(success=True, lsp_diagnostics="")
assert "lsp_diagnostics" not in r.to_dict()
# ---------------------------------------------------------------------------
# Channel separation: lint and lsp_diagnostics stay independent
# ---------------------------------------------------------------------------
def test_lint_and_lsp_diagnostics_are_separate_channels():
"""A WriteResult can carry BOTH a syntax-error lint AND an LSP
diagnostic block. They belong in separate fields."""
r = WriteResult(
bytes_written=42,
lint={"status": "error", "output": "SyntaxError: ..."},
lsp_diagnostics="<diagnostics>ERROR [1:5] type mismatch</diagnostics>",
)
d = r.to_dict()
assert "lint" in d
assert "lsp_diagnostics" in d
assert d["lint"]["output"] == "SyntaxError: ..."
assert "type mismatch" in d["lsp_diagnostics"]
# ---------------------------------------------------------------------------
# write_file populates the field via _maybe_lsp_diagnostics
# ---------------------------------------------------------------------------
def test_write_file_populates_lsp_diagnostics_when_layer_returns_block(tmp_path):
    """When the LSP layer returns a non-empty block, write_file puts it
    into the ``lsp_diagnostics`` field, NOT into ``lint.output``."""
    fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
    target = tmp_path / "x.py"
    block = "<diagnostics file=\"x.py\">\nERROR [1:1] problem\n</diagnostics>"
    with patch.object(fops, "_maybe_lsp_diagnostics", return_value=block):
        res = fops.write_file(str(target), "x = 1\n")
    assert res.lsp_diagnostics == block
    # Lint is the syntax check, which is clean for "x = 1"; it must NOT
    # have the LSP block folded into it.
    assert res.lint == {"status": "ok", "output": ""}


def test_write_file_lsp_diagnostics_none_when_layer_returns_empty(tmp_path):
    fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
    target = tmp_path / "x.py"
    with patch.object(fops, "_maybe_lsp_diagnostics", return_value=""):
        res = fops.write_file(str(target), "x = 1\n")
    assert res.lsp_diagnostics is None


def test_write_file_skips_lsp_when_syntax_failed(tmp_path):
    """If the syntax check finds errors, the LSP layer should not be
    consulted (a file that won't parse won't yield meaningful semantic
    diagnostics)."""
    fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
    target = tmp_path / "broken.py"
    with patch.object(fops, "_maybe_lsp_diagnostics") as mock_lsp:
        res = fops.write_file(str(target), "def x(:\n")  # syntax error
    assert mock_lsp.call_count == 0
    assert res.lsp_diagnostics is None
    assert res.lint["status"] == "error"


# ---------------------------------------------------------------------------
# patch_replace propagates the field from the inner write_file
# ---------------------------------------------------------------------------
def test_patch_replace_propagates_lsp_diagnostics(tmp_path):
    """patch_replace's internal write_file populates lsp_diagnostics;
    the outer PatchResult must carry it forward."""
    fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
    target = tmp_path / "x.py"
    target.write_text("x = 1\n")
    block = "<diagnostics>ERROR [1:5] semantic issue</diagnostics>"
    with patch.object(fops, "_maybe_lsp_diagnostics", return_value=block):
        res = fops.patch_replace(str(target), "x = 1", "x = 2")
    assert res.success is True
    assert res.lsp_diagnostics == block
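
For readers without the production dataclasses at hand, the contract these tests pin down (the `lsp_diagnostics` key is omitted when the value is falsy, and it never merges into `lint`) can be sketched as follows. `ExampleResult` is a hypothetical stand-in, not the real `WriteResult` or `PatchResult`, and its field set is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExampleResult:
    """Hypothetical stand-in for a write/patch result (not the real class)."""

    bytes_written: int = 0
    lint: Optional[dict] = None
    lsp_diagnostics: Optional[str] = None

    def to_dict(self) -> dict:
        out = {"bytes_written": self.bytes_written}
        # Each channel is emitted under its own key, and only when truthy,
        # so an empty string or None never shows up as an empty field.
        if self.lint:
            out["lint"] = self.lint
        if self.lsp_diagnostics:
            out["lsp_diagnostics"] = self.lsp_diagnostics
        return out
```

Keeping the two channels under separate keys means a consumer can tell a syntax failure from a semantic one without parsing the text.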

@ -0,0 +1,199 @@
"""Tests for the structured logging dedup model.
The contract: a 1000-write session in one project should emit exactly
ONE INFO line ("active for <root>") at the default INFO threshold.
Steady-state events stay at DEBUG; first-time-seen events surface
once at INFO/WARNING.
"""
from __future__ import annotations

import logging

import pytest

from agent.lsp import eventlog


@pytest.fixture(autouse=True)
def _reset():
    eventlog.reset_announce_caches()
    yield
    eventlog.reset_announce_caches()


@pytest.fixture
def caplog_lsp(caplog):
    caplog.set_level(logging.DEBUG, logger="hermes.lint.lsp")
    return caplog


# ---------------------------------------------------------------------------
# Steady-state silence (DEBUG)
# ---------------------------------------------------------------------------
def test_clean_emits_at_debug(caplog_lsp):
    for _ in range(10):
        eventlog.log_clean("pyright", "/proj/x.py")
    info_records = [r for r in caplog_lsp.records if r.levelno >= logging.INFO]
    debug_records = [r for r in caplog_lsp.records if r.levelno == logging.DEBUG]
    assert info_records == []
    assert len(debug_records) == 10


def test_disabled_emits_at_debug(caplog_lsp):
    eventlog.log_disabled("pyright", "/x.py", "feature off")
    eventlog.log_disabled("pyright", "/x.py", "ext not mapped")
    assert all(r.levelno == logging.DEBUG for r in caplog_lsp.records)


# ---------------------------------------------------------------------------
# State transitions: INFO once, DEBUG thereafter
# ---------------------------------------------------------------------------
def test_active_for_fires_once_per_root(caplog_lsp):
    for _ in range(50):
        eventlog.log_active("pyright", "/proj")
    info_records = [
        r for r in caplog_lsp.records
        if r.levelno == logging.INFO and "active for" in r.getMessage()
    ]
    assert len(info_records) == 1


def test_active_for_fires_per_distinct_root(caplog_lsp):
    eventlog.log_active("pyright", "/proj-a")
    eventlog.log_active("pyright", "/proj-b")
    info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
    assert len(info) == 2


def test_active_for_separate_per_server(caplog_lsp):
    eventlog.log_active("pyright", "/proj")
    eventlog.log_active("typescript", "/proj")
    info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
    assert len(info) == 2


def test_no_project_root_fires_once_per_path(caplog_lsp):
    for _ in range(5):
        eventlog.log_no_project_root("pyright", "/orphan.py")
    info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
    assert len(info) == 1


# ---------------------------------------------------------------------------
# Diagnostics events fire INFO every time
# ---------------------------------------------------------------------------
def test_diagnostics_always_info(caplog_lsp):
    for i in range(5):
        eventlog.log_diagnostics("pyright", f"/x{i}.py", 1)
    info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
    assert len(info) == 5
    assert all("diags" in r.getMessage() for r in info)


# ---------------------------------------------------------------------------
# Action-required: WARNING once, DEBUG thereafter (or per call for novel events)
# ---------------------------------------------------------------------------
def test_server_unavailable_warns_once_per_binary(caplog_lsp):
    for _ in range(20):
        eventlog.log_server_unavailable("pyright", "pyright-langserver")
    warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
    assert len(warns) == 1
    assert "pyright-langserver" in warns[0].getMessage()


def test_server_unavailable_separate_per_binary(caplog_lsp):
    eventlog.log_server_unavailable("pyright", "pyright-langserver")
    eventlog.log_server_unavailable("typescript", "typescript-language-server")
    warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
    assert len(warns) == 2


def test_no_server_configured_warns_once(caplog_lsp):
    for _ in range(10):
        eventlog.log_no_server_configured("pyright")
    warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
    assert len(warns) == 1


def test_timeout_warns_every_call(caplog_lsp):
    for _ in range(3):
        eventlog.log_timeout("pyright", "/x.py")
    warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
    assert len(warns) == 3


def test_server_error_warns_every_call(caplog_lsp):
    for _ in range(3):
        eventlog.log_server_error("pyright", "/x.py", RuntimeError("boom"))
    warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
    assert len(warns) == 3


def test_spawn_failed_warns(caplog_lsp):
    eventlog.log_spawn_failed("pyright", "/proj", FileNotFoundError("nope"))
    warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
    assert len(warns) == 1
    assert "spawn/initialize failed" in warns[0].getMessage()


# ---------------------------------------------------------------------------
# Format: log lines all carry the lsp[<server_id>] prefix for grep
# ---------------------------------------------------------------------------
def test_log_lines_use_lsp_prefix(caplog_lsp):
    eventlog.log_clean("pyright", "/x.py")
    eventlog.log_active("pyright", "/proj")
    eventlog.log_diagnostics("typescript", "/y.ts", 2)
    for r in caplog_lsp.records:
        assert r.getMessage().startswith("lsp[")


# ---------------------------------------------------------------------------
# Steady-state contract: 1000 clean writes → 1 INFO at most
# ---------------------------------------------------------------------------
def test_thousand_clean_writes_emit_one_info(caplog_lsp):
    """A long session writes lots of files cleanly; agent.log should
    show ONE 'active for' INFO and zero other INFO lines."""
    eventlog.log_active("pyright", "/proj")
    for _ in range(1000):
        eventlog.log_clean("pyright", "/proj/x.py")
    info_records = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
    assert len(info_records) == 1
    assert "active for" in info_records[0].getMessage()


# ---------------------------------------------------------------------------
# Path shortening
# ---------------------------------------------------------------------------
def test_short_path_uses_relative_when_inside_cwd(tmp_path, monkeypatch):
    monkeypatch.chdir(tmp_path)
    sub = tmp_path / "x.py"
    sub.write_text("")
    out = eventlog._short_path(str(sub))
    assert out == "x.py"


def test_short_path_keeps_absolute_when_outside(tmp_path, monkeypatch):
    # tmp_path is freshly created (no "a" subdirectory), so chdir straight into it.
    monkeypatch.chdir(tmp_path)
    other = "/var/log/foo.txt"
    out = eventlog._short_path(other)
    # Outside cwd: keeps absolute (no leading "../")
    assert out == "/var/log/foo.txt" or not out.startswith("..")


def test_short_path_handles_empty_string():
    assert eventlog._short_path("") == ""
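
The "INFO once, DEBUG thereafter" contract exercised above can be approximated with a small announce cache. This is a sketch under assumptions, not the code in `agent/lsp/eventlog.py`: the logger name `example.lsp` and the `_announced` set are hypothetical, and only the function names mirror the ones the tests call.

```python
import logging

logger = logging.getLogger("example.lsp")  # hypothetical logger name

# Keys already announced at INFO; later repeats drop to DEBUG.
_announced = set()


def log_active(server_id: str, root: str) -> None:
    """Log 'active for <root>' at INFO the first time, DEBUG afterwards."""
    key = (server_id, root)
    if key in _announced:
        logger.debug("lsp[%s] active for %s (repeat)", server_id, root)
        return
    _announced.add(key)
    logger.info("lsp[%s] active for %s", server_id, root)


def reset_announce_caches() -> None:
    """Forget every announcement; mainly useful between tests."""
    _announced.clear()
```

Keying the cache on `(server_id, root)` is what lets distinct roots and distinct servers each get their own single INFO line.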

@ -0,0 +1,144 @@
"""Tests for service-singleton lifecycle: atexit handler, idempotent shutdown.

These cover the exit-cleanup behavior added to plug the language-server
process leak: without the atexit hook, ``hermes chat`` exits while
pyright/gopls/etc. are still alive on the host.
"""
from __future__ import annotations

import atexit
from unittest.mock import MagicMock, patch

import pytest

from agent import lsp as lsp_module


@pytest.fixture(autouse=True)
def _reset_singleton():
    """Force a clean module state before each test.

    Tests in this file share process-global state (the lazy
    singleton + atexit registration flag); reset both before and
    after every test so order doesn't matter.
    """
    lsp_module._service = None
    lsp_module._atexit_registered = False
    yield
    lsp_module._service = None
    lsp_module._atexit_registered = False


def test_get_service_registers_atexit_handler_once(monkeypatch):
    """First call to ``get_service`` must register an atexit handler;
    subsequent calls must NOT register another one (Python's ``atexit``
    runs every registered callable, so a duplicate would shut down
    twice: harmless but wasteful)."""
    fake_svc = MagicMock()
    fake_svc.is_active.return_value = True
    monkeypatch.setattr(
        lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
    )
    registrations = []

    def fake_register(fn):
        registrations.append(fn)

    monkeypatch.setattr(atexit, "register", fake_register)
    a = lsp_module.get_service()
    b = lsp_module.get_service()
    c = lsp_module.get_service()
    assert a is fake_svc
    assert b is fake_svc
    assert c is fake_svc
    assert len(registrations) == 1
    # The registered callable must be our internal shutdown wrapper.
    assert registrations[0] is lsp_module._atexit_shutdown


def test_atexit_shutdown_calls_shutdown_service(monkeypatch):
    """The atexit-registered wrapper invokes ``shutdown_service`` and
    swallows any exception: by the time atexit fires, the user has
    already seen the response and a noisy traceback would be clutter."""
    called = []
    monkeypatch.setattr(
        lsp_module, "shutdown_service", lambda: called.append("shutdown")
    )
    lsp_module._atexit_shutdown()
    assert called == ["shutdown"]


def test_atexit_shutdown_swallows_exceptions(monkeypatch):
    def boom():
        raise RuntimeError("server already dead")

    monkeypatch.setattr(lsp_module, "shutdown_service", boom)
    # Must not raise.
    lsp_module._atexit_shutdown()


def test_shutdown_service_idempotent(monkeypatch):
    """Calling shutdown twice must be safe: the first call cleans up,
    the second call no-ops (nothing to shut down)."""
    fake_svc = MagicMock()
    fake_svc.is_active.return_value = True
    fake_svc.shutdown = MagicMock()
    monkeypatch.setattr(
        lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
    )
    monkeypatch.setattr(atexit, "register", lambda fn: None)
    lsp_module.get_service()
    lsp_module.shutdown_service()
    lsp_module.shutdown_service()  # must not raise
    assert fake_svc.shutdown.call_count == 1


def test_shutdown_service_no_op_when_never_started():
    """Calling shutdown without ever creating the service is safe."""
    lsp_module.shutdown_service()  # must not raise


def test_shutdown_service_swallows_exception(monkeypatch):
    """An exception during ``svc.shutdown()`` must not propagate;
    the caller (often atexit) has nothing useful to do with it."""
    fake_svc = MagicMock()
    fake_svc.is_active.return_value = True
    fake_svc.shutdown = MagicMock(side_effect=RuntimeError("kill -9 already"))
    monkeypatch.setattr(
        lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
    )
    monkeypatch.setattr(atexit, "register", lambda fn: None)
    lsp_module.get_service()
    lsp_module.shutdown_service()  # must not raise


def test_get_service_returns_none_for_inactive_service(monkeypatch):
    """A service whose ``is_active()`` returns False is treated as
    not running: callers see ``None`` and fall back."""
    fake_svc = MagicMock()
    fake_svc.is_active.return_value = False
    monkeypatch.setattr(
        lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
    )
    monkeypatch.setattr(atexit, "register", lambda fn: None)
    assert lsp_module.get_service() is None
    # Subsequent call returns None too, but the inactive instance is
    # cached so we don't re-build it on every check.
    assert lsp_module.get_service() is None


def test_get_service_returns_none_when_create_fails(monkeypatch):
    """Service factory returning ``None`` (no config, etc.) propagates."""
    monkeypatch.setattr(
        lsp_module.LSPService, "create_from_config", classmethod(lambda cls: None)
    )
    monkeypatch.setattr(atexit, "register", lambda fn: None)
    assert lsp_module.get_service() is None

@ -0,0 +1,197 @@
"""Tests for the LSP protocol framing layer.

The framer is small but load-bearing: Content-Length parsing is the
single most common reason for hand-rolled LSP clients to silently
deadlock. These tests exercise:

- exact wire format of outgoing messages (encode_message)
- partial-read tolerance + EOF handling (read_message)
- envelope helpers (request, response, notification, error)
- message classification
"""
from __future__ import annotations

import asyncio
import json

import pytest

from agent.lsp.protocol import (
    ERROR_CONTENT_MODIFIED,
    ERROR_METHOD_NOT_FOUND,
    LSPProtocolError,
    LSPRequestError,
    classify_message,
    encode_message,
    make_error_response,
    make_notification,
    make_request,
    make_response,
    read_message,
)


# ---------------------------------------------------------------------------
# encode_message
# ---------------------------------------------------------------------------
def test_encode_message_uses_compact_separators_and_utf8():
    msg = {"jsonrpc": "2.0", "id": 1, "method": "x", "params": {"k": "ä"}}
    out = encode_message(msg)
    # Header is plain ASCII: Content-Length, then CRLF CRLF.
    header_end = out.index(b"\r\n\r\n") + 4
    header = out[:header_end].decode("ascii")
    body = out[header_end:]
    assert "Content-Length:" in header
    declared = int(header.split("Content-Length:")[1].split("\r\n")[0].strip())
    # Declared length must equal actual body bytes.
    assert declared == len(body)
    # Body parses as JSON and round-trips.
    parsed = json.loads(body.decode("utf-8"))
    assert parsed == msg
    # Body uses compact separators (no spaces between key and value).
    assert b'"id":1' in body


def test_encode_message_handles_unicode_in_strings():
    msg = {"jsonrpc": "2.0", "method": "log", "params": {"text": "🚀 ünıcödé"}}
    out = encode_message(msg)
    header_end = out.index(b"\r\n\r\n") + 4
    declared = int(out[: out.index(b"\r\n")].split(b": ")[1])
    assert declared == len(out[header_end:])
    assert json.loads(out[header_end:].decode("utf-8")) == msg


# ---------------------------------------------------------------------------
# read_message
# ---------------------------------------------------------------------------
async def _stream_from_bytes(data: bytes) -> asyncio.StreamReader:
    """Build an asyncio.StreamReader pre-populated with ``data``."""
    reader = asyncio.StreamReader()
    reader.feed_data(data)
    reader.feed_eof()
    return reader


@pytest.mark.asyncio
async def test_read_message_round_trip():
    msg = {"jsonrpc": "2.0", "method": "ping"}
    reader = await _stream_from_bytes(encode_message(msg))
    parsed = await read_message(reader)
    assert parsed == msg


@pytest.mark.asyncio
async def test_read_message_clean_eof_returns_none():
    reader = await _stream_from_bytes(b"")
    assert await read_message(reader) is None


@pytest.mark.asyncio
async def test_read_message_truncated_body_raises():
    msg = encode_message({"jsonrpc": "2.0", "method": "x"})
    truncated = msg[:-3]  # cut the body
    reader = await _stream_from_bytes(truncated)
    with pytest.raises(LSPProtocolError):
        await read_message(reader)


@pytest.mark.asyncio
async def test_read_message_missing_content_length_raises():
    bad = b"X-Other: 5\r\n\r\n12345"
    reader = await _stream_from_bytes(bad)
    with pytest.raises(LSPProtocolError):
        await read_message(reader)


@pytest.mark.asyncio
async def test_read_message_two_messages_back_to_back():
    a = encode_message({"jsonrpc": "2.0", "method": "a"})
    b = encode_message({"jsonrpc": "2.0", "method": "b"})
    reader = await _stream_from_bytes(a + b)
    assert (await read_message(reader))["method"] == "a"
    assert (await read_message(reader))["method"] == "b"


@pytest.mark.asyncio
async def test_read_message_rejects_runaway_header():
    """A pathological server that streams headers without ever emitting
    the CRLF-CRLF terminator must not loop forever: the 8 KiB cap kicks
    in and surfaces a protocol error."""
    flood = (b"X-Junk: " + b"A" * 200 + b"\r\n") * 60  # ~12 KiB worth
    reader = await _stream_from_bytes(flood)
    with pytest.raises(LSPProtocolError) as exc:
        await read_message(reader)
    assert "8 KiB" in str(exc.value)


# ---------------------------------------------------------------------------
# envelope helpers
# ---------------------------------------------------------------------------
def test_make_request_includes_id_and_method():
    msg = make_request(7, "ping", {"v": 1})
    assert msg == {"jsonrpc": "2.0", "id": 7, "method": "ping", "params": {"v": 1}}


def test_make_request_omits_params_when_none():
    msg = make_request(7, "ping", None)
    assert "params" not in msg


def test_make_notification_omits_id():
    msg = make_notification("log", {"line": "hi"})
    assert "id" not in msg
    assert msg["method"] == "log"


def test_make_response_carries_result():
    msg = make_response(7, {"ok": True})
    assert msg["id"] == 7 and msg["result"] == {"ok": True}


def test_make_error_response_shape():
    msg = make_error_response(7, ERROR_CONTENT_MODIFIED, "stale", {"hint": "retry"})
    assert msg["error"]["code"] == ERROR_CONTENT_MODIFIED
    assert msg["error"]["message"] == "stale"
    assert msg["error"]["data"] == {"hint": "retry"}


# ---------------------------------------------------------------------------
# classify_message
# ---------------------------------------------------------------------------
def test_classify_message_request():
    msg = {"jsonrpc": "2.0", "id": 1, "method": "x"}
    assert classify_message(msg) == ("request", 1)


def test_classify_message_response():
    msg = {"jsonrpc": "2.0", "id": 1, "result": None}
    assert classify_message(msg) == ("response", 1)


def test_classify_message_notification():
    msg = {"jsonrpc": "2.0", "method": "log"}
    assert classify_message(msg) == ("notification", "log")


def test_classify_message_invalid():
    assert classify_message({"id": 1})[0] == "invalid"
    assert classify_message({"jsonrpc": "1.0", "method": "x"})[0] == "invalid"


# ---------------------------------------------------------------------------
# LSPRequestError
# ---------------------------------------------------------------------------
def test_lsp_request_error_carries_code_and_data():
    e = LSPRequestError(ERROR_METHOD_NOT_FOUND, "no", {"x": 1})
    assert e.code == ERROR_METHOD_NOT_FOUND
    assert e.message == "no"
    assert e.data == {"x": 1}
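
As a rough illustration of the wire format these tests check, here is a minimal Content-Length framer. It is a sketch, not the code in `agent/lsp/protocol.py`: `encode_frame` and `read_frame` are hypothetical names, errors are plain `ValueError` rather than `LSPProtocolError`, and the 8 KiB header cap is not implemented (the sketch relies on the default `asyncio.StreamReader` limit instead).

```python
import asyncio
import json


def encode_frame(msg: dict) -> bytes:
    """Serialize compactly and prefix the body's byte length."""
    body = json.dumps(msg, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
    return header + body


async def read_frame(reader: asyncio.StreamReader):
    """Read one framed message; return None on a clean EOF."""
    try:
        header = await reader.readuntil(b"\r\n\r\n")
    except asyncio.IncompleteReadError as exc:
        if not exc.partial:
            return None  # stream ended exactly between messages
        raise ValueError("truncated header") from exc
    length = None
    for line in header.decode("ascii").split("\r\n"):
        name, _, value = line.partition(":")
        if name.strip().lower() == "content-length":
            length = int(value.strip())
    if length is None:
        raise ValueError("missing Content-Length header")
    try:
        body = await reader.readexactly(length)
    except asyncio.IncompleteReadError as exc:
        raise ValueError("truncated body") from exc
    return json.loads(body.decode("utf-8"))
```

The length counts encoded bytes, not characters, which is why the header is computed after UTF-8 encoding.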

@ -0,0 +1,94 @@
"""Tests for the diagnostic reporter (formatting layer)."""
from __future__ import annotations

from agent.lsp.reporter import (
    DEFAULT_SEVERITIES,
    MAX_PER_FILE,
    format_diagnostic,
    report_for_file,
    truncate,
)


def _diag(line=0, col=0, sev=1, code="E001", source="ls", msg="oops"):
    return {
        "range": {
            "start": {"line": line, "character": col},
            "end": {"line": line, "character": col + 1},
        },
        "severity": sev,
        "code": code,
        "source": source,
        "message": msg,
    }


def test_format_diagnostic_uses_one_indexed_position():
    line = format_diagnostic(_diag(line=4, col=2))
    assert "[5:3]" in line  # +1 on both


def test_format_diagnostic_includes_severity_label():
    assert format_diagnostic(_diag(sev=1)).startswith("ERROR")
    assert format_diagnostic(_diag(sev=2)).startswith("WARN")
    assert format_diagnostic(_diag(sev=3)).startswith("INFO")
    assert format_diagnostic(_diag(sev=4)).startswith("HINT")


def test_format_diagnostic_includes_code_and_source():
    line = format_diagnostic(_diag(code="X42", source="src"))
    assert "[X42]" in line
    assert "(src)" in line


def test_format_diagnostic_omits_missing_optional_fields():
    line = format_diagnostic(
        {
            "range": {
                "start": {"line": 0, "character": 0},
                "end": {"line": 0, "character": 0},
            },
            "severity": 1,
            "message": "bare",
        }
    )
    assert "[" not in line.split("]", 1)[1]  # no extra brackets after the position
    assert "(" not in line


def test_report_for_file_returns_empty_when_only_warnings():
    """Default severity filter is ERROR-only."""
    report = report_for_file("/x.py", [_diag(sev=2)])
    assert report == ""


def test_report_for_file_emits_block_with_errors():
    diag = _diag(msg="real error")
    report = report_for_file("/x.py", [diag])
    assert "<diagnostics file=\"/x.py\">" in report
    assert "real error" in report
    assert "</diagnostics>" in report


def test_report_for_file_caps_at_max_per_file():
    diags = [_diag(line=i) for i in range(MAX_PER_FILE + 5)]
    report = report_for_file("/x.py", diags)
    assert "and 5 more" in report


def test_report_for_file_respects_custom_severities():
    diag = _diag(sev=2, msg="warn")
    report = report_for_file("/x.py", [diag], severities=frozenset({1, 2}))
    assert "warn" in report


def test_truncate_below_limit_unchanged():
    s = "abc" * 100
    assert truncate(s, limit=4000) == s


def test_truncate_above_limit_appends_marker():
    s = "x" * 10000
    out = truncate(s, limit=200)
    assert out.endswith("[truncated]")
    assert len(out) <= 200
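
The formatting rules asserted above (LSP positions are zero-based on the wire and shown one-based, severity 1 to 4 maps to a label, code and source are optional) can be sketched like this. `format_diag` is a hypothetical helper; the exact field order and spacing of the real `format_diagnostic` output are assumptions here.

```python
SEVERITY_LABELS = {1: "ERROR", 2: "WARN", 3: "INFO", 4: "HINT"}


def format_diag(diag: dict) -> str:
    """Render one LSP diagnostic as a single human-readable line."""
    start = diag["range"]["start"]
    label = SEVERITY_LABELS.get(diag.get("severity", 1), "ERROR")
    # LSP line/character are zero-based; editors and humans count from one.
    parts = [f"{label} [{start['line'] + 1}:{start['character'] + 1}]", diag["message"]]
    if diag.get("code") is not None:
        parts.append(f"[{diag['code']}]")
    if diag.get("source"):
        parts.append(f"({diag['source']})")
    return " ".join(parts)
```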

@ -0,0 +1,149 @@
"""Tests for the synchronous LSPService wrapper.

Drives the service through ``snapshot_baseline`` →
``get_diagnostics_sync`` against the mock LSP server, exercising the
delta filter that ``tools/file_operations._check_lint_delta`` relies
on.
"""
from __future__ import annotations

import os
import sys
from pathlib import Path

import pytest

from agent.lsp.manager import LSPService
from agent.lsp.servers import (
    SERVERS,
    ServerContext,
    ServerDef,
    SpawnSpec,
    find_server_for_file,
)

MOCK_SERVER = str(Path(__file__).parent / "_mock_lsp_server.py")


def _install_mock_server(monkeypatch, script: str = "errors", server_id: str = "pyright"):
    """Replace one registered server with a wrapper that spawns the mock.

    We reuse ``pyright`` so .py files route to it. This keeps the
    test free of any LSP toolchain dependency.
    """
    target_index = next(i for i, s in enumerate(SERVERS) if s.server_id == server_id)
    original = SERVERS[target_index]

    def _spawn(root: str, ctx: ServerContext) -> SpawnSpec:
        env = {"MOCK_LSP_SCRIPT": script}
        return SpawnSpec(
            command=[sys.executable, MOCK_SERVER],
            workspace_root=root,
            cwd=root,
            env=env,
            initialization_options={},
        )

    replacement = ServerDef(
        server_id=server_id,
        extensions=original.extensions,
        resolve_root=lambda fp, ws: ws,  # always use workspace root
        build_spawn=_spawn,
        seed_first_push=False,
        description="mock " + server_id,
    )
    # Patch the SERVERS list element directly + restore on teardown.
    SERVERS[target_index] = replacement
    yield
    SERVERS[target_index] = original


@pytest.fixture
def mock_pyright(monkeypatch, tmp_path):
    """Install the mock as ``pyright`` and create a fake git workspace."""
    repo = tmp_path / "repo"
    repo.mkdir()
    (repo / ".git").mkdir()
    (repo / "pyproject.toml").write_text("")  # so pyright's root resolver finds it
    monkeypatch.chdir(str(repo))
    gen = _install_mock_server(monkeypatch, "errors", "pyright")
    next(gen)
    yield repo
    try:
        next(gen)
    except StopIteration:
        pass


def test_service_returns_empty_when_disabled(tmp_path):
    svc = LSPService(
        enabled=False,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="auto",
    )
    assert not svc.is_active()
    f = tmp_path / "x.py"
    f.write_text("")
    assert svc.get_diagnostics_sync(str(f)) == []
    svc.shutdown()


def test_service_skips_files_outside_workspace(tmp_path):
    """Files outside any git worktree must not trigger LSP."""
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    f = tmp_path / "x.py"
    f.write_text("")
    # No .git anywhere, so the service should report not enabled for this file.
    assert not svc.enabled_for(str(f))
    svc.shutdown()


def test_service_e2e_delta_filter(mock_pyright):
    """End-to-end: snapshot baseline → wait → delta returned."""
    repo = mock_pyright
    f = repo / "x.py"
    f.write_text("print('hi')\n")
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=3.0,
        install_strategy="manual",
    )
    try:
        assert svc.enabled_for(str(f))
        # Baseline first: the server pushes 1 error.
        svc.snapshot_baseline(str(f))
        # Re-poll: same error is in baseline, so delta is empty.
        new_diags = svc.get_diagnostics_sync(str(f))
        assert new_diags == []
    finally:
        svc.shutdown()


def test_service_status_includes_clients(mock_pyright):
    repo = mock_pyright
    f = repo / "x.py"
    f.write_text("")
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=3.0,
        install_strategy="manual",
    )
    try:
        svc.get_diagnostics_sync(str(f))
        info = svc.get_status()
        assert info["enabled"] is True
        assert any(c["server_id"] == "pyright" for c in info["clients"])
    finally:
        svc.shutdown()
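
The delta filter that the end-to-end test relies on can be pictured as a set difference over a per-diagnostic key. The sketch below is only one plausible shape: `diag_key` and `delta` are hypothetical names, and the choice of key fields (start position, severity, code, message) is an assumption, not something the PR text specifies.

```python
def diag_key(diag: dict) -> tuple:
    """Identity of a diagnostic for baseline comparison (assumed key fields)."""
    start = diag["range"]["start"]
    return (
        start["line"],
        start["character"],
        diag.get("severity"),
        diag.get("code"),
        diag["message"],
    )


def delta(baseline: list, current: list) -> list:
    """Return only the diagnostics that were not present in the baseline."""
    seen = {diag_key(d) for d in baseline}
    return [d for d in current if diag_key(d) not in seen]
```

With a key like this, an error that already existed before the write drops out of the result, which matches the empty delta the test expects on a re-poll.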

@ -0,0 +1,139 @@
"""Tests for workspace + project-root resolution."""
from __future__ import annotations

import os
from pathlib import Path

import pytest

from agent.lsp.workspace import (
    clear_cache,
    find_git_worktree,
    is_inside_workspace,
    nearest_root,
    normalize_path,
    resolve_workspace_for_file,
)


@pytest.fixture(autouse=True)
def _clear():
    clear_cache()
    yield
    clear_cache()


def test_find_git_worktree_returns_none_outside_repo(tmp_path: Path):
    sub = tmp_path / "sub"
    sub.mkdir()
    assert find_git_worktree(str(sub)) is None


def test_find_git_worktree_finds_dotgit(tmp_path: Path):
    repo = tmp_path / "repo"
    repo.mkdir()
    (repo / ".git").mkdir()
    sub = repo / "src" / "deep"
    sub.mkdir(parents=True)
    assert find_git_worktree(str(sub)) == str(repo)


def test_find_git_worktree_handles_dotgit_file(tmp_path: Path):
    """``.git`` can also be a file (gitfile pointing into a worktree)."""
    repo = tmp_path / "repo"
    repo.mkdir()
    (repo / ".git").write_text("gitdir: /elsewhere\n")
    assert find_git_worktree(str(repo)) == str(repo)


def test_is_inside_workspace_true_for_subpath(tmp_path: Path):
    root = tmp_path / "p"
    root.mkdir()
    sub = root / "x" / "y.py"
    sub.parent.mkdir(parents=True)
    sub.write_text("")
    assert is_inside_workspace(str(sub), str(root))


def test_is_inside_workspace_false_for_unrelated(tmp_path: Path):
    a = tmp_path / "a"
    b = tmp_path / "b"
    a.mkdir()
    b.mkdir()
    f = b / "x.py"
    f.write_text("")
    assert not is_inside_workspace(str(f), str(a))


def test_nearest_root_finds_first_marker(tmp_path: Path):
    root = tmp_path / "p"
    deep = root / "src" / "pkg"
    deep.mkdir(parents=True)
    (root / "pyproject.toml").write_text("")
    found = nearest_root(str(deep / "mod.py"), ["pyproject.toml"])
    assert found == str(root)


def test_nearest_root_excludes_take_priority(tmp_path: Path):
    """If an exclude marker matches first, return None."""
    root = tmp_path / "p"
    sub = root / "deno-app"
    sub.mkdir(parents=True)
    (sub / "deno.json").write_text("{}")
    (root / "package.json").write_text("{}")  # would match if not for exclude
    found = nearest_root(
        str(sub / "main.ts"),
        ["package.json"],
        excludes=["deno.json"],
    )
    assert found is None


def test_nearest_root_returns_none_when_no_marker(tmp_path: Path):
    f = tmp_path / "x.py"
    f.write_text("")
    assert nearest_root(str(f), ["pyproject.toml"]) is None


def test_resolve_workspace_for_file_uses_cwd_first(tmp_path: Path, monkeypatch):
    repo = tmp_path / "repo"
    (repo / ".git").mkdir(parents=True)
    file_path = repo / "x.py"
    file_path.write_text("")
    # cwd is inside the repo
    monkeypatch.chdir(str(repo))
    root, gated = resolve_workspace_for_file(str(file_path))
    assert root == str(repo)
    assert gated is True


def test_resolve_workspace_for_file_no_repo_returns_none(tmp_path: Path, monkeypatch):
    monkeypatch.chdir(str(tmp_path))
    f = tmp_path / "x.py"
    f.write_text("")
    root, gated = resolve_workspace_for_file(str(f))
    assert root is None
    assert gated is False


def test_resolve_workspace_falls_back_to_file_location(tmp_path: Path, monkeypatch):
    """When cwd isn't a git repo but the file is inside one, we still
    discover the workspace from the file's path."""
    not_a_repo = tmp_path / "loose"
    not_a_repo.mkdir()
    monkeypatch.chdir(str(not_a_repo))
    repo = tmp_path / "actual-repo"
    (repo / ".git").mkdir(parents=True)
    f = repo / "x.py"
    f.write_text("")
    root, gated = resolve_workspace_for_file(str(f))
    assert root == str(repo)
    assert gated is True


def test_normalize_path_expands_tilde(monkeypatch):
    monkeypatch.setenv("HOME", "/home/user")
    p = normalize_path("~/x.py")
    assert p == os.path.abspath("/home/user/x.py")
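
The walk-up that `find_git_worktree` is tested for can be sketched in a few lines. `find_git_root` is a hypothetical name, and the sketch skips the caching that `clear_cache` implies exists in the real module; it only shows the directory climb and the fact that `.git` may be either a directory or a gitfile.

```python
import os


def find_git_root(start: str):
    """Climb from ``start`` until a directory containing ``.git`` is found."""
    current = os.path.abspath(start)
    if os.path.isfile(current):
        current = os.path.dirname(current)
    while True:
        # os.path.exists covers both a .git directory and a .git gitfile.
        if os.path.exists(os.path.join(current, ".git")):
            return current
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            return None
        current = parent
```

Stopping when `dirname` returns its own argument is the usual way to detect the filesystem root portably.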