feat(desktop): add read-only remote filesystem API

This commit is contained in:
yoniebans 2026-06-09 18:57:36 +02:00 committed by Teknium
parent e71d746820
commit 51f47f9a97
2 changed files with 413 additions and 0 deletions

View file

@ -920,6 +920,178 @@ class ManagedFilesPolicy:
can_change_path: bool
_FS_READDIR_HIDDEN = {
".git",
".hg",
".svn",
".cache",
".next",
".turbo",
".venv",
"__pycache__",
"build",
"dist",
"node_modules",
"target",
"venv",
}
_FS_DATA_URL_MAX_BYTES = 16 * 1024 * 1024
_FS_TEXT_SOURCE_MAX_BYTES = 64 * 1024 * 1024
_FS_TEXT_PREVIEW_MAX_BYTES = 512 * 1024
_FS_PREVIEW_LANGUAGE_BY_EXT = {
".c": "c",
".conf": "ini",
".cpp": "cpp",
".css": "css",
".csv": "csv",
".go": "go",
".graphql": "graphql",
".h": "c",
".hpp": "cpp",
".html": "html",
".java": "java",
".js": "javascript",
".json": "json",
".jsx": "jsx",
".kt": "kotlin",
".lua": "lua",
".md": "markdown",
".mjs": "javascript",
".py": "python",
".rb": "ruby",
".rs": "rust",
".sh": "shell",
".sql": "sql",
".svg": "xml",
".toml": "toml",
".ts": "typescript",
".tsx": "tsx",
".txt": "text",
".xml": "xml",
".yaml": "yaml",
".yml": "yaml",
".zsh": "shell",
}
_FS_MIME_TYPES = {
".avi": "video/x-msvideo",
".bmp": "image/bmp",
".flac": "audio/flac",
".gif": "image/gif",
".jpeg": "image/jpeg",
".jpg": "image/jpeg",
".m4a": "audio/mp4",
".mkv": "video/x-matroska",
".mov": "video/quicktime",
".mp3": "audio/mpeg",
".mp4": "video/mp4",
".ogg": "audio/ogg",
".opus": "audio/ogg; codecs=opus",
".png": "image/png",
".svg": "image/svg+xml",
".wav": "audio/wav",
".webm": "video/webm",
".webp": "image/webp",
}
def _fs_path(raw_path: str) -> Path:
raw = str(raw_path or "").strip()
if not raw:
raise HTTPException(status_code=400, detail="Path is required")
if "\0" in raw:
raise HTTPException(status_code=400, detail="Invalid path")
try:
if raw.lower().startswith("file:"):
parsed = urllib.parse.urlparse(raw)
if parsed.netloc and parsed.netloc not in {"", "localhost"}:
raise ValueError
raw = urllib.request.url2pathname(parsed.path)
candidate = Path(raw).expanduser()
if not candidate.is_absolute():
candidate = Path.cwd() / candidate
return candidate.resolve(strict=False)
except (OSError, RuntimeError, ValueError):
raise HTTPException(status_code=400, detail="Invalid path")
def _fs_mime_type(path: Path) -> str:
suffix = path.suffix.lower()
if suffix in _FS_MIME_TYPES:
return _FS_MIME_TYPES[suffix]
guessed, _ = mimetypes.guess_type(str(path))
return guessed or "application/octet-stream"
def _fs_looks_binary(data: bytes) -> bool:
if not data:
return False
if b"\0" in data:
return True
suspicious = sum(1 for byte in data if byte < 32 and byte not in {9, 10, 13})
return suspicious / len(data) > 0.12
def _fs_regular_file(path: Path) -> tuple[Path, os.stat_result]:
target = _fs_path(str(path))
try:
st = target.stat()
except FileNotFoundError:
raise HTTPException(status_code=404, detail="File not found")
except NotADirectoryError:
raise HTTPException(status_code=404, detail="File not found")
except PermissionError:
raise HTTPException(status_code=403, detail="File is not readable")
except OSError as exc:
raise HTTPException(status_code=400, detail=str(exc) or "Invalid path")
if stat.S_ISDIR(st.st_mode):
raise HTTPException(status_code=400, detail="Path points to a directory")
if not stat.S_ISREG(st.st_mode):
raise HTTPException(status_code=400, detail="Only regular files can be read")
return target, st
def _fs_find_git_root(start: Path) -> str | None:
directory = start
for _ in range(50):
try:
if (directory / ".git").exists():
return str(directory)
except OSError:
return None
parent = directory.parent
if parent == directory:
return None
directory = parent
return None
def _fs_default_cwd() -> str:
cfg_terminal = load_config().get("terminal") or {}
raw = str(cfg_terminal.get("cwd") or os.environ.get("TERMINAL_CWD") or "").strip()
if raw and raw not in {".", "auto", "cwd"}:
try:
candidate = Path(raw).expanduser().resolve(strict=False)
if candidate.is_dir():
return str(candidate)
except (OSError, RuntimeError):
pass
return str(Path.cwd())
def _fs_git_branch(cwd: str) -> str:
try:
result = subprocess.run(
["git", "-C", cwd, "branch", "--show-current"],
capture_output=True,
text=True,
timeout=2,
check=False,
)
return result.stdout.strip() if result.returncode == 0 else ""
except Exception:
return ""
def _media_serve_roots() -> list[Path]:
"""Directories ``GET /api/media`` is allowed to read from.
@ -1264,6 +1436,87 @@ async def delete_managed_file(payload: ManagedFileDelete, request: Request):
return {"ok": True, "path": display_path, **_managed_response_meta(policy)}
@app.get("/api/fs/list")
async def fs_list(path: str):
target = _fs_path(path)
try:
entries = []
with os.scandir(target) as scan:
for entry in scan:
if entry.name in _FS_READDIR_HIDDEN:
continue
entries.append({
"name": entry.name,
"path": str(target / entry.name),
"isDirectory": entry.is_dir(follow_symlinks=False),
})
entries.sort(key=lambda item: (not item["isDirectory"], item["name"].lower(), item["name"]))
return {"entries": entries}
except FileNotFoundError:
return {"entries": [], "error": "ENOENT"}
except NotADirectoryError:
return {"entries": [], "error": "ENOTDIR"}
except PermissionError:
return {"entries": [], "error": "EACCES"}
except OSError as exc:
return {"entries": [], "error": getattr(exc, "strerror", None) or "read-error"}
@app.get("/api/fs/read-text")
async def fs_read_text(path: str):
target, st = _fs_regular_file(_fs_path(path))
if st.st_size > _FS_TEXT_SOURCE_MAX_BYTES:
raise HTTPException(status_code=413, detail="File too large")
bytes_to_read = min(st.st_size, _FS_TEXT_PREVIEW_MAX_BYTES)
try:
with target.open("rb") as handle:
data = handle.read(bytes_to_read)
except PermissionError:
raise HTTPException(status_code=403, detail="File is not readable")
except OSError as exc:
raise HTTPException(status_code=400, detail=str(exc) or "File read failed")
return {
"binary": _fs_looks_binary(data[:4096]),
"byteSize": st.st_size,
"language": _FS_PREVIEW_LANGUAGE_BY_EXT.get(target.suffix.lower(), "text"),
"mimeType": _fs_mime_type(target),
"path": str(target),
"text": data.decode("utf-8", errors="replace"),
"truncated": st.st_size > _FS_TEXT_PREVIEW_MAX_BYTES,
}
@app.get("/api/fs/read-data-url")
async def fs_read_data_url(path: str):
target, st = _fs_regular_file(_fs_path(path))
if st.st_size > _FS_DATA_URL_MAX_BYTES:
raise HTTPException(status_code=413, detail="File too large")
try:
encoded = base64.b64encode(target.read_bytes()).decode("ascii")
except PermissionError:
raise HTTPException(status_code=403, detail="File is not readable")
except OSError as exc:
raise HTTPException(status_code=400, detail=str(exc) or "File read failed")
return {"dataUrl": f"data:{_fs_mime_type(target)};base64,{encoded}"}
@app.get("/api/fs/git-root")
async def fs_git_root(path: str):
target = _fs_path(path)
try:
st = target.stat()
start = target if stat.S_ISDIR(st.st_mode) else target.parent
except OSError:
start = target
return {"root": _fs_find_git_root(start)}
@app.get("/api/fs/default-cwd")
async def fs_default_cwd():
cwd = _fs_default_cwd()
return {"cwd": cwd, "branch": _fs_git_branch(cwd)}
@app.get("/api/status")
async def get_status():
current_ver, latest_ver = check_config_version()

View file

@ -0,0 +1,160 @@
import base64
from pathlib import Path
import pytest
from hermes_cli import web_server
pytest.importorskip("starlette.testclient")
from starlette.testclient import TestClient
@pytest.fixture
def client(monkeypatch):
previous_auth_required = getattr(web_server.app.state, "auth_required", None)
web_server.app.state.auth_required = False
test_client = TestClient(web_server.app)
test_client.headers[web_server._SESSION_HEADER_NAME] = web_server._SESSION_TOKEN
try:
yield test_client
finally:
if previous_auth_required is None:
try:
delattr(web_server.app.state, "auth_required")
except AttributeError:
pass
else:
web_server.app.state.auth_required = previous_auth_required
def test_fs_list_sorts_and_hides_noise(client, tmp_path):
root = tmp_path / "project"
root.mkdir()
(root / "b.txt").write_text("b")
(root / "a_dir").mkdir()
(root / "a.txt").write_text("a")
(root / "node_modules").mkdir()
(root / ".git").mkdir()
response = client.get("/api/fs/list", params={"path": str(root)})
assert response.status_code == 200
entries = response.json()["entries"]
assert [entry["name"] for entry in entries] == ["a_dir", "a.txt", "b.txt"]
assert entries[0] == {"name": "a_dir", "path": str(root / "a_dir"), "isDirectory": True}
assert all(entry["name"] not in {".git", "node_modules"} for entry in entries)
def test_fs_list_accepts_relative_paths(client, tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
(tmp_path / "rel").mkdir()
(tmp_path / "rel" / "file.txt").write_text("ok")
response = client.get("/api/fs/list", params={"path": "rel"})
assert response.status_code == 200
assert response.json()["entries"] == [
{"name": "file.txt", "path": str(tmp_path / "rel" / "file.txt"), "isDirectory": False}
]
def test_fs_list_missing_path_returns_structured_error(client, tmp_path):
response = client.get("/api/fs/list", params={"path": str(tmp_path / "missing")})
assert response.status_code == 200
assert response.json() == {"entries": [], "error": "ENOENT"}
def test_fs_read_text_matches_preview_shape_and_truncates(client, tmp_path, monkeypatch):
monkeypatch.setattr(web_server, "_FS_TEXT_SOURCE_MAX_BYTES", 32)
monkeypatch.setattr(web_server, "_FS_TEXT_PREVIEW_MAX_BYTES", 5)
target = tmp_path / "sample.py"
target.write_text("print('hello')")
response = client.get("/api/fs/read-text", params={"path": str(target)})
assert response.status_code == 200
assert response.json() == {
"binary": False,
"byteSize": 14,
"language": "python",
"mimeType": "text/x-python",
"path": str(target),
"text": "print",
"truncated": True,
}
def test_fs_read_text_rejects_source_over_cap(client, tmp_path, monkeypatch):
monkeypatch.setattr(web_server, "_FS_TEXT_SOURCE_MAX_BYTES", 4)
target = tmp_path / "large.txt"
target.write_text("12345")
response = client.get("/api/fs/read-text", params={"path": str(target)})
assert response.status_code == 413
def test_fs_read_text_flags_binary(client, tmp_path):
target = tmp_path / "blob.bin"
target.write_bytes(b"hello\x00world")
response = client.get("/api/fs/read-text", params={"path": str(target)})
assert response.status_code == 200
body = response.json()
assert body["binary"] is True
assert body["text"].startswith("hello")
def test_fs_read_data_url_returns_capped_data_url(client, tmp_path, monkeypatch):
monkeypatch.setattr(web_server, "_FS_DATA_URL_MAX_BYTES", 16)
target = tmp_path / "image.png"
target.write_bytes(b"pngbytes")
response = client.get("/api/fs/read-data-url", params={"path": str(target)})
assert response.status_code == 200
assert response.json() == {"dataUrl": "data:image/png;base64," + base64.b64encode(b"pngbytes").decode("ascii")}
def test_fs_read_data_url_rejects_over_cap(client, tmp_path, monkeypatch):
monkeypatch.setattr(web_server, "_FS_DATA_URL_MAX_BYTES", 3)
target = tmp_path / "image.png"
target.write_bytes(b"1234")
response = client.get("/api/fs/read-data-url", params={"path": str(target)})
assert response.status_code == 413
def test_fs_git_root_for_nested_file(client, tmp_path):
(tmp_path / ".git").mkdir()
nested = tmp_path / "pkg" / "mod"
nested.mkdir(parents=True)
target = nested / "file.py"
target.write_text("x")
response = client.get("/api/fs/git-root", params={"path": str(target)})
assert response.status_code == 200
assert response.json() == {"root": str(tmp_path)}
def test_fs_git_root_returns_null_outside_repo(client, tmp_path):
response = client.get("/api/fs/git-root", params={"path": str(tmp_path)})
assert response.status_code == 200
assert response.json() == {"root": None}
def test_fs_endpoints_require_auth(tmp_path):
client = TestClient(web_server.app)
target = tmp_path / "secret.txt"
target.write_text("secret")
list_response = client.get("/api/fs/list", params={"path": str(tmp_path)})
read_response = client.get("/api/fs/read-text", params={"path": str(target)})
assert list_response.status_code == 401
assert read_response.status_code == 401