mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
Wraps every sync->async coroutine-scheduling site in the codebase with a new agent.async_utils.safe_schedule_threadsafe() helper that closes the coroutine on scheduling failure (closed loop, shutdown race, etc.) instead of leaking it as 'coroutine was never awaited' RuntimeWarnings plus reference leaks. 22 production call sites migrated across the codebase: - acp_adapter/events.py, acp_adapter/permissions.py - agent/lsp/manager.py - cron/scheduler.py (media + text delivery paths) - gateway/platforms/feishu.py (5 sites, via existing _submit_on_loop helper which now delegates to safe_schedule_threadsafe) - gateway/run.py (10 sites: telegram rename, agent:step hook, status callback, interim+bg-review, clarify send, exec-approval button+text, temp-bubble cleanup, channel-directory refresh) - plugins/memory/hindsight, plugins/platforms/google_chat - tools/browser_supervisor.py (3), browser_cdp_tool.py, computer_use/cua_backend.py, slash_confirm.py - tools/environments/modal.py (_AsyncWorker) - tools/mcp_tool.py (2 + 8 _run_on_mcp_loop callers converted to factory-style so the coroutine is never constructed on a dead loop) - tui_gateway/ws.py Tests: new tests/agent/test_async_utils.py covers helper behavior under live loop, dead loop, None loop, and scheduling exceptions. Regression tests added at three PR-original sites (acp events, acp permissions, mcp loop runner) mirroring contributor's intent. 
Live-tested end-to-end: - Helper stress test: 1500 schedules across live/dead/race scenarios, zero leaked coroutines - Race exercised: 5000 schedules with loop killed mid-flight, 100 ok / 4900 None returns, zero leaks - hermes chat -q with terminal tool call (exercises step_callback bridge) - MCP probe against failing subprocess servers + factory path - Real gateway daemon boot + SIGINT shutdown across multiple platform adapter inits - WSTransport 100 live + 50 dead-loop writes - Cron delivery path live + dead loop Salvages PR #2657 — adopts contributor's intent over a much wider site list and a single centralized helper instead of inline try/except at each site. 3 of the original PR's 6 sites no longer exist on main (environments/patches.py deleted, DingTalk refactored to native async); the equivalent fix lives in tools/environments/modal.py instead. Co-authored-by: JithendraNara <jithendranaidunara@gmail.com>
644 lines
25 KiB
Python
644 lines
25 KiB
Python
"""Service-level orchestration for LSP clients.
|
|
|
|
The :class:`LSPService` is the bridge between the synchronous
|
|
file_operations layer and the async :class:`agent.lsp.client.LSPClient`.
|
|
|
|
Design choices:
|
|
|
|
- A **single asyncio event loop** runs in a background thread. All
|
|
client work happens on that loop. Synchronous callers from
|
|
``tools/file_operations.py`` use :meth:`get_diagnostics_sync` to
|
|
open + wait + drain in one blocking call.
|
|
|
|
- One client per ``(server_id, workspace_root)`` key. Lazy spawn:
|
|
the first request for a key spawns the client; subsequent requests
|
|
re-use it.
|
|
|
|
- A **broken-set** records ``(server_id, workspace_root)`` pairs that
|
|
failed to spawn or initialize. These are never retried for the
|
|
life of the service. Mirrors OpenCode's design.
|
|
|
|
- A **delta baseline** map keeps "diagnostics-as-of-the-last-snapshot"
|
|
per file. ``snapshot_baseline()`` is called BEFORE a write; the
|
|
next ``get_diagnostics_sync()`` returns only diagnostics that
|
|
weren't in the baseline. This is the lift from Claude Code's
|
|
``beforeFileEdited`` / ``getNewDiagnostics`` pattern, except wired
|
|
to the local LSP layer instead of MCP IDE RPC.
|
|
|
|
The service is **off by default** — call :meth:`is_active` to check
|
|
whether it's actually doing anything. When LSP is disabled in
|
|
config, when no git workspace can be detected, when all configured
|
|
servers are missing binaries and auto-install is off, ``is_active``
|
|
returns False and the file_operations layer falls through to the
|
|
in-process syntax check.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from concurrent.futures import Future as ConcurrentFuture
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
from agent.lsp import eventlog
|
|
from agent.lsp.client import (
|
|
DIAGNOSTICS_DOCUMENT_WAIT,
|
|
LSPClient,
|
|
file_uri,
|
|
)
|
|
from agent.lsp.servers import (
|
|
ServerContext,
|
|
ServerDef,
|
|
SpawnSpec,
|
|
find_server_for_file,
|
|
language_id_for,
|
|
)
|
|
from agent.lsp.workspace import (
|
|
clear_cache,
|
|
is_inside_workspace,
|
|
resolve_workspace_for_file,
|
|
)
|
|
|
|
# Module-level logger, named explicitly after this module's dotted path.
logger = logging.getLogger("agent.lsp.manager")

# Idle threshold in seconds: servers idle for more than 10 minutes get reaped.
DEFAULT_IDLE_TIMEOUT = 10 * 60
|
|
|
|
|
|
class _BackgroundLoop:
|
|
"""A daemon thread that owns one asyncio event loop.
|
|
|
|
Provides :meth:`run` for synchronous callers — submits a coroutine
|
|
to the loop and blocks until it finishes (or a timeout fires).
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._ready = threading.Event()
|
|
|
|
def start(self) -> None:
|
|
if self._thread is not None:
|
|
return
|
|
self._thread = threading.Thread(
|
|
target=self._run_forever,
|
|
name="hermes-lsp-loop",
|
|
daemon=True,
|
|
)
|
|
self._thread.start()
|
|
self._ready.wait(timeout=5.0)
|
|
|
|
def _run_forever(self) -> None:
|
|
loop = asyncio.new_event_loop()
|
|
self._loop = loop
|
|
asyncio.set_event_loop(loop)
|
|
self._ready.set()
|
|
try:
|
|
loop.run_forever()
|
|
finally:
|
|
try:
|
|
loop.close()
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
|
|
def run(self, coro, *, timeout: Optional[float] = None) -> Any:
|
|
"""Submit a coroutine to the loop and block until done.
|
|
|
|
Returns the coroutine's result, or raises its exception.
|
|
"""
|
|
from agent.async_utils import safe_schedule_threadsafe
|
|
if self._loop is None:
|
|
if asyncio.iscoroutine(coro):
|
|
coro.close()
|
|
raise RuntimeError("background loop not started")
|
|
fut = safe_schedule_threadsafe(coro, self._loop)
|
|
if fut is None:
|
|
raise RuntimeError("background loop not running")
|
|
try:
|
|
return fut.result(timeout=timeout)
|
|
except Exception:
|
|
fut.cancel()
|
|
raise
|
|
|
|
def stop(self) -> None:
|
|
loop = self._loop
|
|
if loop is None:
|
|
return
|
|
try:
|
|
loop.call_soon_threadsafe(loop.stop)
|
|
except RuntimeError:
|
|
pass
|
|
if self._thread is not None:
|
|
self._thread.join(timeout=2.0)
|
|
self._loop = None
|
|
self._thread = None
|
|
|
|
|
|
class LSPService:
    """The process-wide LSP service.

    Created once via :meth:`create_from_config`; the
    :func:`agent.lsp.get_service` accessor manages the singleton.
    Most callers should use that accessor rather than constructing
    :class:`LSPService` directly.
    """

    # ------------------------------------------------------------------
    # construction + factory
    # ------------------------------------------------------------------

    def __init__(
        self,
        *,
        enabled: bool,
        wait_mode: str,
        wait_timeout: float,
        install_strategy: str,
        binary_overrides: Optional[Dict[str, List[str]]] = None,
        env_overrides: Optional[Dict[str, Dict[str, str]]] = None,
        init_overrides: Optional[Dict[str, Dict[str, Any]]] = None,
        disabled_servers: Optional[List[str]] = None,
        idle_timeout: float = DEFAULT_IDLE_TIMEOUT,
    ) -> None:
        self._enabled = enabled
        # Unknown wait modes silently fall back to "document".
        self._wait_mode = wait_mode if wait_mode in ("document", "full") else "document"
        self._wait_timeout = wait_timeout
        self._install_strategy = install_strategy
        self._binary_overrides = binary_overrides or {}
        self._env_overrides = env_overrides or {}
        self._init_overrides = init_overrides or {}
        self._disabled_servers = set(disabled_servers or [])
        self._idle_timeout = idle_timeout

        # The background thread is only started when the service is enabled.
        self._loop = _BackgroundLoop()
        if self._enabled:
            self._loop.start()

        # Per-(server_id, workspace_root) state
        self._clients: Dict[Tuple[str, str], LSPClient] = {}
        self._broken: set = set()
        self._spawning: Dict[Tuple[str, str], asyncio.Future] = {}
        self._last_used: Dict[Tuple[str, str], float] = {}
        self._state_lock = threading.Lock()

        # Delta baseline: file path → snapshot of diagnostics taken
        # immediately before a write.  ``get_diagnostics_sync`` filters
        # out anything in the baseline so the agent only sees errors
        # introduced by the current edit.
        self._delta_baseline: Dict[str, List[Dict[str, Any]]] = {}

    @classmethod
    def create_from_config(cls) -> Optional["LSPService"]:
        """Build a service from ``hermes_cli.config`` settings.

        Returns ``None`` if the config can't be loaded.  The service
        itself returns ``is_active()`` False when LSP is disabled.

        A ``wait_timeout`` value that cannot be converted to ``float``
        falls back to ``DIAGNOSTICS_DOCUMENT_WAIT`` instead of raising.
        """
        try:
            from hermes_cli.config import load_config

            cfg = load_config()
        except Exception as e:  # noqa: BLE001
            logger.debug("LSP config load failed: %s", e)
            return None

        lsp_cfg = (cfg.get("lsp") or {}) if isinstance(cfg, dict) else {}
        if not isinstance(lsp_cfg, dict):
            lsp_cfg = {}

        enabled = bool(lsp_cfg.get("enabled", True))
        wait_mode = lsp_cfg.get("wait_mode", "document")
        raw_timeout = lsp_cfg.get("wait_timeout", DIAGNOSTICS_DOCUMENT_WAIT)
        try:
            wait_timeout = float(raw_timeout)
        except (TypeError, ValueError):
            # A malformed value (e.g. null or a non-numeric string) must
            # not take the whole factory down.
            logger.debug("invalid lsp.wait_timeout %r; using default", raw_timeout)
            wait_timeout = float(DIAGNOSTICS_DOCUMENT_WAIT)
        install_strategy = lsp_cfg.get("install_strategy", "auto")
        servers_cfg = lsp_cfg.get("servers") or {}
        disabled = []
        binary_overrides: Dict[str, List[str]] = {}
        env_overrides: Dict[str, Dict[str, str]] = {}
        init_overrides: Dict[str, Dict[str, Any]] = {}
        if isinstance(servers_cfg, dict):
            for name, sub in servers_cfg.items():
                if not isinstance(sub, dict):
                    continue
                if sub.get("disabled"):
                    disabled.append(name)
                cmd = sub.get("command")
                if isinstance(cmd, list) and cmd:
                    binary_overrides[name] = cmd
                env = sub.get("env")
                if isinstance(env, dict):
                    env_overrides[name] = {k: str(v) for k, v in env.items()}
                init = sub.get("initialization_options")
                if isinstance(init, dict):
                    init_overrides[name] = init

        return cls(
            enabled=enabled,
            wait_mode=wait_mode,
            wait_timeout=wait_timeout,
            install_strategy=install_strategy,
            binary_overrides=binary_overrides,
            env_overrides=env_overrides,
            init_overrides=init_overrides,
            disabled_servers=disabled,
        )

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    def is_active(self) -> bool:
        """Return True iff this service should be consulted at all."""
        return self._enabled

    def enabled_for(self, file_path: str) -> bool:
        """Return True iff LSP should run for this specific file.

        Gates on workspace detection (file or cwd inside a git worktree),
        on whether any registered server matches the extension, and
        on whether the (server_id, workspace_root) pair is in the
        broken-set from a previous spawn failure.

        Files in already-broken pairs return False so the file_operations
        layer skips the LSP path entirely — no spawn attempts, no
        timeout cost — until the service is restarted (``hermes lsp
        restart``) or the process exits.
        """
        if not self._enabled:
            return False
        srv = find_server_for_file(file_path)
        if srv is None or srv.server_id in self._disabled_servers:
            return False
        ws_root, gated_in = resolve_workspace_for_file(file_path)
        if not (ws_root and gated_in):
            return False
        # Broken-set short-circuit, keyed the same way the spawn path
        # keys its clients (per-server root, falling back to ws_root).
        if self._client_key(srv, file_path, ws_root) in self._broken:
            return False
        return True

    def snapshot_baseline(self, file_path: str) -> None:
        """Snapshot current diagnostics for ``file_path`` as the delta baseline.

        Called BEFORE a write so the next ``get_diagnostics_sync()``
        can filter out pre-existing errors.  Best-effort — failures
        are silently swallowed so a flaky server can't break a write.

        Outer timeouts (e.g. server hangs during initialize) mark the
        (server_id, workspace_root) pair as broken so subsequent edits
        skip it instantly instead of re-paying the timeout cost.
        """
        if not self.enabled_for(file_path):
            return
        try:
            diags = self._loop.run(self._snapshot_async(file_path), timeout=8.0)
            self._delta_baseline[os.path.abspath(file_path)] = diags or []
        except Exception as e:  # noqa: BLE001
            logger.debug("baseline snapshot failed for %s: %s", file_path, e)
            self._mark_broken_for_file(file_path, e)
            self._delta_baseline[os.path.abspath(file_path)] = []

    def get_diagnostics_sync(
        self,
        file_path: str,
        *,
        delta: bool = True,
        timeout: Optional[float] = None,
        line_shift: Optional[Callable[[int], Optional[int]]] = None,
    ) -> List[Dict[str, Any]]:
        """Synchronously open ``file_path`` in the right server, wait for
        diagnostics, return them.

        If ``delta`` is True (default), the result is filtered against
        any baseline previously captured via :meth:`snapshot_baseline`.
        Diagnostics present in the baseline are removed so the caller
        only sees errors introduced by the current edit.

        When ``line_shift`` is provided, baseline diagnostics are
        remapped through it before the set-difference.  This handles
        the case where the edit deleted or inserted lines, causing
        pre-existing diagnostics below the edit point to surface at
        different line numbers in the post-edit snapshot — without
        the shift, they'd all look "introduced by this edit".  Pass
        a callable built by
        :func:`agent.lsp.range_shift.build_line_shift` (pre_text,
        post_text).  Omit when pre/post content isn't available;
        the unshifted comparison still catches diagnostics that
        didn't move.

        Returns an empty list when LSP is disabled, when no workspace
        can be detected, when no server matches, or when the server
        can't be spawned.  Never raises.
        """
        # The blocking wait in ``_BackgroundLoop.run`` is a
        # ``concurrent.futures.Future.result`` call; its timeout class is
        # distinct from ``asyncio.TimeoutError`` before Python 3.11, so
        # both are caught to classify timeouts correctly on every version.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        if not self.enabled_for(file_path):
            return []

        # Resolve server_id eagerly so we can emit structured logs even
        # when the request errors out below.
        srv = find_server_for_file(file_path)
        server_id = srv.server_id if srv else "?"

        try:
            t = timeout if timeout is not None else self._wait_timeout + 2.0
            diags = self._loop.run(self._open_and_wait_async(file_path), timeout=t) or []
        except (asyncio.TimeoutError, FutureTimeoutError) as e:
            eventlog.log_timeout(server_id, file_path)
            logger.debug("LSP diagnostics timeout for %s: %s", file_path, e)
            self._mark_broken_for_file(file_path, e)
            return []
        except Exception as e:  # noqa: BLE001
            eventlog.log_server_error(server_id, file_path, e)
            logger.debug("LSP diagnostics fetch failed for %s: %s", file_path, e)
            self._mark_broken_for_file(file_path, e)
            return []

        abs_path = os.path.abspath(file_path)
        if delta:
            baseline = self._delta_baseline.get(abs_path) or []
            if baseline:
                if line_shift is not None:
                    # Remap baseline diagnostics into post-edit
                    # coordinates so shifted-but-otherwise-identical
                    # entries hash equal under _diag_key.  Entries
                    # that mapped into a deleted region drop out
                    # silently — they no longer apply.
                    from agent.lsp.range_shift import shift_baseline

                    baseline = shift_baseline(baseline, line_shift)
                seen = {_diag_key(d) for d in baseline}
                diags = [d for d in diags if _diag_key(d) not in seen]
            # Roll baseline forward — next call returns deltas relative
            # to the just-emitted state, mirroring claude-code's
            # diagnosticTracking.
            try:
                fresh = self._loop.run(self._current_diags_async(file_path), timeout=2.0) or []
            except Exception:  # noqa: BLE001
                fresh = []
            # An empty result leaves the previous baseline in place.
            if fresh:
                self._delta_baseline[abs_path] = fresh

        if diags:
            eventlog.log_diagnostics(server_id, file_path, len(diags))
        else:
            eventlog.log_clean(server_id, file_path)
        return diags

    @staticmethod
    def _client_key(srv: ServerDef, file_path: str, ws_root: str) -> Tuple[str, str]:
        """Return the ``(server_id, root)`` key for ``file_path``.

        Uses the server's own root when ``resolve_root`` yields one and
        falls back to ``ws_root`` when it returns a falsy value or raises.
        """
        try:
            per_server_root = srv.resolve_root(file_path, ws_root) or ws_root
        except Exception:  # noqa: BLE001
            per_server_root = ws_root
        return (srv.server_id, per_server_root)

    def _mark_broken_for_file(self, file_path: str, exc: BaseException) -> None:
        """Mark the (server_id, workspace_root) pair as broken so subsequent
        edits skip it instantly instead of re-paying timeout cost.

        Called when the outer ``_loop.run`` call fails (for example on
        timeout).  Without this, every subsequent write would re-enter
        the spawn path and re-pay the full ``snapshot_baseline``
        timeout (8s) until the binary is fixed.

        Also shuts down any client still registered under the key, and
        emits a single eventlog entry the first time a key is marked.

        ``exc`` is whatever exception the outer wrapper caught — used
        only for logging, never re-raised.
        """
        srv = find_server_for_file(file_path)
        if srv is None:
            return
        ws_root, gated = resolve_workspace_for_file(file_path)
        if not (ws_root and gated):
            return
        key = self._client_key(srv, file_path, ws_root)
        already_broken = key in self._broken
        self._broken.add(key)

        # Remove any client registered under this key so it is not
        # handed out again.
        with self._state_lock:
            client = self._clients.pop(key, None)
        if client is not None:
            try:
                # Best-effort shutdown, bounded to one second.  This call
                # blocks for up to that long; failures are ignored because
                # we are already on a slow error path.
                self._loop.run(client.shutdown(), timeout=1.0)
            except Exception:  # noqa: BLE001
                pass

        if not already_broken:
            eventlog.log_spawn_failed(srv.server_id, key[1], exc)

    def shutdown(self) -> None:
        """Tear down all clients and stop the background loop."""
        if not self._enabled:
            return
        try:
            self._loop.run(self._shutdown_async(), timeout=10.0)
        except Exception as e:  # noqa: BLE001
            logger.debug("LSP shutdown error: %s", e)
        self._loop.stop()
        clear_cache()

    # ------------------------------------------------------------------
    # async internals
    # ------------------------------------------------------------------

    async def _snapshot_async(self, file_path: str) -> List[Dict[str, Any]]:
        """Open the file, wait for diagnostics, and return a copy of them.

        Returns ``[]`` when no client is available or open/wait fails.
        """
        client = await self._get_or_spawn(file_path)
        if client is None:
            return []
        try:
            version = await client.open_file(file_path, language_id=language_id_for(file_path))
            await client.wait_for_diagnostics(file_path, version, mode=self._wait_mode)
        except Exception as e:  # noqa: BLE001
            logger.debug("snapshot open/wait failed: %s", e)
            return []
        self._last_used[(client.server_id, client.workspace_root)] = time.time()
        return list(client.diagnostics_for(file_path))

    async def _open_and_wait_async(self, file_path: str) -> List[Dict[str, Any]]:
        """Like :meth:`_snapshot_async`, but also sends a save notification
        between opening the file and waiting for diagnostics.
        """
        client = await self._get_or_spawn(file_path)
        if client is None:
            return []
        try:
            version = await client.open_file(file_path, language_id=language_id_for(file_path))
            await client.save_file(file_path)
            await client.wait_for_diagnostics(file_path, version, mode=self._wait_mode)
        except Exception as e:  # noqa: BLE001
            logger.debug("open/wait failed for %s: %s", file_path, e)
            return []
        self._last_used[(client.server_id, client.workspace_root)] = time.time()
        return list(client.diagnostics_for(file_path))

    async def _current_diags_async(self, file_path: str) -> List[Dict[str, Any]]:
        """Return the diagnostics an existing client currently holds for
        ``file_path``, without opening the file or spawning anything.
        """
        ws, gated = resolve_workspace_for_file(file_path)
        srv = find_server_for_file(file_path)
        if not (ws and gated and srv):
            return []
        # Clients are registered under the per-server root in
        # ``_get_or_spawn``; looking them up by the bare workspace root
        # would miss whenever the two differ.
        key = self._client_key(srv, file_path, ws)
        with self._state_lock:
            client = self._clients.get(key)
        if client is None:
            return []
        return list(client.diagnostics_for(file_path))

    async def _get_or_spawn(self, file_path: str) -> Optional[LSPClient]:
        """Return a running client for ``file_path``, spawning one if needed.

        Returns ``None`` when no server matches, the server is disabled,
        no workspace is detected, the server gates itself off, the key is
        in the broken-set, or the spawn fails.  A request that arrives
        while a spawn for the same key is in flight awaits that spawn's
        outcome instead of starting a second one.
        """
        srv = find_server_for_file(file_path)
        if srv is None:
            return None
        if srv.server_id in self._disabled_servers:
            eventlog.log_disabled(srv.server_id, file_path, "disabled in config")
            return None
        ws_root, gated = resolve_workspace_for_file(file_path)
        if not (ws_root and gated):
            eventlog.log_no_project_root(srv.server_id, file_path)
            return None
        per_server_root = srv.resolve_root(file_path, ws_root)
        if per_server_root is None:
            eventlog.log_disabled(
                srv.server_id, file_path, "exclude marker hit (server gated off)"
            )
            return None  # exclude marker hit, server gated off

        key = (srv.server_id, per_server_root)
        if key in self._broken:
            return None
        with self._state_lock:
            client = self._clients.get(key)
            if client is not None and client.is_running:
                eventlog.log_active(srv.server_id, per_server_root)
                return client
            spawning = self._spawning.get(key)
        if spawning is not None:
            try:
                return await spawning
            except Exception:  # noqa: BLE001
                return None

        # Begin spawn
        loop = asyncio.get_running_loop()
        spawn_future: asyncio.Future = loop.create_future()
        with self._state_lock:
            self._spawning[key] = spawn_future
        try:
            ctx = ServerContext(
                workspace_root=per_server_root,
                install_strategy=self._install_strategy,
                binary_overrides=self._binary_overrides,
                env_overrides=self._env_overrides,
                init_overrides=self._init_overrides,
            )
            spec = srv.build_spawn(per_server_root, ctx)
            if spec is None:
                # ``build_spawn`` returns None when the binary can't be
                # located (auto-install disabled, manual-only server,
                # or install attempt failed).  Surface this once via
                # the structured logger so the user can act on it.
                eventlog.log_server_unavailable(srv.server_id, srv.server_id)
                self._broken.add(key)
                spawn_future.set_result(None)
                return None
            client = LSPClient(
                server_id=srv.server_id,
                workspace_root=spec.workspace_root,
                command=spec.command,
                env=spec.env,
                cwd=spec.cwd,
                initialization_options=spec.initialization_options,
                seed_diagnostics_on_first_push=spec.seed_diagnostics_on_first_push or srv.seed_first_push,
            )
            try:
                await client.start()
            except Exception as e:  # noqa: BLE001
                eventlog.log_spawn_failed(srv.server_id, per_server_root, e)
                self._broken.add(key)
                spawn_future.set_result(None)
                return None
            with self._state_lock:
                self._clients[key] = client
                self._last_used[key] = time.time()
            eventlog.log_active(srv.server_id, per_server_root)
            spawn_future.set_result(client)
            return client
        finally:
            # If this task was cancelled (e.g. the outer timeout fired) or
            # an unexpected error escaped, the future is still pending.
            # Resolve it so requests awaiting this spawn are released
            # instead of hanging until their own timeout.
            if not spawn_future.done():
                spawn_future.set_result(None)
            with self._state_lock:
                self._spawning.pop(key, None)

    async def _shutdown_async(self) -> None:
        """Shut down every registered client and reset per-key state."""
        with self._state_lock:
            clients = list(self._clients.values())
            self._clients.clear()
            self._broken.clear()
            self._last_used.clear()
        # return_exceptions=True: one failing shutdown must not prevent
        # the others from completing.
        await asyncio.gather(
            *(c.shutdown() for c in clients),
            return_exceptions=True,
        )

    # ------------------------------------------------------------------
    # status / introspection (used by ``hermes lsp status``)
    # ------------------------------------------------------------------

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the service for the CLI status command."""
        with self._state_lock:
            clients = [
                {
                    "server_id": k[0],
                    "workspace_root": k[1],
                    "state": c.state,
                    "running": c.is_running,
                }
                for k, c in self._clients.items()
            ]
            broken = list(self._broken)
        return {
            "enabled": self._enabled,
            "wait_mode": self._wait_mode,
            "wait_timeout": self._wait_timeout,
            "install_strategy": self._install_strategy,
            "clients": clients,
            "broken": broken,
            "disabled_servers": sorted(self._disabled_servers),
        }
|
|
|
|
|
|
def _diag_key(d: Dict[str, Any]) -> str:
|
|
"""Content equality key used for cross-edit delta filtering.
|
|
|
|
Includes the diagnostic's position range — when used together
|
|
with :func:`agent.lsp.range_shift.shift_baseline`, the baseline
|
|
is line-shifted into post-edit coordinates BEFORE this key is
|
|
computed, so identical-but-shifted diagnostics hash equal. Two
|
|
genuinely distinct diagnostics at different lines (e.g. the same
|
|
error class introduced at a second site) hash differently and
|
|
are surfaced as new.
|
|
|
|
Mirrors :func:`agent.lsp.client._diagnostic_key`; intentionally
|
|
identical so the two layers agree on diagnostic identity.
|
|
"""
|
|
rng = d.get("range") or {}
|
|
start = rng.get("start") or {}
|
|
end = rng.get("end") or {}
|
|
code = d.get("code")
|
|
if code is not None and not isinstance(code, str):
|
|
code = str(code)
|
|
return "\x00".join(
|
|
[
|
|
str(d.get("severity") or 1),
|
|
str(code or ""),
|
|
str(d.get("source") or ""),
|
|
str(d.get("message") or "").strip(),
|
|
f"{start.get('line', 0)}:{start.get('character', 0)}-{end.get('line', 0)}:{end.get('character', 0)}",
|
|
]
|
|
)
|
|
|
|
|
|
# Public API of this module: only the service class is exported.
__all__ = ["LSPService"]
|