This commit is contained in:
玉冰 2026-04-25 07:58:45 +08:00 committed by GitHub
commit e01d3ccc78
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 481 additions and 0 deletions

217
code_stats.py Normal file
View file

@ -0,0 +1,217 @@
#!/usr/bin/env python3
"""代码统计分析工具 - 按目录/类型分组,区分代码行/注释行/空行ASCII柱状图"""
import os
import sys
from pathlib import Path
from collections import defaultdict
# 文件类型 → (单行注释, 多行开始, 多行结束)
LANG_MAP = {
".py": ("#", '"""', '"""'),
".js": ("//", "/*", "*/"),
".ts": ("//", "/*", "*/"),
".tsx": ("//", "/*", "*/"),
".jsx": ("//", "/*", "*/"),
".java": ("//", "/*", "*/"),
".c": ("//", "/*", "*/"),
".cpp": ("//", "/*", "*/"),
".h": ("//", "/*", "*/"),
".go": ("//", "/*", "*/"),
".rs": ("//", "/*", "*/"),
".rb": ("#", "=begin", "=end"),
".sh": ("#", None, None),
".bash": ("#", None, None),
".yml": ("#", None, None),
".yaml": ("#", None, None),
".toml": ("#", None, None),
".sql": ("--", "/*", "*/"),
".html": (None, "<!--", "-->"),
".css": (None, "/*", "*/"),
".vue": ("//", "/*", "*/"),
".swift": ("//", "/*", "*/"),
".kt": ("//", "/*", "*/"),
".lua": ("--", "--[[", "]]"),
".r": ("#", None, None),
".php": ("//", "/*", "*/"),
}
SKIP_DIRS = {
".git", ".svn", ".hg", "node_modules", "__pycache__",
".venv", "venv", "env", ".env", ".tox", "dist", "build",
".mypy_cache", ".pytest_cache", ".eggs", "target", "vendor",
".next", ".nuxt", "coverage",
}
def should_skip(path: Path) -> bool:
return any(part in SKIP_DIRS for part in path.parts)
def analyze_file(filepath: Path) -> dict:
ext = filepath.suffix.lower()
if ext not in LANG_MAP:
return None
try:
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
except (OSError, PermissionError):
return None
line_comment, block_start, block_end = LANG_MAP[ext]
total = len(lines)
blank = 0
comment = 0
in_block = False
for line in lines:
stripped = line.strip()
if not stripped:
blank += 1
continue
if in_block:
comment += 1
if block_end and block_end in stripped:
in_block = False
continue
if block_start and stripped.startswith(block_start):
comment += 1
if block_end and block_end not in stripped[len(block_start):]:
in_block = True
continue
if line_comment and stripped.startswith(line_comment):
comment += 1
return {"ext": ext, "total": total, "code": total - blank - comment, "comment": comment, "blank": blank}
def bar(value, max_val, width=25):
if max_val == 0:
return ""
return "" * int(width * value / max_val) + "" * (width - int(width * value / max_val))
def stacked_bar(code, comment, blank, total, width=30):
"""三段式柱状图:█代码 ▓注释 ░空行"""
if total == 0:
return "" * width
cw = int(width * code / total)
mw = int(width * comment / total)
bw = width - cw - mw
return "" * cw + "" * mw + "" * bw
def fmt(n):
"""数字格式化"""
return f"{n:,}"
def print_table(title, rows, headers, col_widths, grand_row=None):
"""通用表格打印"""
print(f"\n📁 {title}")
header_line = "".join(h.rjust(w) for h, w in zip(headers, col_widths))
print(header_line)
print("" * len(header_line))
for row in rows:
print("".join(str(v).rjust(w) for v, w in zip(row, col_widths)))
if grand_row:
print("" * len(header_line))
print("".join(str(v).rjust(w) for v, w in zip(grand_row, col_widths)))
def main():
target = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".")
if not target.is_dir():
print(f"❌ 不是有效目录: {target}")
sys.exit(1)
# 收集所有文件的分析结果
file_results = []
for filepath in target.rglob("*"):
if not filepath.is_file() or should_skip(filepath):
continue
result = analyze_file(filepath)
if result:
rel = filepath.relative_to(target)
file_results.append({"path": rel, **result})
if not file_results:
print("未找到可分析的代码文件。")
return
# 汇总
grand = {"files": len(file_results), "total": 0, "code": 0, "comment": 0, "blank": 0}
for r in file_results:
for k in ("total", "code", "comment", "blank"):
grand[k] += r[k]
comment_rate = grand["comment"] / max(grand["code"] + grand["comment"], 1) * 100
print(f"\n📊 代码统计 — {target.resolve()}")
print(f" {grand['files']} 个文件 | {fmt(grand['total'])} 行 (代码 {fmt(grand['code'])} / 注释 {fmt(grand['comment'])} / 空行 {fmt(grand['blank'])})")
print(f" 代码占比 {grand['code']/max(grand['total'],1)*100:.1f}% | 注释率 {comment_rate:.1f}%")
# ── 按目录分组 ──
dir_stats = defaultdict(lambda: {"files": 0, "total": 0, "code": 0, "comment": 0, "blank": 0})
for r in file_results:
top_dir = r["path"].parts[0] if len(r["path"].parts) > 1 else "[root]"
bucket = dir_stats[top_dir]
bucket["files"] += 1
for k in ("total", "code", "comment", "blank"):
bucket[k] += r[k]
headers = ["目录", "文件数", "代码行", "注释行", "空行", "注释率"]
col_widths = [16, 8, 10, 10, 10, 9]
rows = []
for d, b in sorted(dir_stats.items(), key=lambda x: x[1]["code"], reverse=True):
cr = b["comment"] / max(b["code"] + b["comment"], 1) * 100
rows.append([d, str(b["files"]), fmt(b["code"]), fmt(b["comment"]), fmt(b["blank"]), f"{cr:.1f}%"])
print_table("按目录分组", rows, headers, col_widths,
grand_row=["合计", str(grand["files"]), fmt(grand["code"]), fmt(grand["comment"]), fmt(grand["blank"]), f"{comment_rate:.1f}%"])
# 按目录柱状图
max_code = max(b["code"] for b in dir_stats.values())
print(f"\n📊 目录代码量分布")
for d, b in sorted(dir_stats.items(), key=lambda x: x[1]["code"], reverse=True):
pct = b["code"] / max(grand["code"], 1) * 100
print(f" {d:<14} {bar(b['code'], max_code, 30)} {pct:5.1f}%")
# ── 按文件类型 ──
ext_stats = defaultdict(lambda: {"files": 0, "total": 0, "code": 0, "comment": 0, "blank": 0})
for r in file_results:
bucket = ext_stats[r["ext"]]
bucket["files"] += 1
for k in ("total", "code", "comment", "blank"):
bucket[k] += r[k]
headers2 = ["类型", "文件数", "代码行", "注释行", "空行", "注释率"]
col_widths2 = [10, 8, 10, 10, 10, 9]
rows2 = []
for ext, b in sorted(ext_stats.items(), key=lambda x: x[1]["code"], reverse=True):
cr = b["comment"] / max(b["code"] + b["comment"], 1) * 100
rows2.append([ext, str(b["files"]), fmt(b["code"]), fmt(b["comment"]), fmt(b["blank"]), f"{cr:.1f}%"])
print_table("按文件类型", rows2, headers2, col_widths2,
grand_row=["合计", str(grand["files"]), fmt(grand["code"]), fmt(grand["comment"]), fmt(grand["blank"]), f"{comment_rate:.1f}%"])
# 按类型柱状图
max_code_ext = max(b["code"] for b in ext_stats.values())
print(f"\n📊 类型代码量分布")
for ext, b in sorted(ext_stats.items(), key=lambda x: x[1]["code"], reverse=True):
pct = b["code"] / max(grand["code"], 1) * 100
print(f" {ext:<8} {bar(b['code'], max_code_ext, 30)} {pct:5.1f}%")
# ── 综合堆叠柱状图(按目录) ──
print(f"\n📊 目录代码结构(█代码 ▓注释 ░空行)")
for d, b in sorted(dir_stats.items(), key=lambda x: x[1]["code"], reverse=True):
print(f" {d:<14} {stacked_bar(b['code'], b['comment'], b['blank'], b['total'], 40)}")
print()
if __name__ == "__main__":
main()

85
count_lines.py Normal file
View file

@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""统计当前目录下各类型文件的代码行数"""
import os
from pathlib import Path
from collections import defaultdict
# 常见代码文件扩展名
CODE_EXTENSIONS = {
".py", ".js", ".ts", ".tsx", ".jsx", ".java", ".c", ".cpp", ".h", ".hpp",
".go", ".rs", ".rb", ".php", ".swift", ".kt", ".scala", ".sh", ".bash",
".sql", ".html", ".css", ".scss", ".less", ".vue", ".svelte",
".yaml", ".yml", ".json", ".xml", ".toml", ".ini", ".cfg",
".md", ".rst", ".txt",
}
# 跳过的目录
SKIP_DIRS = {
"node_modules", ".git", "__pycache__", ".venv", "venv", "env",
".tox", ".mypy_cache", ".pytest_cache", "dist", "build", ".eggs",
".idea", ".vscode", "target", "vendor", ".next", ".nuxt",
}
def count_lines(path: Path) -> tuple[int, int, int]:
"""返回 (总行数, 非空行数, 注释行数)"""
total = blank = comment = 0
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
total += 1
stripped = line.strip()
if not stripped:
blank += 1
elif stripped.startswith("#") or stripped.startswith("//"):
comment += 1
except (PermissionError, OSError):
pass
return total, total - blank, comment
def main():
base = Path.cwd()
stats = defaultdict(lambda: {"files": 0, "total": 0, "code": 0, "comment": 0})
for root, dirs, files in os.walk(base):
# 原地删除跳过目录
dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith(".")]
for fname in files:
fpath = Path(root) / fname
ext = fpath.suffix.lower()
if ext in CODE_EXTENSIONS:
total, code, comment = count_lines(fpath)
if total > 0:
lang = ext.lstrip(".")
stats[lang]["files"] += 1
stats[lang]["total"] += total
stats[lang]["code"] += code
stats[lang]["comment"] += comment
if not stats:
print("未找到代码文件")
return
# 按代码行数降序排列
sorted_stats = sorted(stats.items(), key=lambda x: x[1]["code"], reverse=True)
# 表头
print(f"\n{'语言':<12} {'文件数':>6} {'总行数':>8} {'代码行':>8} {'注释行':>8}")
print("-" * 50)
sum_files = sum_total = sum_code = sum_comment = 0
for lang, s in sorted_stats:
print(f".{lang:<11} {s['files']:>6} {s['total']:>8} {s['code']:>8} {s['comment']:>8}")
sum_files += s["files"]
sum_total += s["total"]
sum_code += s["code"]
sum_comment += s["comment"]
print("-" * 50)
print(f"{'合计':<12} {sum_files:>6} {sum_total:>8} {sum_code:>8} {sum_comment:>8}\n")
if __name__ == "__main__":
main()

179
scripts/claude_acp_bridge.py Executable file
View file

@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""ACP JSON-RPC bridge for Claude Code CLI.
Translates Hermes CopilotACPClient's ACP JSON-RPC protocol into
`claude -p` CLI calls. This lets Hermes delegate tasks to Claude Code
without modifying any upstream Hermes source code.
Usage (via HERMES_COPILOT_ACP_COMMAND env var):
HERMES_COPILOT_ACP_COMMAND=python3 scripts/claude_acp_bridge.py
Protocol flow:
1. Client sends "initialize" respond with server info
2. Client sends "session/new" respond with sessionId
3. Client sends "session/prompt" run `claude -p`, stream back result
"""
import json
import os
import subprocess
import sys
import uuid
def _read_request():
"""Read one JSON-RPC request from stdin (line-delimited)."""
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
return json.loads(line)
except json.JSONDecodeError:
continue
return None
def _write_msg(msg):
"""Write a JSON-RPC message to stdout."""
sys.stdout.write(json.dumps(msg, ensure_ascii=False) + "\n")
sys.stdout.flush()
def _send_notification(method, params):
"""Send a JSON-RPC notification (no id)."""
_write_msg({"jsonrpc": "2.0", "method": method, "params": params})
def _send_response(request_id, result):
"""Send a JSON-RPC success response."""
_write_msg({"jsonrpc": "2.0", "id": request_id, "result": result})
def _send_error(request_id, code, message):
"""Send a JSON-RPC error response."""
_write_msg({
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": code, "message": message},
})
def _handle_initialize(request_id, _params):
_send_response(request_id, {
"protocolVersion": 1,
"serverCapabilities": {},
"serverInfo": {
"name": "claude-code-bridge",
"title": "Claude Code Bridge",
"version": "0.1.0",
},
})
def _handle_session_new(request_id, params):
session_id = str(uuid.uuid4())
cwd = params.get("cwd", os.getcwd())
_send_response(request_id, {"sessionId": session_id, "cwd": cwd})
def _handle_session_prompt(request_id, params):
# Extract prompt text from ACP format
prompt_parts = params.get("prompt", [])
prompt_text = ""
for part in prompt_parts:
if isinstance(part, dict) and part.get("type") == "text":
prompt_text += part.get("text", "")
if not prompt_text:
_send_error(request_id, -32602, "Empty prompt")
return
# Build claude CLI command
cmd = ["claude", "-p", prompt_text, "--output-format", "json", "--dangerously-skip-permissions"]
# Add model if specified via env var
model = os.getenv("CLAUDE_CODE_BRIDGE_MODEL", "").strip()
if model:
cmd.extend(["--model", model])
# Use cwd from session if available
cwd = params.get("cwd") or os.getcwd()
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=900,
cwd=cwd if os.path.isdir(cwd) else None,
)
if result.returncode != 0:
error_msg = result.stderr.strip() or f"claude exited with code {result.returncode}"
_send_notification("session/update", {
"update": {
"sessionUpdate": "agent_message_chunk",
"content": {"text": f"Error from Claude Code: {error_msg}"},
}
})
_send_response(request_id, {"status": "error"})
return
response_data = json.loads(result.stdout)
response_text = response_data.get("result", "")
except subprocess.TimeoutExpired:
response_text = "Error: Claude Code timed out (900s limit)"
except json.JSONDecodeError:
response_text = result.stdout if result.stdout else "Error: Could not parse Claude Code output"
except FileNotFoundError:
response_text = "Error: 'claude' command not found. Install Claude Code CLI first."
except Exception as exc:
response_text = f"Error: {exc}"
# Send response text as a single agent_message_chunk notification
_send_notification("session/update", {
"update": {
"sessionUpdate": "agent_message_chunk",
"content": {"text": response_text},
}
})
# Send completion response
_send_response(request_id, {"status": "completed"})
_HANDLERS = {
"initialize": _handle_initialize,
"session/new": _handle_session_new,
"session/prompt": _handle_session_prompt,
}
def main():
# Log stderr so stdout stays clean for JSON-RPC
import logging
logging.basicConfig(
stream=sys.stderr,
level=logging.INFO,
format="%(asctime)s [%(levelname)s] claude-acp-bridge: %(message)s",
)
while True:
request = _read_request()
if request is None:
break
request_id = request.get("id")
method = request.get("method", "")
params = request.get("params", {})
handler = _HANDLERS.get(method)
if handler:
handler(request_id, params)
else:
_send_error(request_id, -32601, f"Method not found: {method}")
if __name__ == "__main__":
main()