mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-30 11:52:04 +00:00
refactor(web_git): unify porcelain-v2 parsing into one walker
Collapse the two near-duplicate status parsers (_parse_status_v2 + _iter_status_entries) into a single _walk_entries generator feeding the rail, review list, and commit flow; share the staged predicate; hoist `import re`. Behavior unchanged.
This commit is contained in:
parent
fc86e35764
commit
e4cf3a2e9d
1 changed files with 85 additions and 142 deletions
|
|
@ -14,6 +14,7 @@ from __future__ import annotations
|
|||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
|
@ -152,87 +153,49 @@ def _default_branch_name(cwd: str) -> str | None:
|
|||
# ── porcelain v2 status parsing ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def _parse_status_v2(cwd: str) -> dict | None:
|
||||
"""Parse ``git status --porcelain=v2 --branch -z`` into branch + classified
|
||||
files. None when ``cwd`` isn't a git repo."""
|
||||
code, out, _ = _git(cwd, ["status", "--porcelain=v2", "--branch", "-z"])
|
||||
if code != 0:
|
||||
return None
|
||||
|
||||
branch: str | None = None
|
||||
detached = False
|
||||
ahead = behind = 0
|
||||
files: list[dict] = []
|
||||
untracked = 0
|
||||
conflicted = 0
|
||||
|
||||
records = out.split("\0")
|
||||
def _walk_entries(raw: str):
|
||||
"""Yield (tag, xy, path) per changed file from ``git status --porcelain=v2 -z``,
|
||||
skipping branch headers and the rename/copy origin-path records. One walker
|
||||
feeds the rail, the review list, and the commit flow."""
|
||||
records = raw.split("\0")
|
||||
i = 0
|
||||
while i < len(records):
|
||||
rec = records[i]
|
||||
if not rec:
|
||||
i += 1
|
||||
continue
|
||||
tag = rec[0]
|
||||
if tag == "#":
|
||||
if rec.startswith("# branch.head "):
|
||||
head = rec[len("# branch.head ") :]
|
||||
if head == "(detached)":
|
||||
detached = True
|
||||
else:
|
||||
branch = head
|
||||
elif rec.startswith("# branch.ab "):
|
||||
for tok in rec[len("# branch.ab ") :].split():
|
||||
if tok.startswith("+"):
|
||||
ahead = int(tok[1:] or 0)
|
||||
elif tok.startswith("-"):
|
||||
behind = int(tok[1:] or 0)
|
||||
tag = rec[0] if rec else ""
|
||||
if tag == "?":
|
||||
yield "?", "??", rec[2:]
|
||||
elif tag == "u":
|
||||
yield "u", rec.split(" ")[1], rec.split(" ", 10)[-1]
|
||||
elif tag in ("1", "2"):
|
||||
fields = rec.split(" ")
|
||||
xy = fields[1]
|
||||
xy = rec.split(" ")[1]
|
||||
path = rec.split(" ", 8)[-1] if tag == "1" else rec.split(" ", 9)[-1]
|
||||
if tag == "2":
|
||||
# Rename/copy: NUL-separated origin path follows in the next record.
|
||||
i += 1
|
||||
files.append(_classify(xy, resolve_rename_path(path)))
|
||||
elif tag == "u":
|
||||
path = rec.split(" ", 10)[-1]
|
||||
files.append({"path": path, "staged": False, "unstaged": False, "untracked": False, "conflicted": True})
|
||||
conflicted += 1
|
||||
elif tag == "?":
|
||||
path = rec[2:]
|
||||
files.append({"path": path, "staged": False, "unstaged": True, "untracked": True, "conflicted": False})
|
||||
untracked += 1
|
||||
i += 1 # rename/copy: the origin path is the next NUL record
|
||||
yield tag, xy, resolve_rename_path(path)
|
||||
i += 1
|
||||
|
||||
return {
|
||||
"branch": branch,
|
||||
"detached": detached,
|
||||
"ahead": ahead,
|
||||
"behind": behind,
|
||||
"files": files,
|
||||
"untracked": untracked,
|
||||
"conflicted": conflicted,
|
||||
}
|
||||
|
||||
def _entry_staged(tag: str, xy: str) -> bool:
|
||||
"""A tracked entry whose index (staged) code is set."""
|
||||
return tag in ("1", "2") and xy[0] not in (".", "?")
|
||||
|
||||
|
||||
def _classify(xy: str, path: str) -> dict:
|
||||
x = xy[0] if xy else "."
|
||||
def _classify(tag: str, xy: str, path: str) -> dict:
|
||||
y = xy[1] if len(xy) > 1 else "."
|
||||
return {
|
||||
"path": path,
|
||||
"staged": x not in (".", "?"),
|
||||
"unstaged": y not in (".", "?"),
|
||||
"untracked": False,
|
||||
"conflicted": x == "U" or y == "U",
|
||||
"staged": _entry_staged(tag, xy),
|
||||
"unstaged": tag == "?" or (tag in ("1", "2") and y not in (".", "?")),
|
||||
"untracked": tag == "?",
|
||||
"conflicted": tag == "u",
|
||||
}
|
||||
|
||||
|
||||
def _status_letter(xy: str) -> str:
|
||||
x = xy[0] if xy else "."
|
||||
y = xy[1] if len(xy) > 1 else "."
|
||||
code = x if x != "." else y
|
||||
return (code or "M").upper()
|
||||
def _status_letter(tag: str, xy: str) -> str:
|
||||
if tag in ("?", "u"):
|
||||
return tag.upper() if tag == "u" else "?"
|
||||
code = xy[0] if xy[0] != "." else (xy[1] if len(xy) > 1 else ".")
|
||||
return (code if code != "." else "M").upper()
|
||||
|
||||
|
||||
# ── coding rail ──────────────────────────────────────────────────────────────
|
||||
|
|
@ -242,39 +205,50 @@ def repo_status(cwd: str) -> dict | None:
|
|||
"""Compact working-tree status for the coding rail. None on a non-repo."""
|
||||
if not _is_dir(cwd):
|
||||
return None
|
||||
parsed = _parse_status_v2(cwd)
|
||||
if parsed is None:
|
||||
|
||||
code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "--branch", "-z"])
|
||||
if code != 0:
|
||||
return None
|
||||
|
||||
files = parsed["files"]
|
||||
branch: str | None = None
|
||||
detached = False
|
||||
ahead = behind = 0
|
||||
for rec in raw.split("\0"):
|
||||
if rec.startswith("# branch.head "):
|
||||
head = rec[len("# branch.head ") :]
|
||||
detached = head == "(detached)"
|
||||
branch = None if detached else head
|
||||
elif rec.startswith("# branch.ab "):
|
||||
for tok in rec.split()[2:]:
|
||||
if tok.startswith("+"):
|
||||
ahead = int(tok[1:] or 0)
|
||||
elif tok.startswith("-"):
|
||||
behind = int(tok[1:] or 0)
|
||||
|
||||
files = [_classify(tag, xy, path) for tag, xy, path in _walk_entries(raw)]
|
||||
|
||||
# +/- vs HEAD (tracked), then fold in untracked insertions — `git diff HEAD`
|
||||
# ignores them, so a new-file-only turn would otherwise read +0 (bounded scan).
|
||||
added = removed = 0
|
||||
summary = _numstat(cwd, ["HEAD"])
|
||||
for a, r in summary.values():
|
||||
for a, r in _numstat(cwd, ["HEAD"]).values():
|
||||
added += a
|
||||
removed += r
|
||||
|
||||
# `git diff HEAD` ignores untracked files; fold their insertions into `added`
|
||||
# so a new-file-only turn registers in the rail (bounded scan).
|
||||
untracked_paths = [f["path"] for f in files if f["untracked"]][:_UNTRACKED_SCAN_CAP]
|
||||
for rel in untracked_paths:
|
||||
added += _untracked_insertions(cwd, rel)
|
||||
added += sum(_untracked_insertions(cwd, f["path"]) for f in files[:_UNTRACKED_SCAN_CAP] if f["untracked"])
|
||||
|
||||
return {
|
||||
"branch": None if parsed["detached"] else parsed["branch"],
|
||||
"branch": branch,
|
||||
"defaultBranch": _default_branch_name(cwd),
|
||||
"detached": parsed["detached"],
|
||||
"ahead": parsed["ahead"],
|
||||
"behind": parsed["behind"],
|
||||
"staged": sum(1 for f in files if f["staged"]),
|
||||
"unstaged": sum(1 for f in files if f["unstaged"]),
|
||||
"untracked": parsed["untracked"],
|
||||
"conflicted": parsed["conflicted"],
|
||||
"detached": detached,
|
||||
"ahead": ahead,
|
||||
"behind": behind,
|
||||
"staged": sum(f["staged"] for f in files),
|
||||
"unstaged": sum(f["unstaged"] for f in files),
|
||||
"untracked": sum(f["untracked"] for f in files),
|
||||
"conflicted": sum(f["conflicted"] for f in files),
|
||||
"changed": len(files),
|
||||
"added": added,
|
||||
"removed": removed,
|
||||
"files": [
|
||||
{k: f[k] for k in ("path", "staged", "unstaged", "untracked", "conflicted")} for f in files[:200]
|
||||
],
|
||||
"files": files[:200],
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -291,30 +265,30 @@ def review_list(cwd: str, scope: str, base_ref: str | None) -> dict:
|
|||
if not base:
|
||||
return {"files": [], "base": None}
|
||||
rng = f"{base}...HEAD" if scope == "branch" else base
|
||||
counts = _numstat(cwd, [rng])
|
||||
files = [
|
||||
{"path": path, "added": a, "removed": r, "status": "M", "staged": False}
|
||||
for path, (a, r) in counts.items()
|
||||
for path, (a, r) in _numstat(cwd, [rng]).items()
|
||||
]
|
||||
if scope == "lastTurn":
|
||||
parsed = _parse_status_v2(cwd)
|
||||
for f in parsed["files"] if parsed else []:
|
||||
if f["untracked"] and not any(x["path"] == f["path"] for x in files):
|
||||
files.append({"path": f["path"], "added": 0, "removed": 0, "status": "?", "staged": False})
|
||||
seen = {f["path"] for f in files}
|
||||
_, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"])
|
||||
files += [
|
||||
{"path": path, "added": 0, "removed": 0, "status": "?", "staged": False}
|
||||
for tag, _xy, path in _walk_entries(raw)
|
||||
if tag == "?" and path not in seen
|
||||
]
|
||||
files.sort(key=lambda f: f["path"])
|
||||
_fill_untracked_counts(cwd, files)
|
||||
return {"files": files, "base": base}
|
||||
|
||||
parsed = _parse_status_v2(cwd)
|
||||
if parsed is None:
|
||||
code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"])
|
||||
if code != 0:
|
||||
return {"files": [], "base": None}
|
||||
staged = _numstat(cwd, ["--cached"])
|
||||
unstaged = _numstat(cwd, [])
|
||||
|
||||
files = []
|
||||
code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"])
|
||||
for entry in _iter_status_entries(raw):
|
||||
path = entry["path"]
|
||||
for tag, xy, path in _walk_entries(raw):
|
||||
sa, sr = staged.get(path, (0, 0))
|
||||
ua, ur = unstaged.get(path, (0, 0))
|
||||
files.append(
|
||||
|
|
@ -322,8 +296,8 @@ def review_list(cwd: str, scope: str, base_ref: str | None) -> dict:
|
|||
"path": path,
|
||||
"added": sa + ua,
|
||||
"removed": sr + ur,
|
||||
"status": entry["letter"],
|
||||
"staged": entry["staged"],
|
||||
"status": _status_letter(tag, xy),
|
||||
"staged": _entry_staged(tag, xy),
|
||||
}
|
||||
)
|
||||
files.sort(key=lambda f: f["path"])
|
||||
|
|
@ -331,32 +305,6 @@ def review_list(cwd: str, scope: str, base_ref: str | None) -> dict:
|
|||
return {"files": files, "base": None}
|
||||
|
||||
|
||||
def _iter_status_entries(raw: str):
|
||||
"""Yield {path, letter, staged} from porcelain v2 -z output (for review_list)."""
|
||||
records = raw.split("\0")
|
||||
i = 0
|
||||
while i < len(records):
|
||||
rec = records[i]
|
||||
if not rec:
|
||||
i += 1
|
||||
continue
|
||||
tag = rec[0]
|
||||
if tag in ("1", "2"):
|
||||
xy = rec.split(" ")[1]
|
||||
path = rec.split(" ", 8)[-1] if tag == "1" else rec.split(" ", 9)[-1]
|
||||
if tag == "2":
|
||||
i += 1
|
||||
path = resolve_rename_path(path)
|
||||
x = xy[0] if xy else "."
|
||||
yield {"path": path, "letter": _status_letter(xy), "staged": x not in (".", "?")}
|
||||
elif tag == "u":
|
||||
path = rec.split(" ", 10)[-1]
|
||||
yield {"path": path, "letter": "U", "staged": False}
|
||||
elif tag == "?":
|
||||
yield {"path": rec[2:], "letter": "?", "staged": False}
|
||||
i += 1
|
||||
|
||||
|
||||
def review_diff(cwd: str, file_path: str, scope: str, base_ref: str | None, staged: bool) -> str:
|
||||
if not _is_dir(cwd):
|
||||
return ""
|
||||
|
|
@ -415,8 +363,8 @@ def review_rev_parse(cwd: str, ref: str | None) -> str | None:
|
|||
|
||||
def review_commit(cwd: str, message: str, push: bool) -> dict:
|
||||
"""Commit the working tree; stage everything first when nothing is staged."""
|
||||
parsed = _parse_status_v2(cwd)
|
||||
if not parsed or not any(f["staged"] for f in parsed["files"]):
|
||||
_, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"])
|
||||
if not any(_entry_staged(tag, xy) for tag, xy, _ in _walk_entries(raw)):
|
||||
_git_ok(cwd, ["add", "-A"])
|
||||
_git_ok(cwd, ["commit", "-m", message])
|
||||
if push:
|
||||
|
|
@ -443,27 +391,26 @@ def review_commit_context(cwd: str) -> dict:
|
|||
"""Diff of what WILL commit + recent subjects, for drafting a commit message."""
|
||||
if not _is_dir(cwd):
|
||||
return {"diff": "", "recent": ""}
|
||||
parsed = _parse_status_v2(cwd)
|
||||
if parsed is None:
|
||||
code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"])
|
||||
if code != 0:
|
||||
return {"diff": "", "recent": ""}
|
||||
entries = list(_walk_entries(raw))
|
||||
|
||||
has_staged = any(f["staged"] for f in parsed["files"])
|
||||
has_staged = any(_entry_staged(tag, xy) for tag, xy, _ in entries)
|
||||
diff = _git_out(cwd, ["diff", "--cached"]) if has_staged else _git_out(cwd, ["diff", "HEAD"])
|
||||
if len(diff) > _COMMIT_CONTEXT_DIFF_MAX_CHARS:
|
||||
omitted = len(diff) - _COMMIT_CONTEXT_DIFF_MAX_CHARS
|
||||
diff = f"{diff[:_COMMIT_CONTEXT_DIFF_MAX_CHARS]}\n# diff truncated: {omitted} chars omitted\n"
|
||||
|
||||
untracked = [f["path"] for f in parsed["files"] if f["untracked"]]
|
||||
untracked = [path for tag, _xy, path in entries if tag == "?"]
|
||||
if untracked:
|
||||
visible = untracked[:_COMMIT_CONTEXT_UNTRACKED_MAX]
|
||||
omitted = len(untracked) - len(visible)
|
||||
note = "\n# New (untracked) files:\n" + "\n".join(f"# {p}" for p in visible) + "\n"
|
||||
if omitted > 0:
|
||||
note += f"# ... {omitted} more omitted\n"
|
||||
note = "\n# New (untracked) files:\n" + "".join(f"# {p}\n" for p in visible)
|
||||
if len(untracked) > len(visible):
|
||||
note += f"# ... {len(untracked) - len(visible)} more omitted\n"
|
||||
diff = f"{diff}{note}" if diff else note
|
||||
|
||||
recent = _git_out(cwd, ["log", "-n", "10", "--pretty=format:%s"]).strip()
|
||||
return {"diff": diff or "", "recent": recent}
|
||||
return {"diff": diff or "", "recent": _git_out(cwd, ["log", "-n", "10", "--pretty=format:%s"]).strip()}
|
||||
|
||||
|
||||
# ── ship flow (gh) ───────────────────────────────────────────────────────────
|
||||
|
|
@ -563,8 +510,6 @@ def _main_root(cwd: str) -> str:
|
|||
|
||||
|
||||
def _sanitize_branch(name: str) -> str:
|
||||
import re
|
||||
|
||||
value = str(name or "")
|
||||
value = re.sub(r"\s+", "-", value)
|
||||
value = re.sub(r"[^\w./-]", "", value)
|
||||
|
|
@ -575,8 +520,6 @@ def _sanitize_branch(name: str) -> str:
|
|||
|
||||
|
||||
def _slugify(name: str) -> str:
|
||||
import re
|
||||
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", str(name or "").strip().lower())
|
||||
slug = re.sub(r"^-+|-+$", "", slug)[:40].rstrip("-")
|
||||
return slug or "work"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue