From 51f47f9a9774c97fe1973f91ade0970f9700ee84 Mon Sep 17 00:00:00 2001 From: yoniebans Date: Tue, 9 Jun 2026 18:57:36 +0200 Subject: [PATCH] feat(desktop): add read-only remote filesystem API --- hermes_cli/web_server.py | 253 +++++++++++++++++++++++++ tests/hermes_cli/test_web_server_fs.py | 160 ++++++++++++++++ 2 files changed, 413 insertions(+) create mode 100644 tests/hermes_cli/test_web_server_fs.py diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index ac48354f0b1..c4b028a602e 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -920,6 +920,178 @@ class ManagedFilesPolicy: can_change_path: bool +_FS_READDIR_HIDDEN = { + ".git", + ".hg", + ".svn", + ".cache", + ".next", + ".turbo", + ".venv", + "__pycache__", + "build", + "dist", + "node_modules", + "target", + "venv", +} +_FS_DATA_URL_MAX_BYTES = 16 * 1024 * 1024 +_FS_TEXT_SOURCE_MAX_BYTES = 64 * 1024 * 1024 +_FS_TEXT_PREVIEW_MAX_BYTES = 512 * 1024 +_FS_PREVIEW_LANGUAGE_BY_EXT = { + ".c": "c", + ".conf": "ini", + ".cpp": "cpp", + ".css": "css", + ".csv": "csv", + ".go": "go", + ".graphql": "graphql", + ".h": "c", + ".hpp": "cpp", + ".html": "html", + ".java": "java", + ".js": "javascript", + ".json": "json", + ".jsx": "jsx", + ".kt": "kotlin", + ".lua": "lua", + ".md": "markdown", + ".mjs": "javascript", + ".py": "python", + ".rb": "ruby", + ".rs": "rust", + ".sh": "shell", + ".sql": "sql", + ".svg": "xml", + ".toml": "toml", + ".ts": "typescript", + ".tsx": "tsx", + ".txt": "text", + ".xml": "xml", + ".yaml": "yaml", + ".yml": "yaml", + ".zsh": "shell", +} +_FS_MIME_TYPES = { + ".avi": "video/x-msvideo", + ".bmp": "image/bmp", + ".flac": "audio/flac", + ".gif": "image/gif", + ".jpeg": "image/jpeg", + ".jpg": "image/jpeg", + ".m4a": "audio/mp4", + ".mkv": "video/x-matroska", + ".mov": "video/quicktime", + ".mp3": "audio/mpeg", + ".mp4": "video/mp4", + ".ogg": "audio/ogg", + ".opus": "audio/ogg; codecs=opus", + ".png": "image/png", + ".svg": "image/svg+xml", + ".wav": "audio/wav", + ".webm": "video/webm", + ".webp": "image/webp", +} + + +def _fs_path(raw_path: str) -> Path: + raw = str(raw_path or "").strip() + if not raw: + raise HTTPException(status_code=400, detail="Path is required") + if "\0" in raw: + raise HTTPException(status_code=400, detail="Invalid path") + try: + if raw.lower().startswith("file:"): + parsed = urllib.parse.urlparse(raw) + if parsed.netloc and parsed.netloc not in {"", "localhost"}: + raise ValueError + raw = urllib.request.url2pathname(parsed.path) + candidate = Path(raw).expanduser() + if not candidate.is_absolute(): + candidate = Path.cwd() / candidate + return candidate.resolve(strict=False) + except (OSError, RuntimeError, ValueError): + raise HTTPException(status_code=400, detail="Invalid path") + + +def _fs_mime_type(path: Path) -> str: + suffix = path.suffix.lower() + if suffix in _FS_MIME_TYPES: + return _FS_MIME_TYPES[suffix] + guessed, _ = mimetypes.guess_type(str(path)) + return guessed or "application/octet-stream" + + +def _fs_looks_binary(data: bytes) -> bool: + if not data: + return False + if b"\0" in data: + return True + suspicious = sum(1 for byte in data if byte < 32 and byte not in {9, 10, 13}) + return suspicious / len(data) > 0.12 + + +def _fs_regular_file(path: Path) -> tuple[Path, os.stat_result]: + target = _fs_path(str(path)) + try: + st = target.stat() + except FileNotFoundError: + raise HTTPException(status_code=404, detail="File not found") + except NotADirectoryError: + raise HTTPException(status_code=404, detail="File not found") + except PermissionError: + raise HTTPException(status_code=403, detail="File is not readable") + except OSError as exc: + raise HTTPException(status_code=400, detail=str(exc) or "Invalid path") + if stat.S_ISDIR(st.st_mode): + raise HTTPException(status_code=400, detail="Path points to a directory") + if not stat.S_ISREG(st.st_mode): + raise HTTPException(status_code=400, detail="Only regular files can be read") + return target, st + + +def _fs_find_git_root(start: Path) -> str | None: + directory = start + for _ in range(50): + try: + if (directory / ".git").exists(): + return str(directory) + except OSError: + return None + parent = directory.parent + if parent == directory: + return None + directory = parent + return None + + +def _fs_default_cwd() -> str: + cfg_terminal = load_config().get("terminal") or {} + raw = str(cfg_terminal.get("cwd") or os.environ.get("TERMINAL_CWD") or "").strip() + if raw and raw not in {".", "auto", "cwd"}: + try: + candidate = Path(raw).expanduser().resolve(strict=False) + if candidate.is_dir(): + return str(candidate) + except (OSError, RuntimeError): + pass + return str(Path.cwd()) + + +def _fs_git_branch(cwd: str) -> str: + try: + result = subprocess.run( + ["git", "-C", cwd, "branch", "--show-current"], + capture_output=True, + text=True, + timeout=2, + check=False, + ) + return result.stdout.strip() if result.returncode == 0 else "" + except Exception: + return "" + + def _media_serve_roots() -> list[Path]: """Directories ``GET /api/media`` is allowed to read from. @@ -1264,6 +1436,87 @@ async def delete_managed_file(payload: ManagedFileDelete, request: Request): return {"ok": True, "path": display_path, **_managed_response_meta(policy)} +@app.get("/api/fs/list") +async def fs_list(path: str): + target = _fs_path(path) + try: + entries = [] + with os.scandir(target) as scan: + for entry in scan: + if entry.name in _FS_READDIR_HIDDEN: + continue + entries.append({ + "name": entry.name, + "path": str(target / entry.name), + "isDirectory": entry.is_dir(follow_symlinks=False), + }) + entries.sort(key=lambda item: (not item["isDirectory"], item["name"].lower(), item["name"])) + return {"entries": entries} + except FileNotFoundError: + return {"entries": [], "error": "ENOENT"} + except NotADirectoryError: + return {"entries": [], "error": "ENOTDIR"} + except PermissionError: + return {"entries": [], "error": "EACCES"} + except OSError as exc: + return {"entries": [], "error": getattr(exc, "strerror", None) or "read-error"} + + +@app.get("/api/fs/read-text") +async def fs_read_text(path: str): + target, st = _fs_regular_file(_fs_path(path)) + if st.st_size > _FS_TEXT_SOURCE_MAX_BYTES: + raise HTTPException(status_code=413, detail="File too large") + bytes_to_read = min(st.st_size, _FS_TEXT_PREVIEW_MAX_BYTES) + try: + with target.open("rb") as handle: + data = handle.read(bytes_to_read) + except PermissionError: + raise HTTPException(status_code=403, detail="File is not readable") + except OSError as exc: + raise HTTPException(status_code=400, detail=str(exc) or "File read failed") + return { + "binary": _fs_looks_binary(data[:4096]), + "byteSize": st.st_size, + "language": _FS_PREVIEW_LANGUAGE_BY_EXT.get(target.suffix.lower(), "text"), + "mimeType": _fs_mime_type(target), + "path": str(target), + "text": data.decode("utf-8", errors="replace"), + "truncated": st.st_size > _FS_TEXT_PREVIEW_MAX_BYTES, + } + + +@app.get("/api/fs/read-data-url") +async def fs_read_data_url(path: str): + target, st = _fs_regular_file(_fs_path(path)) + if st.st_size > _FS_DATA_URL_MAX_BYTES: + raise HTTPException(status_code=413, detail="File too large") + try: + encoded = base64.b64encode(target.read_bytes()).decode("ascii") + except PermissionError: + raise HTTPException(status_code=403, detail="File is not readable") + except OSError as exc: + raise HTTPException(status_code=400, detail=str(exc) or "File read failed") + return {"dataUrl": f"data:{_fs_mime_type(target)};base64,{encoded}"} + + +@app.get("/api/fs/git-root") +async def fs_git_root(path: str): + target = _fs_path(path) + try: + st = target.stat() + start = target if stat.S_ISDIR(st.st_mode) else target.parent + except OSError: + start = target + return {"root": _fs_find_git_root(start)} + + +@app.get("/api/fs/default-cwd") +async def fs_default_cwd(): + cwd = _fs_default_cwd() + return {"cwd": cwd, "branch": _fs_git_branch(cwd)} + + @app.get("/api/status") async def get_status(): current_ver, latest_ver = check_config_version() diff --git a/tests/hermes_cli/test_web_server_fs.py b/tests/hermes_cli/test_web_server_fs.py new file mode 100644 index 00000000000..5696ede25a4 --- /dev/null +++ b/tests/hermes_cli/test_web_server_fs.py @@ -0,0 +1,160 @@ +import base64 +from pathlib import Path + +import pytest + +from hermes_cli import web_server + +pytest.importorskip("starlette.testclient") +from starlette.testclient import TestClient + + +@pytest.fixture +def client(monkeypatch): + previous_auth_required = getattr(web_server.app.state, "auth_required", None) + web_server.app.state.auth_required = False + test_client = TestClient(web_server.app) + test_client.headers[web_server._SESSION_HEADER_NAME] = web_server._SESSION_TOKEN + try: + yield test_client + finally: + if previous_auth_required is None: + try: + delattr(web_server.app.state, "auth_required") + except AttributeError: + pass + else: + web_server.app.state.auth_required = previous_auth_required + + +def test_fs_list_sorts_and_hides_noise(client, tmp_path): + root = tmp_path / "project" + root.mkdir() + (root / "b.txt").write_text("b") + (root / "a_dir").mkdir() + (root / "a.txt").write_text("a") + (root / "node_modules").mkdir() + (root / ".git").mkdir() + + response = client.get("/api/fs/list", params={"path": str(root)}) + + assert response.status_code == 200 + entries = response.json()["entries"] + assert [entry["name"] for entry in entries] == ["a_dir", "a.txt", "b.txt"] + assert entries[0] == {"name": "a_dir", "path": str(root / "a_dir"), "isDirectory": True} + assert all(entry["name"] not in {".git", "node_modules"} for entry in entries) + + +def test_fs_list_accepts_relative_paths(client, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + (tmp_path / "rel").mkdir() + (tmp_path / "rel" / "file.txt").write_text("ok") + + response = client.get("/api/fs/list", params={"path": "rel"}) + + assert response.status_code == 200 + assert response.json()["entries"] == [ + {"name": "file.txt", "path": str(tmp_path / "rel" / "file.txt"), "isDirectory": False} + ] + + +def test_fs_list_missing_path_returns_structured_error(client, tmp_path): + response = client.get("/api/fs/list", params={"path": str(tmp_path / "missing")}) + + assert response.status_code == 200 + assert response.json() == {"entries": [], "error": "ENOENT"} + + +def test_fs_read_text_matches_preview_shape_and_truncates(client, tmp_path, monkeypatch): + monkeypatch.setattr(web_server, "_FS_TEXT_SOURCE_MAX_BYTES", 32) + monkeypatch.setattr(web_server, "_FS_TEXT_PREVIEW_MAX_BYTES", 5) + target = tmp_path / "sample.py" + target.write_text("print('hello')") + + response = client.get("/api/fs/read-text", params={"path": str(target)}) + + assert response.status_code == 200 + assert response.json() == { + "binary": False, + "byteSize": 14, + "language": "python", + "mimeType": "text/x-python", + "path": str(target), + "text": "print", + "truncated": True, + } + + +def test_fs_read_text_rejects_source_over_cap(client, tmp_path, monkeypatch): + monkeypatch.setattr(web_server, "_FS_TEXT_SOURCE_MAX_BYTES", 4) + target = tmp_path / "large.txt" + target.write_text("12345") + + response = client.get("/api/fs/read-text", params={"path": str(target)}) + + assert response.status_code == 413 + + +def test_fs_read_text_flags_binary(client, tmp_path): + target = tmp_path / "blob.bin" + target.write_bytes(b"hello\x00world") + + response = client.get("/api/fs/read-text", params={"path": str(target)}) + + assert response.status_code == 200 + body = response.json() + assert body["binary"] is True + assert body["text"].startswith("hello") + + +def test_fs_read_data_url_returns_capped_data_url(client, tmp_path, monkeypatch): + monkeypatch.setattr(web_server, "_FS_DATA_URL_MAX_BYTES", 16) + target = tmp_path / "image.png" + target.write_bytes(b"pngbytes") + + response = client.get("/api/fs/read-data-url", params={"path": str(target)}) + + assert response.status_code == 200 + assert response.json() == {"dataUrl": "data:image/png;base64," + base64.b64encode(b"pngbytes").decode("ascii")} + + +def test_fs_read_data_url_rejects_over_cap(client, tmp_path, monkeypatch): + monkeypatch.setattr(web_server, "_FS_DATA_URL_MAX_BYTES", 3) + target = tmp_path / "image.png" + target.write_bytes(b"1234") + + response = client.get("/api/fs/read-data-url", params={"path": str(target)}) + + assert response.status_code == 413 + + +def test_fs_git_root_for_nested_file(client, tmp_path): + (tmp_path / ".git").mkdir() + nested = tmp_path / "pkg" / "mod" + nested.mkdir(parents=True) + target = nested / "file.py" + target.write_text("x") + + response = client.get("/api/fs/git-root", params={"path": str(target)}) + + assert response.status_code == 200 + assert response.json() == {"root": str(tmp_path)} + + +def test_fs_git_root_returns_null_outside_repo(client, tmp_path): + response = client.get("/api/fs/git-root", params={"path": str(tmp_path)}) + + assert response.status_code == 200 + assert response.json() == {"root": None} + + +def test_fs_endpoints_require_auth(tmp_path): + client = TestClient(web_server.app) + target = tmp_path / "secret.txt" + target.write_text("secret") + + list_response = client.get("/api/fs/list", params={"path": str(tmp_path)}) + read_response = client.get("/api/fs/read-text", params={"path": str(target)}) + + assert list_response.status_code == 401 + assert read_response.status_code == 401