import json
import os
import subprocess
import sys
import threading
import uuid
from collections.abc import Callable
from pathlib import Path

from hermes_constants import get_hermes_home
from hermes_cli.env_loader import load_hermes_dotenv

_hermes_home = get_hermes_home()
load_hermes_dotenv(hermes_home=_hermes_home, project_env=Path(__file__).parent.parent / ".env")

# Kept after the dotenv load, preserving the original statement order.
from tui_gateway.render import make_stream_renderer, render_diff, render_message

_sessions: dict[str, dict] = {}             # gateway session id -> session state dict
_methods: dict[str, Callable] = {}          # RPC method name -> handler(rid, params)
_pending: dict[str, threading.Event] = {}   # blocking-prompt request id -> wake-up event
_answers: dict[str, str] = {}               # blocking-prompt request id -> answer
_db = None                                  # lazily created SessionDB (see _get_db)


# ── Plumbing ──────────────────────────────────────────────────────────

def _get_db():
    """Return the shared SessionDB, creating it on first use."""
    global _db
    if _db is None:
        from hermes_state import SessionDB
        _db = SessionDB()
    return _db


def _emit(event: str, sid: str, payload: dict | None = None):
    """Write one JSON-RPC "event" notification line to stdout and flush it.

    A falsy ``payload`` (``None`` or ``{}``) is omitted from the message.
    """
    params = {"type": event, "session_id": sid}
    if payload:
        params["payload"] = payload
    message = {"jsonrpc": "2.0", "method": "event", "params": params}
    sys.stdout.write(json.dumps(message) + "\n")
    sys.stdout.flush()


def _ok(rid, result: dict) -> dict:
    """Build a JSON-RPC success response for request id ``rid``."""
    return {"jsonrpc": "2.0", "id": rid, "result": result}


def _err(rid, code: int, msg: str) -> dict:
    """Build a JSON-RPC error response for request id ``rid``."""
    return {"jsonrpc": "2.0", "id": rid, "error": {"code": code, "message": msg}}


def method(name: str):
    """Decorator that registers the function as the handler for RPC method ``name``."""
    def dec(fn):
        _methods[name] = fn
        return fn
    return dec


def handle_request(req: dict) -> dict | None:
    """Dispatch a decoded JSON-RPC request to its registered handler.

    Returns the handler's response dict, a -32601 error for an unknown
    method, or a -32602 error when ``params`` is present but not an object.
    A missing or ``null`` ``params`` is treated as ``{}`` so handlers can
    always call ``params.get``.
    """
    rid = req.get("id")
    fn = _methods.get(req.get("method", ""))
    if not fn:
        return _err(rid, -32601, f"unknown method: {req.get('method')}")
    params = req.get("params")
    if params is None:
        params = {}
    if not isinstance(params, dict):
        return _err(rid, -32602, "params must be an object")
    return fn(rid, params)


def _sess(params, rid):
    """Look up the session named by ``params["session_id"]``.

    Returns ``(session, None)`` when found, else ``(None, error_response)``.
    """
    s = _sessions.get(params.get("session_id", ""))
    return (s, None) if s else (None, _err(rid, 4001, "session not found"))


# ── Config I/O ────────────────────────────────────────────────────────

def _load_cfg() -> dict:
    """Load ``config.yaml`` from the hermes home directory.

    Best-effort: returns ``{}`` when the file is missing, unreadable,
    unparsable, or does not contain a mapping at the top level.
    """
    try:
        import yaml
        p = _hermes_home / "config.yaml"
        if p.exists():
            with open(p, encoding="utf-8") as f:
                data = yaml.safe_load(f)
            # Callers use dict methods on the result, so reject lists/scalars.
            return data if isinstance(data, dict) else {}
    except Exception:
        pass
    return {}


def _save_cfg(cfg: dict):
    """Write ``cfg`` to ``config.yaml`` in the hermes home directory."""
    import yaml
    with open(_hermes_home / "config.yaml", "w", encoding="utf-8") as f:
        yaml.safe_dump(cfg, f)


# ── Blocking prompt factory ──────────────────────────────────────────

def _block(event: str, sid: str, payload: dict, timeout: int = 300) -> str:
    """Emit ``event`` with a fresh ``request_id`` and wait for the answer.

    Blocks the calling thread until the request's event is set (see the
    ``*.respond`` handlers and ``_clear_pending``) or ``timeout`` seconds
    pass. Returns the stored answer, or ``""`` if none was stored.
    """
    rid = uuid.uuid4().hex[:8]
    ev = threading.Event()
    _pending[rid] = ev
    try:
        # Copy so the caller's dict is not mutated.
        _emit(event, sid, {**payload, "request_id": rid})
        ev.wait(timeout=timeout)
    finally:
        # Always unregister, even if emitting failed, so the entry cannot leak.
        _pending.pop(rid, None)
    return _answers.pop(rid, "")


def _clear_pending():
    """Release every pending blocking prompt with an empty answer."""
    for rid, ev in list(_pending.items()):
        _answers[rid] = ""
        ev.set()


# ── Agent factory ────────────────────────────────────────────────────

def resolve_skin() -> dict:
    """Return the active skin's name, colors and branding, or ``{}`` on any failure."""
    try:
        from hermes_cli.skin_engine import init_skin_from_config, get_active_skin
        init_skin_from_config(_load_cfg())
        skin = get_active_skin()
        return {"name": skin.name, "colors": skin.colors, "branding": skin.branding}
    except Exception:
        return {}


def _resolve_model() -> str:
    """Pick the model id: ``HERMES_MODEL`` env, then config ``model``, then a default.

    The config value may be a string or a dict with a ``default`` key. An
    empty or missing value at any level falls through to the built-in default.
    """
    env = os.environ.get("HERMES_MODEL", "")
    if env:
        return env
    m = _load_cfg().get("model", "")
    if isinstance(m, dict):
        m = m.get("default", "")
    if isinstance(m, str) and m:
        return m
    return "anthropic/claude-sonnet-4"


def _get_usage(agent) -> dict:
    """Collect token/call counters from ``agent`` attributes (missing ones count as 0)."""
    def counter(primary, fallback=None):
        # Use the fallback attribute only when the primary is missing or falsy.
        value = getattr(agent, primary, 0)
        if value:
            return value
        return getattr(agent, fallback, 0) if fallback else 0

    return {
        "input": counter("session_input_tokens", "session_prompt_tokens"),
        "output": counter("session_output_tokens", "session_completion_tokens"),
        "total": counter("session_total_tokens"),
        "calls": counter("session_api_calls"),
    }


def _session_info(agent) -> dict:
    """Describe the agent: model name, tool names grouped by toolset, available skills.

    Both lookups are best-effort; on failure the corresponding entry keeps
    whatever was collected so far.
    """
    info: dict = {"model": getattr(agent, "model", ""), "tools": {}, "skills": {}}
    try:
        from model_tools import get_toolset_for_tool
        for t in getattr(agent, "tools", []) or []:
            name = t["function"]["name"]
            info["tools"].setdefault(get_toolset_for_tool(name) or "other", []).append(name)
    except Exception:
        pass
    try:
        from hermes_cli.banner import get_available_skills
        info["skills"] = get_available_skills()
    except Exception:
        pass
    return info


def _agent_cbs(sid: str) -> dict:
    """Build the agent callback kwargs that forward activity for ``sid`` as events."""
    return dict(
        tool_start_callback=lambda tc_id, name, args: _emit("tool.start", sid, {"tool_id": tc_id, "name": name}),
        tool_complete_callback=lambda tc_id, name, args, result: _emit("tool.complete", sid, {"tool_id": tc_id, "name": name}),
        tool_progress_callback=lambda name, preview, args: _emit("tool.progress", sid, {"name": name, "preview": preview}),
        tool_gen_callback=lambda name: _emit("tool.generating", sid, {"name": name}),
        thinking_callback=lambda text: _emit("thinking.delta", sid, {"text": text}),
        reasoning_callback=lambda text: _emit("reasoning.delta", sid, {"text": text}),
        status_callback=lambda text: _emit("status.update", sid, {"text": text}),
        # Blocks the calling thread until the client answers (or the prompt times out).
        clarify_callback=lambda q, c: _block("clarify.request", sid, {"question": q, "choices": c}),
    )


def _wire_callbacks(sid: str):
    """Install the sudo-password and secret-capture callbacks, routed to session ``sid``."""
    from tools.terminal_tool import set_sudo_password_callback
    from tools.skills_tool import set_secret_capture_callback

    set_sudo_password_callback(lambda: _block("sudo.request", sid, {}, timeout=120))

    def secret_cb(env_var, prompt, metadata=None):
        pl = {"prompt": prompt, "env_var": env_var}
        if metadata:
            pl["metadata"] = metadata
        val = _block("secret.request", sid, pl)
        if not val:
            # Empty answer (declined, interrupted or timed out) is reported as a skip.
            return {"success": True, "stored_as": env_var, "validated": False, "skipped": True, "message": "skipped"}
        from hermes_cli.config import save_env_value_secure
        return {**save_env_value_secure(env_var, val), "skipped": False, "message": "ok"}

    set_secret_capture_callback(secret_cb)


def _make_agent(sid: str, key: str, session_id: str | None = None):
    """Create an AIAgent for the TUI whose callbacks emit events for ``sid``.

    ``session_id`` defaults to ``key`` when not given.
    """
    from run_agent import AIAgent
    return AIAgent(
        model=_resolve_model(),
        quiet_mode=True,
        platform="tui",
        session_id=session_id or key,
        session_db=_get_db(),
        **_agent_cbs(sid),
    )


def _init_session(sid: str, key: str, agent, history: list, cols: int = 80):
    """Register session state for ``sid``, wire callbacks and emit ``session.info``."""
    _sessions[sid] = {
        "agent": agent,
        "session_key": key,
        "history": history,
        "attached_images": [],
        "image_counter": 0,
        "cols": cols,
    }
    try:
        from tools.approval import register_gateway_notify, load_permanent_allowlist
        register_gateway_notify(key, lambda data: _emit("approval.request", sid, data))
        load_permanent_allowlist()
    except Exception:
        # Best-effort: the session is still created if approval wiring fails.
        pass
    _wire_callbacks(sid)
    _emit("session.info", sid, _session_info(agent))


def _with_checkpoints(session, fn):
    """Call ``fn(checkpoint_manager, cwd)`` for the session's agent and return its result.

    ``cwd`` is ``$TERMINAL_CWD`` when set, else the process working directory.
    """
    return fn(session["agent"]._checkpoint_mgr, os.getenv("TERMINAL_CWD", os.getcwd()))


def _enrich_with_attached_images(user_text: str, image_paths: list[str]) -> str:
    """Pre-analyze attached images via vision and prepend descriptions to user text.

    Paths that do not exist are skipped. Each remaining image contributes
    either its analysis or an "analysis failed" note, plus a hint telling the
    model how to re-examine the file. If nothing was prepended and the text
    is empty, a default question is returned.
    """
    import asyncio, json as _json
    from tools.vision_tools import vision_analyze_tool

    prompt = (
        "Describe everything visible in this image in thorough detail. "
        "Include any text, code, data, objects, people, layout, colors, "
        "and any other notable visual information."
    )
    parts: list[str] = []
    for path in image_paths:
        p = Path(path)
        if not p.exists():
            continue
        hint = f"[You can examine it with vision_analyze using image_url: {p}]"
        failed = f"[The user attached an image but analysis failed.]\n{hint}"
        try:
            r = _json.loads(asyncio.run(vision_analyze_tool(image_url=str(p), user_prompt=prompt)))
            desc = r.get("analysis", "") if r.get("success") else None
            parts.append(f"[The user attached an image:\n{desc}]\n{hint}" if desc else failed)
        except Exception:
            parts.append(failed)

    text = user_text or ""
    prefix = "\n\n".join(parts)
    if prefix:
        return f"{prefix}\n\n{text}" if text else prefix
    return text or "What do you see in this image?"
# ── Methods: session ─────────────────────────────────────────────────

@method("session.create")
def _(rid, params: dict) -> dict:
    """Create a new TUI session with a fresh agent and return its ``session_id``."""
    sid = uuid.uuid4().hex[:8]
    key = f"tui-{sid}"
    os.environ["HERMES_SESSION_KEY"] = key
    os.environ["HERMES_INTERACTIVE"] = "1"
    try:
        agent = _make_agent(sid, key)
        _get_db().create_session(key, source="tui", model=_resolve_model())
        _init_session(sid, key, agent, [], cols=int(params.get("cols", 80)))
    except Exception as e:
        return _err(rid, 5000, f"agent init failed: {e}")
    return _ok(rid, {"session_id": sid})


@method("session.list")
def _(rid, params: dict) -> dict:
    """List stored TUI sessions (default limit 20) with null fields normalised."""
    try:
        rows = _get_db().list_sessions_rich(source="tui", limit=params.get("limit", 20))
        return _ok(rid, {"sessions": [
            {
                "id": s["id"],
                "title": s.get("title") or "",
                "preview": s.get("preview") or "",
                "started_at": s.get("started_at") or 0,
                "message_count": s.get("message_count") or 0,
            }
            for s in rows
        ]})
    except Exception as e:
        return _err(rid, 5006, str(e))


@method("session.resume")
def _(rid, params: dict) -> dict:
    """Resume a stored session, looked up by id first and then by title.

    A new gateway session id is allocated; the stored id is reused as the
    session key and agent session id.
    """
    target = params.get("session_id", "")
    if not target:
        return _err(rid, 4006, "session_id required")
    db = _get_db()
    found = db.get_session(target)
    if not found:
        found = db.get_session_by_title(target)
        if not found:
            return _err(rid, 4007, "session not found")
        target = found["id"]
    sid = uuid.uuid4().hex[:8]
    os.environ["HERMES_SESSION_KEY"] = target
    os.environ["HERMES_INTERACTIVE"] = "1"
    try:
        db.reopen_session(target)
        history = [
            {"role": m["role"], "content": m["content"]}
            for m in db.get_messages(target)
            if m.get("role") in ("user", "assistant", "tool", "system")
        ]
        agent = _make_agent(sid, target, session_id=target)
        _init_session(sid, target, agent, history, cols=int(params.get("cols", 80)))
    except Exception as e:
        return _err(rid, 5000, f"resume failed: {e}")
    return _ok(rid, {"session_id": sid, "resumed": target, "message_count": len(history)})


@method("session.title")
def _(rid, params: dict) -> dict:
    """Get the session title (no ``title`` given) or set it.

    Database failures on either path are reported as error 5007.
    """
    session, err = _sess(params, rid)
    if err:
        return err
    title, key = params.get("title", ""), session["session_key"]
    try:
        if not title:
            return _ok(rid, {"title": _get_db().get_session_title(key) or "", "session_key": key})
        _get_db().set_session_title(key, title)
        return _ok(rid, {"title": title})
    except Exception as e:
        return _err(rid, 5007, str(e))


@method("session.usage")
def _(rid, params: dict) -> dict:
    """Return the agent's token/call usage counters."""
    session, err = _sess(params, rid)
    if err:
        return err
    return _ok(rid, _get_usage(session["agent"]))


@method("session.history")
def _(rid, params: dict) -> dict:
    """Return the number of messages held in the session history."""
    session, err = _sess(params, rid)
    if err:
        return err
    return _ok(rid, {"count": len(session.get("history", []))})


@method("session.undo")
def _(rid, params: dict) -> dict:
    """Drop the last exchange from history.

    Removes trailing assistant/tool messages, then the user message before
    them if there is one, and reports how many messages were removed.
    """
    session, err = _sess(params, rid)
    if err:
        return err
    history, removed = session.get("history", []), 0
    while history and history[-1].get("role") in ("assistant", "tool"):
        history.pop()
        removed += 1
    if history and history[-1].get("role") == "user":
        history.pop()
        removed += 1
    return _ok(rid, {"removed": removed})


@method("session.compress")
def _(rid, params: dict) -> dict:
    """Ask the agent to compress its context (if it supports that) and return usage."""
    session, err = _sess(params, rid)
    if err:
        return err
    agent = session["agent"]
    try:
        # NOTE: the status is "compressed" even when the agent has no compress_context.
        if hasattr(agent, "compress_context"):
            agent.compress_context()
        return _ok(rid, {"status": "compressed", "usage": _get_usage(agent)})
    except Exception as e:
        return _err(rid, 5005, str(e))


@method("session.save")
def _(rid, params: dict) -> dict:
    """Dump model name and history to a timestamped JSON file in the working directory."""
    session, err = _sess(params, rid)
    if err:
        return err
    import time as _time
    filename = f"hermes_conversation_{_time.strftime('%Y%m%d_%H%M%S')}.json"
    try:
        # ensure_ascii=False writes raw non-ASCII text, so the encoding must
        # be explicit rather than whatever the locale default happens to be.
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(
                {"model": getattr(session["agent"], "model", ""), "messages": session.get("history", [])},
                f, indent=2, ensure_ascii=False,
            )
        return _ok(rid, {"file": filename})
    except Exception as e:
        return _err(rid, 5011, str(e))


@method("session.interrupt")
def _(rid, params: dict) -> dict:
    """Interrupt the agent, release pending prompts and deny outstanding approvals."""
    session, err = _sess(params, rid)
    if err:
        return err
    if hasattr(session["agent"], "interrupt"):
        session["agent"].interrupt()
    # NOTE: _clear_pending is not session-scoped; it is called without a session argument.
    _clear_pending()
    try:
        from tools.approval import resolve_gateway_approval
        resolve_gateway_approval(session["session_key"], "deny", resolve_all=True)
    except Exception:
        # Best-effort: the interrupt is still reported if approvals cannot be resolved.
        pass
    return _ok(rid, {"status": "interrupted"})


@method("terminal.resize")
def _(rid, params: dict) -> dict:
    """Record the terminal width (``cols``, default 80) for the session."""
    session, err = _sess(params, rid)
    if err:
        return err
    session["cols"] = int(params.get("cols", 80))
    return _ok(rid, {"cols": session["cols"]})


# ── Methods: prompt ──────────────────────────────────────────────────

@method("prompt.submit")
def _(rid, params: dict) -> dict:
    """Run a user prompt on a background thread and stream the reply as events.

    Emits ``message.start`` immediately, ``message.delta`` per streamed
    chunk, then ``message.complete`` (or ``error`` if the run raised).
    """
    sid, text = params.get("session_id", ""), params.get("text", "")
    session = _sessions.get(sid)
    if not session:
        return _err(rid, 4001, "session not found")
    agent, history = session["agent"], session["history"]
    _emit("message.start", sid)

    def run():
        try:
            cols = session.get("cols", 80)
            streamer = make_stream_renderer(cols)
            # Attached images are consumed by this prompt.
            images = session.pop("attached_images", [])
            prompt = _enrich_with_attached_images(text, images) if images else text

            def _stream(delta):
                payload = {"text": delta}
                if streamer and (r := streamer.feed(delta)) is not None:
                    payload["rendered"] = r
                _emit("message.delta", sid, payload)

            result = agent.run_conversation(
                prompt, conversation_history=list(history), stream_callback=_stream,
            )

            if isinstance(result, dict):
                if isinstance(result.get("messages"), list):
                    session["history"] = result["messages"]
                # A None final_response would otherwise reach render_message and the client.
                raw = result.get("final_response") or ""
                if result.get("interrupted"):
                    status = "interrupted"
                elif result.get("error"):
                    status = "error"
                else:
                    status = "complete"
            else:
                raw = str(result)
                status = "complete"

            payload = {"text": raw, "usage": _get_usage(agent), "status": status}
            rendered = render_message(raw, cols)
            if rendered:
                payload["rendered"] = rendered
            _emit("message.complete", sid, payload)
        except Exception as e:
            _emit("error", sid, {"message": str(e)})

    threading.Thread(target=run, daemon=True).start()
    return _ok(rid, {"status": "streaming"})


@method("clipboard.paste")
def _(rid, params: dict) -> dict:
    """Save a clipboard image under ``<hermes home>/images`` and attach it to the session."""
    session, err = _sess(params, rid)
    if err:
        return err
    try:
        from datetime import datetime
        from hermes_cli.clipboard import has_clipboard_image, save_clipboard_image
    except Exception as e:
        return _err(rid, 5027, f"clipboard unavailable: {e}")
    if not has_clipboard_image():
        return _ok(rid, {"attached": False, "message": "No image found in clipboard"})
    img_dir = _hermes_home / "images"
    img_dir.mkdir(parents=True, exist_ok=True)
    # The per-session counter keeps names distinct within the same second.
    session["image_counter"] = session.get("image_counter", 0) + 1
    img_path = img_dir / f"clip_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{session['image_counter']}.png"
    if not save_clipboard_image(img_path):
        return _ok(rid, {"attached": False, "message": "Clipboard has image but extraction failed"})
    session.setdefault("attached_images", []).append(str(img_path))
    return _ok(rid, {"attached": True, "path": str(img_path), "count": len(session["attached_images"])})


def _final_text(result):
    """Extract the reply text from an agent result (dict with ``final_response``, or anything)."""
    if isinstance(result, dict):
        return result.get("final_response", str(result))
    return str(result)


@method("prompt.background")
def _(rid, params: dict) -> dict:
    """Run ``text`` on a separate agent in the background.

    Returns a ``task_id`` immediately; the outcome (or ``error: ...``) is
    emitted as ``background.complete`` addressed to the given ``session_id``.
    """
    text, parent = params.get("text", ""), params.get("session_id", "")
    if not text:
        return _err(rid, 4012, "text required")
    task_id = f"bg_{uuid.uuid4().hex[:6]}"

    def run():
        try:
            from run_agent import AIAgent
            agent = AIAgent(model=_resolve_model(), quiet_mode=True, platform="tui",
                            session_id=task_id, max_iterations=30)
            result = agent.run_conversation(text)
            _emit("background.complete", parent, {"task_id": task_id, "text": _final_text(result)})
        except Exception as e:
            _emit("background.complete", parent, {"task_id": task_id, "text": f"error: {e}"})

    threading.Thread(target=run, daemon=True).start()
    return _ok(rid, {"task_id": task_id})


@method("prompt.btw")
def _(rid, params: dict) -> dict:
    """Answer a side question against a snapshot of the history, with no toolsets enabled.

    The session history is not updated by this handler; the answer is
    emitted as ``btw.complete``.
    """
    session, err = _sess(params, rid)
    if err:
        return err
    text, sid = params.get("text", ""), params.get("session_id", "")
    if not text:
        return _err(rid, 4012, "text required")
    snapshot = list(session.get("history", []))

    def run():
        try:
            from run_agent import AIAgent
            agent = AIAgent(model=_resolve_model(), quiet_mode=True, platform="tui",
                            max_iterations=8, enabled_toolsets=[])
            result = agent.run_conversation(text, conversation_history=snapshot)
            _emit("btw.complete", sid, {"text": _final_text(result)})
        except Exception as e:
            _emit("btw.complete", sid, {"text": f"error: {e}"})

    threading.Thread(target=run, daemon=True).start()
    return _ok(rid, {"status": "running"})


# ── Methods: respond ─────────────────────────────────────────────────

def _respond(rid, params, key):
    """Store ``params[key]`` as the answer for a pending prompt and wake its waiter."""
    r = params.get("request_id", "")
    ev = _pending.get(r)
    if not ev:
        return _err(rid, 4009, f"no pending {key} request")
    _answers[r] = params.get(key, "")
    ev.set()
    return _ok(rid, {"status": "ok"})


@method("clarify.respond")
def _(rid, params: dict) -> dict:
    """Deliver the user's answer to a pending clarify prompt."""
    return _respond(rid, params, "answer")


@method("sudo.respond")
def _(rid, params: dict) -> dict:
    """Deliver the password for a pending sudo prompt."""
    return _respond(rid, params, "password")


@method("secret.respond")
def _(rid, params: dict) -> dict:
    """Deliver the value for a pending secret prompt."""
    return _respond(rid, params, "value")


@method("approval.respond")
def _(rid, params: dict) -> dict:
    """Resolve a pending approval for the session (``choice`` defaults to ``deny``)."""
    session, err = _sess(params, rid)
    if err:
        return err
    try:
        from tools.approval import resolve_gateway_approval
        resolved = resolve_gateway_approval(
            session["session_key"], params.get("choice", "deny"), resolve_all=params.get("all", False),
        )
        return _ok(rid, {"resolved": resolved})
    except Exception as e:
        return _err(rid, 5004, str(e))


# ── Methods: config ──────────────────────────────────────────────────

@method("config.set")
def _(rid, params: dict) -> dict:
    """Set a runtime or persisted configuration value.

    ``model``, ``verbose``, ``yolo`` and ``reasoning`` only change process
    environment variables; ``prompt``, ``personality`` and ``skin`` are
    written to config.yaml.
    """
    key, value = params.get("key", ""), params.get("value", "")

    if key == "model":
        os.environ["HERMES_MODEL"] = value
        return _ok(rid, {"key": key, "value": value})

    if key == "verbose":
        cycle = ["off", "new", "all", "verbose"]
        if value and value != "cycle":
            os.environ["HERMES_VERBOSE"] = value
            return _ok(rid, {"key": key, "value": value})
        # No explicit value (or "cycle"): advance to the next level.
        cur = os.environ.get("HERMES_VERBOSE", "all")
        try:
            idx = cycle.index(cur)
        except ValueError:
            idx = 2  # unrecognised current value: continue as if it were "all"
        nv = cycle[(idx + 1) % len(cycle)]
        os.environ["HERMES_VERBOSE"] = nv
        return _ok(rid, {"key": key, "value": nv})

    if key == "yolo":
        # Toggle; the supplied value is ignored.
        nv = "0" if os.environ.get("HERMES_YOLO", "0") == "1" else "1"
        os.environ["HERMES_YOLO"] = nv
        return _ok(rid, {"key": key, "value": nv})

    if key == "reasoning":
        if value in ("show", "on"):
            os.environ["HERMES_SHOW_REASONING"] = "1"
            return _ok(rid, {"key": key, "value": "show"})
        if value in ("hide", "off"):
            os.environ.pop("HERMES_SHOW_REASONING", None)
            return _ok(rid, {"key": key, "value": "hide"})
        # Any other value is stored verbatim in HERMES_REASONING.
        os.environ["HERMES_REASONING"] = value
        return _ok(rid, {"key": key, "value": value})

    if key in ("prompt", "personality", "skin"):
        try:
            cfg = _load_cfg()
            if key == "prompt":
                if value == "clear":
                    cfg.pop("custom_prompt", None)
                    nv = ""
                else:
                    cfg["custom_prompt"] = value
                    nv = value
            elif key == "personality":
                # "none"/"default"/"neutral" are stored as an empty personality.
                cfg.setdefault("display", {})["personality"] = (
                    value if value not in ("none", "default", "neutral") else ""
                )
                nv = value
            else:
                cfg.setdefault("display", {})[key] = value
                nv = value
            _save_cfg(cfg)
            return _ok(rid, {"key": key, "value": nv})
        except Exception as e:
            return _err(rid, 5001, str(e))

    return _err(rid, 4002, f"unknown config key: {key}")


@method("config.get")
def _(rid, params: dict) -> dict:
    """Read configuration: ``provider``, ``profile``, ``full`` or ``prompt``."""
    key = params.get("key", "")
    if key == "provider":
        try:
            from hermes_cli.models import list_available_providers, normalize_provider
            model = _resolve_model()
            parts = model.split("/", 1)
            # The provider is the text before the first "/" in the model id, if any.
            provider = normalize_provider(parts[0]) if len(parts) > 1 else "unknown"
            return _ok(rid, {"model": model, "provider": provider, "providers": list_available_providers()})
        except Exception as e:
            return _err(rid, 5013, str(e))
    if key == "profile":
        from hermes_constants import display_hermes_home
        return _ok(rid, {"home": str(_hermes_home), "display": display_hermes_home()})
    if key == "full":
        return _ok(rid, {"config": _load_cfg()})
    if key == "prompt":
        return _ok(rid, {"prompt": _load_cfg().get("custom_prompt", "")})
    return _err(rid, 4002, f"unknown config key: {key}")


# ── Methods: tools & system ──────────────────────────────────────────

@method("process.stop")
def _(rid, params: dict) -> dict:
    """Call ``ProcessRegistry().kill_all()`` and return its result as ``killed``."""
    try:
        from tools.process_registry import ProcessRegistry
        return _ok(rid, {"killed": ProcessRegistry().kill_all()})
    except Exception as e:
        return _err(rid, 5010, str(e))


@method("reload.mcp")
def _(rid, params: dict) -> dict:
    """Restart MCP servers, re-discover tools and refresh the session's agent if given."""
    session = _sessions.get(params.get("session_id", ""))
    try:
        from tools.mcp_tool import shutdown_mcp_servers, discover_mcp_tools
        shutdown_mcp_servers()
        discover_mcp_tools()
        if session:
            agent = session["agent"]
            if hasattr(agent, "refresh_tools"):
                agent.refresh_tools()
            _emit("session.info", params.get("session_id", ""), _session_info(agent))
        return _ok(rid, {"status": "reloaded"})
    except Exception as e:
        return _err(rid, 5015, str(e))


@method("commands.catalog")
def _(rid, params: dict) -> dict:
    """Registry-backed slash metadata (same surface as SlashCommandCompleter).

    Returns sorted command pairs (plus skill commands when they can be
    scanned), a copy of the subcommand table, and a lowercase
    alias -> canonical-name map that skips gateway-only commands.
    """
    try:
        from hermes_cli.commands import COMMAND_REGISTRY, COMMANDS, SUBCOMMANDS
        pairs = sorted(COMMANDS.items(), key=lambda kv: kv[0])
        sub = {k: v[:] for k, v in SUBCOMMANDS.items()}
        canon: dict[str, str] = {}
        for cmd in COMMAND_REGISTRY:
            if cmd.gateway_only:
                continue
            c = f"/{cmd.name}"
            canon[c.lower()] = c
            for a in cmd.aliases:
                canon[f"/{a}".lower()] = c
        skills = []
        try:
            from agent.skill_commands import scan_skill_commands
            for k, info in scan_skill_commands().items():
                d = str(info.get("description", "Skill"))
                # Descriptions are cut to 120 characters, with an ellipsis when truncated.
                skills.append([k, f"⚡ {d[:120]}{'…' if len(d) > 120 else ''}"])
        except Exception:
            # Best-effort: the catalog is still returned without skill commands.
            pass
        return _ok(rid, {"pairs": pairs + skills, "sub": sub, "canon": canon})
    except Exception as e:
        return _err(rid, 5020, str(e))


def _cli_exec_blocked(argv: list[str]) -> str | None:
    """Return user hint if this argv must not run headless in the gateway process."""
    if not argv:
        return "bare `hermes` is interactive — use `/hermes chat -q …` or run `hermes` in another terminal"
    a0 = argv[0].lower()
    a1 = argv[1].lower() if len(argv) > 1 else None
    if a0 == "setup":
        return "`hermes setup` needs a full terminal — run it outside the Ink UI"
    if a0 == "gateway":
        return "`hermes gateway` is long-running — run it in another terminal"
    if a0 == "sessions" and a1 == "browse":
        return "`hermes sessions browse` is interactive — use /resume here, or run browse in another terminal"
    if a0 == "config" and a1 == "edit":
        return "`hermes config edit` needs $EDITOR in a real terminal"
    return None


@method("cli.exec")
def _(rid, params: dict) -> dict:
    """Run `python -m hermes_cli.main` with argv; capture stdout/stderr (non-interactive only).

    The timeout defaults to 240 seconds and is capped at 600; the combined
    output is truncated to 48,000 characters.
    """
    argv = params.get("argv", [])
    if not isinstance(argv, list) or not all(isinstance(x, str) for x in argv):
        return _err(rid, 4003, "argv must be list[str]")
    hint = _cli_exec_blocked(argv)
    if hint:
        return _ok(rid, {"blocked": True, "hint": hint, "code": -1, "output": ""})
    try:
        r = subprocess.run(
            [sys.executable, "-m", "hermes_cli.main", *argv],
            capture_output=True,
            text=True,
            timeout=min(int(params.get("timeout", 240)), 600),
            cwd=os.getcwd(),
            env=os.environ.copy(),
        )
        parts = [r.stdout or "", r.stderr or ""]
        out = "\n".join(p for p in parts if p).strip() or "(no output)"
        return _ok(rid, {"blocked": False, "code": r.returncode, "output": out[:48_000]})
    except subprocess.TimeoutExpired:
        return _err(rid, 5016, "cli.exec: timeout")
    except Exception as e:
        return _err(rid, 5017, str(e))


@method("command.resolve")
def _(rid, params: dict) -> dict:
    """Resolve a command name to its canonical name, description and category."""
    try:
        from hermes_cli.commands import resolve_command
        r = resolve_command(params.get("name", ""))
        if r:
            return _ok(rid, {"canonical": r.name, "description": r.description, "category": r.category})
        return _err(rid, 4011, f"unknown command: {params.get('name')}")
    except Exception as e:
        return _err(rid, 5012, str(e))


@method("command.dispatch")
def _(rid, params: dict) -> dict:
    """Dispatch a slash command: config quick commands, then plugins, then skills.

    Quick commands of type ``exec`` run through the shell with a 30 second
    timeout; a timeout is reported as error 5018 and any other launch
    failure as 5019. Plugin and skill lookups are best-effort and fall
    through on failure.
    """
    name, arg = params.get("name", "").lstrip("/"), params.get("arg", "")
    session = _sessions.get(params.get("session_id", ""))

    # `quick_commands:` may be present but null or malformed in config.yaml.
    qcmds = _load_cfg().get("quick_commands") or {}
    qc = qcmds.get(name) if isinstance(qcmds, dict) else None
    if isinstance(qc, dict):
        if qc.get("type") == "exec":
            try:
                r = subprocess.run(qc.get("command", ""), shell=True, capture_output=True,
                                   text=True, timeout=30)
            except subprocess.TimeoutExpired:
                return _err(rid, 5018, "quick command timed out (30s)")
            except Exception as e:
                return _err(rid, 5019, str(e))
            return _ok(rid, {"type": "exec", "output": (r.stdout or r.stderr)[:4000]})
        if qc.get("type") == "alias":
            return _ok(rid, {"type": "alias", "target": qc.get("target", "")})

    try:
        from hermes_cli.plugins import get_plugin_command_handler
        handler = get_plugin_command_handler(name)
        if handler:
            return _ok(rid, {"type": "plugin", "output": str(handler(arg) or "")})
    except Exception:
        pass

    try:
        from agent.skill_commands import scan_skill_commands, build_skill_invocation_message
        cmds = scan_skill_commands()
        key = f"/{name}"
        if key in cmds:
            msg = build_skill_invocation_message(
                key, arg, task_id=session.get("session_key", "") if session else "",
            )
            if msg:
                return _ok(rid, {"type": "skill", "message": msg, "name": cmds[key].get("name", name)})
    except Exception:
        pass

    return _err(rid, 4018, f"not a quick/plugin/skill command: {name}")


# ── Methods: voice ───────────────────────────────────────────────────

@method("voice.toggle")
def _(rid, params: dict) -> dict:
    """Query or set the ``HERMES_VOICE`` flag (actions: ``status``, ``on``, ``off``)."""
    action = params.get("action", "status")
    if action == "status":
        return _ok(rid, {"enabled": os.environ.get("HERMES_VOICE", "0") == "1"})
    if action in ("on", "off"):
        os.environ["HERMES_VOICE"] = "1" if action == "on" else "0"
        return _ok(rid, {"enabled": action == "on"})
    return _err(rid, 4013, f"unknown voice action: {action}")


@method("voice.record")
def _(rid, params: dict) -> dict:
    """Start recording, or stop and return the transcription."""
    action = params.get("action", "start")
    try:
        if action == "start":
            from hermes_cli.voice import start_recording
            start_recording()
            return _ok(rid, {"status": "recording"})
        if action == "stop":
            from hermes_cli.voice import stop_and_transcribe
            return _ok(rid, {"text": stop_and_transcribe() or ""})
        return _err(rid, 4019, f"unknown voice action: {action}")
    except ImportError:
        return _err(rid, 5025, "voice module not available — install audio dependencies")
    except Exception as e:
        return _err(rid, 5025, str(e))


@method("voice.tts")
def _(rid, params: dict) -> dict:
    """Speak ``text`` on a background thread."""
    text = params.get("text", "")
    if not text:
        return _err(rid, 4020, "text required")
    try:
        from hermes_cli.voice import speak_text
        threading.Thread(target=speak_text, args=(text,), daemon=True).start()
        return _ok(rid, {"status": "speaking"})
    except ImportError:
        return _err(rid, 5026, "voice module not available")
    except Exception as e:
        return _err(rid, 5026, str(e))


# ── Methods: insights ────────────────────────────────────────────────

@method("insights.get")
def _(rid, params: dict) -> dict:
    """Count sessions and messages started within the last ``days`` days (default 30).

    Only the 500 sessions returned by the database listing are considered.
    """
    days = params.get("days", 30)
    try:
        import time
        cutoff = time.time() - days * 86400
        rows = [s for s in _get_db().list_sessions_rich(limit=500) if (s.get("started_at") or 0) >= cutoff]
        # `or 0` tolerates a null message_count, matching session.list.
        messages = sum(s.get("message_count") or 0 for s in rows)
        return _ok(rid, {"days": days, "sessions": len(rows), "messages": messages})
    except Exception as e:
        return _err(rid, 5017, str(e))


# ── Methods: rollback ────────────────────────────────────────────────

@method("rollback.list")
def _(rid, params: dict) -> dict:
    """List checkpoints for the working directory, or report that checkpoints are disabled."""
    session, err = _sess(params, rid)
    if err:
        return err

    def go(mgr, cwd):
        if not mgr.enabled:
            return _ok(rid, {"enabled": False, "checkpoints": []})
        checkpoints = [
            {"hash": c.get("hash", ""), "timestamp": c.get("timestamp", ""), "message": c.get("message", "")}
            for c in mgr.list_checkpoints(cwd)
        ]
        return _ok(rid, {"enabled": True, "checkpoints": checkpoints})

    try:
        return _with_checkpoints(session, go)
    except Exception as e:
        return _err(rid, 5020, str(e))


@method("rollback.restore")
def _(rid, params: dict) -> dict:
    """Restore the checkpoint identified by ``hash`` and return the manager's result."""
    session, err = _sess(params, rid)
    if err:
        return err
    target = params.get("hash", "")
    if not target:
        return _err(rid, 4014, "hash required")
    try:
        return _ok(rid, _with_checkpoints(session, lambda mgr, cwd: mgr.restore(cwd, target)))
    except Exception as e:
        return _err(rid, 5021, str(e))


@method("rollback.diff")
def _(rid, params: dict) -> dict:
    """Return the stat and (4000-character truncated) diff for a checkpoint."""
    session, err = _sess(params, rid)
    if err:
        return err
    target = params.get("hash", "")
    if not target:
        return _err(rid, 4014, "hash required")
    try:
        r = _with_checkpoints(session, lambda mgr, cwd: mgr.diff(cwd, target))
        raw = r.get("diff", "")[:4000]
        payload = {"stat": r.get("stat", ""), "diff": raw}
        rendered = render_diff(raw, session.get("cols", 80))
        if rendered:
            payload["rendered"] = rendered
        return _ok(rid, payload)
    except Exception as e:
        return _err(rid, 5022, str(e))


# ── Methods: browser / plugins / cron / skills ───────────────────────

def _cleanup_browsers():
    """Best-effort call to the browser tool's ``cleanup_all_browsers``; failures are ignored."""
    try:
        from tools.browser_tool import cleanup_all_browsers
        cleanup_all_browsers()
    except Exception:
        pass


@method("browser.manage")
def _(rid, params: dict) -> dict:
    """Report, set or clear ``BROWSER_CDP_URL`` (actions: status/connect/disconnect)."""
    action = params.get("action", "status")
    if action == "status":
        url = os.environ.get("BROWSER_CDP_URL", "")
        return _ok(rid, {"connected": bool(url), "url": url})
    if action == "connect":
        url = params.get("url", "http://localhost:9222")
        os.environ["BROWSER_CDP_URL"] = url
        _cleanup_browsers()
        return _ok(rid, {"connected": True, "url": url})
    if action == "disconnect":
        os.environ.pop("BROWSER_CDP_URL", None)
        _cleanup_browsers()
        return _ok(rid, {"connected": False})
    return _err(rid, 4015, f"unknown action: {action}")


@method("plugins.list")
def _(rid, params: dict) -> dict:
    """List plugins with version and enabled flag; returns an empty list on any failure."""
    try:
        from hermes_cli.plugins import get_plugin_manager
        return _ok(rid, {"plugins": [
            {"name": n, "version": getattr(i, "version", "?"), "enabled": getattr(i, "enabled", True)}
            for n, i in get_plugin_manager()._plugins.items()
        ]})
    except Exception:
        return _ok(rid, {"plugins": []})


@method("cron.manage")
def _(rid, params: dict) -> dict:
    """Forward list/add/remove/pause/resume to the ``cronjob`` tool and return its JSON."""
    action, jid = params.get("action", "list"), params.get("name", "")
    try:
        from tools.cronjob_tools import cronjob
        if action == "list":
            return _ok(rid, json.loads(cronjob(action="list")))
        if action == "add":
            # The RPC action "add" maps to the tool action "create".
            return _ok(rid, json.loads(cronjob(
                action="create", name=jid,
                schedule=params.get("schedule", ""), prompt=params.get("prompt", ""),
            )))
        if action in ("remove", "pause", "resume"):
            return _ok(rid, json.loads(cronjob(action=action, job_id=jid)))
        return _err(rid, 4016, f"unknown cron action: {action}")
    except Exception as e:
        return _err(rid, 5023, str(e))
@method("skills.manage")
def _(rid, params: dict) -> dict:
    """Skill operations: ``list``, ``search``, ``install``, ``browse``, ``inspect``.

    ``query`` is the search text, the skill name, or (for ``browse``) the
    page number. Any failure inside an action is reported as error 5024.
    """
    action, query = params.get("action", "list"), params.get("query", "")
    try:
        if action == "list":
            from hermes_cli.banner import get_available_skills
            return _ok(rid, {"skills": get_available_skills()})

        if action == "search":
            from hermes_cli.skills_hub import unified_search, GitHubAuth, create_source_router
            raw = unified_search(query, create_source_router(GitHubAuth()), source_filter="all", limit=20) or []
            return _ok(rid, {"results": [{"name": r.name, "description": r.description} for r in raw]})

        if action == "install":
            from hermes_cli.skills_hub import do_install

            class _Q:
                """Console stand-in whose ``print`` discards everything."""

                def print(self, *a, **k):
                    pass

            do_install(query, skip_confirm=True, console=_Q())
            return _ok(rid, {"installed": True, "name": query})

        if action == "browse":
            from hermes_cli.skills_hub import browse_skills
            # isdecimal(), not isdigit(): isdigit() accepts characters such as
            # "²" that int() rejects. str() also tolerates a numeric JSON value.
            page_text = str(query)
            page = int(page_text) if page_text.isdecimal() else 1
            rows = browse_skills(page=page) or []
            return _ok(rid, {"results": [
                {"name": r.get("name", ""), "description": r.get("description", "")} for r in rows
            ]})

        if action == "inspect":
            from hermes_cli.skills_hub import inspect_skill
            return _ok(rid, {"info": inspect_skill(query) or {}})

        return _err(rid, 4017, f"unknown skills action: {action}")
    except Exception as e:
        return _err(rid, 5024, str(e))


# ── Methods: shell ───────────────────────────────────────────────────

@method("shell.exec")
def _(rid, params: dict) -> dict:
    """Run a shell command in the working directory with a 30 second timeout.

    Commands flagged by ``detect_dangerous_command`` are refused. Returns
    the last 4000 characters of stdout, the last 2000 of stderr, and the
    exit code.
    """
    cmd = params.get("command", "")
    if not cmd:
        return _err(rid, 4004, "empty command")
    if not isinstance(cmd, str):
        # Reject early so a non-string never reaches the detector or the shell.
        return _err(rid, 4004, "command must be a string")
    try:
        from tools.approval import detect_dangerous_command
        is_dangerous, _, desc = detect_dangerous_command(cmd)
        if is_dangerous:
            return _err(rid, 4005, f"blocked: {desc}. Use the agent for dangerous commands.")
    except ImportError:
        # NOTE(review): if the approval module cannot be imported the check is
        # skipped and the command still runs — confirm this is intended.
        pass
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30, cwd=os.getcwd())
        return _ok(rid, {"stdout": r.stdout[-4000:], "stderr": r.stderr[-2000:], "code": r.returncode})
    except subprocess.TimeoutExpired:
        return _err(rid, 5002, "command timed out (30s)")
    except Exception as e:
        return _err(rid, 5003, str(e))