diff --git a/hermes_cli/web_git.py b/hermes_cli/web_git.py index a71974b605f..292f6c81144 100644 --- a/hermes_cli/web_git.py +++ b/hermes_cli/web_git.py @@ -14,6 +14,7 @@ from __future__ import annotations import json import os +import re import shutil import subprocess from pathlib import Path @@ -152,87 +153,49 @@ def _default_branch_name(cwd: str) -> str | None: # ── porcelain v2 status parsing ────────────────────────────────────────────── -def _parse_status_v2(cwd: str) -> dict | None: - """Parse ``git status --porcelain=v2 --branch -z`` into branch + classified - files. None when ``cwd`` isn't a git repo.""" - code, out, _ = _git(cwd, ["status", "--porcelain=v2", "--branch", "-z"]) - if code != 0: - return None - - branch: str | None = None - detached = False - ahead = behind = 0 - files: list[dict] = [] - untracked = 0 - conflicted = 0 - - records = out.split("\0") +def _walk_entries(raw: str): + """Yield (tag, xy, path) per changed file from ``git status --porcelain=v2 -z``, + skipping branch headers and the rename/copy origin-path records. One walker + feeds the rail, the review list, and the commit flow.""" + records = raw.split("\0") i = 0 while i < len(records): rec = records[i] - if not rec: - i += 1 - continue - tag = rec[0] - if tag == "#": - if rec.startswith("# branch.head "): - head = rec[len("# branch.head ") :] - if head == "(detached)": - detached = True - else: - branch = head - elif rec.startswith("# branch.ab "): - for tok in rec[len("# branch.ab ") :].split(): - if tok.startswith("+"): - ahead = int(tok[1:] or 0) - elif tok.startswith("-"): - behind = int(tok[1:] or 0) + tag = rec[0] if rec else "" + if tag == "?": + yield "?", "??", rec[2:] + elif tag == "u": + yield "u", rec.split(" ")[1], rec.split(" ", 10)[-1] elif tag in ("1", "2"): - fields = rec.split(" ") - xy = fields[1] + xy = rec.split(" ")[1] path = rec.split(" ", 8)[-1] if tag == "1" else rec.split(" ", 9)[-1] if tag == "2": - # Rename/copy: NUL-separated origin path follows in the next record. - i += 1 - files.append(_classify(xy, resolve_rename_path(path))) - elif tag == "u": - path = rec.split(" ", 10)[-1] - files.append({"path": path, "staged": False, "unstaged": False, "untracked": False, "conflicted": True}) - conflicted += 1 - elif tag == "?": - path = rec[2:] - files.append({"path": path, "staged": False, "unstaged": True, "untracked": True, "conflicted": False}) - untracked += 1 + i += 1 # rename/copy: the origin path is the next NUL record + yield tag, xy, resolve_rename_path(path) i += 1 - return { - "branch": branch, - "detached": detached, - "ahead": ahead, - "behind": behind, - "files": files, - "untracked": untracked, - "conflicted": conflicted, - } + +def _entry_staged(tag: str, xy: str) -> bool: + """A tracked entry whose index (staged) code is set.""" + return tag in ("1", "2") and xy[0] not in (".", "?") -def _classify(xy: str, path: str) -> dict: - x = xy[0] if xy else "." +def _classify(tag: str, xy: str, path: str) -> dict: y = xy[1] if len(xy) > 1 else "." return { "path": path, - "staged": x not in (".", "?"), - "unstaged": y not in (".", "?"), - "untracked": False, - "conflicted": x == "U" or y == "U", + "staged": _entry_staged(tag, xy), + "unstaged": tag == "?" or (tag in ("1", "2") and y not in (".", "?")), + "untracked": tag == "?", + "conflicted": tag == "u", } -def _status_letter(xy: str) -> str: - x = xy[0] if xy else "." - y = xy[1] if len(xy) > 1 else "." - code = x if x != "." else y - return (code or "M").upper() +def _status_letter(tag: str, xy: str) -> str: + if tag in ("?", "u"): + return tag.upper() if tag == "u" else "?" + code = xy[0] if xy[0] != "." else (xy[1] if len(xy) > 1 else ".") + return (code if code != "." else "M").upper() # ── coding rail ────────────────────────────────────────────────────────────── @@ -242,39 +205,50 @@ def repo_status(cwd: str) -> dict | None: """Compact working-tree status for the coding rail. None on a non-repo.""" if not _is_dir(cwd): return None - parsed = _parse_status_v2(cwd) - if parsed is None: + + code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "--branch", "-z"]) + if code != 0: return None - files = parsed["files"] + branch: str | None = None + detached = False + ahead = behind = 0 + for rec in raw.split("\0"): + if rec.startswith("# branch.head "): + head = rec[len("# branch.head ") :] + detached = head == "(detached)" + branch = None if detached else head + elif rec.startswith("# branch.ab "): + for tok in rec.split()[2:]: + if tok.startswith("+"): + ahead = int(tok[1:] or 0) + elif tok.startswith("-"): + behind = int(tok[1:] or 0) + + files = [_classify(tag, xy, path) for tag, xy, path in _walk_entries(raw)] + + # +/- vs HEAD (tracked), then fold in untracked insertions — `git diff HEAD` + # ignores them, so a new-file-only turn would otherwise read +0 (bounded scan). added = removed = 0 - summary = _numstat(cwd, ["HEAD"]) - for a, r in summary.values(): + for a, r in _numstat(cwd, ["HEAD"]).values(): added += a removed += r - - # `git diff HEAD` ignores untracked files; fold their insertions into `added` - # so a new-file-only turn registers in the rail (bounded scan). - untracked_paths = [f["path"] for f in files if f["untracked"]][:_UNTRACKED_SCAN_CAP] - for rel in untracked_paths: - added += _untracked_insertions(cwd, rel) + added += sum(_untracked_insertions(cwd, f["path"]) for f in files[:_UNTRACKED_SCAN_CAP] if f["untracked"]) return { - "branch": None if parsed["detached"] else parsed["branch"], + "branch": branch, "defaultBranch": _default_branch_name(cwd), - "detached": parsed["detached"], - "ahead": parsed["ahead"], - "behind": parsed["behind"], - "staged": sum(1 for f in files if f["staged"]), - "unstaged": sum(1 for f in files if f["unstaged"]), - "untracked": parsed["untracked"], - "conflicted": parsed["conflicted"], + "detached": detached, + "ahead": ahead, + "behind": behind, + "staged": sum(f["staged"] for f in files), + "unstaged": sum(f["unstaged"] for f in files), + "untracked": sum(f["untracked"] for f in files), + "conflicted": sum(f["conflicted"] for f in files), "changed": len(files), "added": added, "removed": removed, - "files": [ - {k: f[k] for k in ("path", "staged", "unstaged", "untracked", "conflicted")} for f in files[:200] - ], + "files": files[:200], } @@ -291,30 +265,30 @@ def review_list(cwd: str, scope: str, base_ref: str | None) -> dict: if not base: return {"files": [], "base": None} rng = f"{base}...HEAD" if scope == "branch" else base - counts = _numstat(cwd, [rng]) files = [ {"path": path, "added": a, "removed": r, "status": "M", "staged": False} - for path, (a, r) in counts.items() + for path, (a, r) in _numstat(cwd, [rng]).items() ] if scope == "lastTurn": - parsed = _parse_status_v2(cwd) - for f in parsed["files"] if parsed else []: - if f["untracked"] and not any(x["path"] == f["path"] for x in files): - files.append({"path": f["path"], "added": 0, "removed": 0, "status": "?", "staged": False}) + seen = {f["path"] for f in files} + _, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"]) + files += [ + {"path": path, "added": 0, "removed": 0, "status": "?", "staged": False} + for tag, _xy, path in _walk_entries(raw) + if tag == "?" and path not in seen + ] files.sort(key=lambda f: f["path"]) _fill_untracked_counts(cwd, files) return {"files": files, "base": base} - parsed = _parse_status_v2(cwd) - if parsed is None: + code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"]) + if code != 0: return {"files": [], "base": None} staged = _numstat(cwd, ["--cached"]) unstaged = _numstat(cwd, []) files = [] - code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"]) - for entry in _iter_status_entries(raw): - path = entry["path"] + for tag, xy, path in _walk_entries(raw): sa, sr = staged.get(path, (0, 0)) ua, ur = unstaged.get(path, (0, 0)) files.append( @@ -322,8 +296,8 @@ def review_list(cwd: str, scope: str, base_ref: str | None) -> dict: "path": path, "added": sa + ua, "removed": sr + ur, - "status": entry["letter"], - "staged": entry["staged"], + "status": _status_letter(tag, xy), + "staged": _entry_staged(tag, xy), } ) files.sort(key=lambda f: f["path"]) @@ -331,32 +305,6 @@ def review_list(cwd: str, scope: str, base_ref: str | None) -> dict: return {"files": files, "base": None} -def _iter_status_entries(raw: str): - """Yield {path, letter, staged} from porcelain v2 -z output (for review_list).""" - records = raw.split("\0") - i = 0 - while i < len(records): - rec = records[i] - if not rec: - i += 1 - continue - tag = rec[0] - if tag in ("1", "2"): - xy = rec.split(" ")[1] - path = rec.split(" ", 8)[-1] if tag == "1" else rec.split(" ", 9)[-1] - if tag == "2": - i += 1 - path = resolve_rename_path(path) - x = xy[0] if xy else "." - yield {"path": path, "letter": _status_letter(xy), "staged": x not in (".", "?")} - elif tag == "u": - path = rec.split(" ", 10)[-1] - yield {"path": path, "letter": "U", "staged": False} - elif tag == "?": - yield {"path": rec[2:], "letter": "?", "staged": False} - i += 1 - - def review_diff(cwd: str, file_path: str, scope: str, base_ref: str | None, staged: bool) -> str: if not _is_dir(cwd): return "" @@ -415,8 +363,8 @@ def review_rev_parse(cwd: str, ref: str | None) -> str | None: def review_commit(cwd: str, message: str, push: bool) -> dict: """Commit the working tree; stage everything first when nothing is staged.""" - parsed = _parse_status_v2(cwd) - if not parsed or not any(f["staged"] for f in parsed["files"]): + _, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"]) + if not any(_entry_staged(tag, xy) for tag, xy, _ in _walk_entries(raw)): _git_ok(cwd, ["add", "-A"]) _git_ok(cwd, ["commit", "-m", message]) if push: @@ -443,27 +391,26 @@ def review_commit_context(cwd: str) -> dict: """Diff of what WILL commit + recent subjects, for drafting a commit message.""" if not _is_dir(cwd): return {"diff": "", "recent": ""} - parsed = _parse_status_v2(cwd) - if parsed is None: + code, raw, _ = _git(cwd, ["status", "--porcelain=v2", "-z"]) + if code != 0: return {"diff": "", "recent": ""} + entries = list(_walk_entries(raw)) - has_staged = any(f["staged"] for f in parsed["files"]) + has_staged = any(_entry_staged(tag, xy) for tag, xy, _ in entries) diff = _git_out(cwd, ["diff", "--cached"]) if has_staged else _git_out(cwd, ["diff", "HEAD"]) if len(diff) > _COMMIT_CONTEXT_DIFF_MAX_CHARS: omitted = len(diff) - _COMMIT_CONTEXT_DIFF_MAX_CHARS diff = f"{diff[:_COMMIT_CONTEXT_DIFF_MAX_CHARS]}\n# diff truncated: {omitted} chars omitted\n" - untracked = [f["path"] for f in parsed["files"] if f["untracked"]] + untracked = [path for tag, _xy, path in entries if tag == "?"] if untracked: visible = untracked[:_COMMIT_CONTEXT_UNTRACKED_MAX] - omitted = len(untracked) - len(visible) - note = "\n# New (untracked) files:\n" + "\n".join(f"# {p}" for p in visible) + "\n" - if omitted > 0: - note += f"# ... {omitted} more omitted\n" + note = "\n# New (untracked) files:\n" + "".join(f"# {p}\n" for p in visible) + if len(untracked) > len(visible): + note += f"# ... {len(untracked) - len(visible)} more omitted\n" diff = f"{diff}{note}" if diff else note - recent = _git_out(cwd, ["log", "-n", "10", "--pretty=format:%s"]).strip() - return {"diff": diff or "", "recent": recent} + return {"diff": diff or "", "recent": _git_out(cwd, ["log", "-n", "10", "--pretty=format:%s"]).strip()} # ── ship flow (gh) ─────────────────────────────────────────────────────────── @@ -563,8 +510,6 @@ def _main_root(cwd: str) -> str: def _sanitize_branch(name: str) -> str: - import re - value = str(name or "") value = re.sub(r"\s+", "-", value) value = re.sub(r"[^\w./-]", "", value) @@ -575,8 +520,6 @@ def _sanitize_branch(name: str) -> str: def _slugify(name: str) -> str: - import re - slug = re.sub(r"[^a-z0-9]+", "-", str(name or "").strip().lower()) slug = re.sub(r"^-+|-+$", "", slug)[:40].rstrip("-") return slug or "work"