diff --git a/code_stats.py b/code_stats.py new file mode 100644 index 0000000000..c6567a111b --- /dev/null +++ b/code_stats.py @@ -0,0 +1,217 @@ +#!/usr/bin/env python3 +"""代码统计分析工具 - 按目录/类型分组,区分代码行/注释行/空行,ASCII柱状图""" + +import os +import sys +from pathlib import Path +from collections import defaultdict + +# 文件类型 → (单行注释, 多行开始, 多行结束) +LANG_MAP = { + ".py": ("#", '"""', '"""'), + ".js": ("//", "/*", "*/"), + ".ts": ("//", "/*", "*/"), + ".tsx": ("//", "/*", "*/"), + ".jsx": ("//", "/*", "*/"), + ".java": ("//", "/*", "*/"), + ".c": ("//", "/*", "*/"), + ".cpp": ("//", "/*", "*/"), + ".h": ("//", "/*", "*/"), + ".go": ("//", "/*", "*/"), + ".rs": ("//", "/*", "*/"), + ".rb": ("#", "=begin", "=end"), + ".sh": ("#", None, None), + ".bash": ("#", None, None), + ".yml": ("#", None, None), + ".yaml": ("#", None, None), + ".toml": ("#", None, None), + ".sql": ("--", "/*", "*/"), + ".html": (None, ""), + ".css": (None, "/*", "*/"), + ".vue": ("//", "/*", "*/"), + ".swift": ("//", "/*", "*/"), + ".kt": ("//", "/*", "*/"), + ".lua": ("--", "--[[", "]]"), + ".r": ("#", None, None), + ".php": ("//", "/*", "*/"), +} + +SKIP_DIRS = { + ".git", ".svn", ".hg", "node_modules", "__pycache__", + ".venv", "venv", "env", ".env", ".tox", "dist", "build", + ".mypy_cache", ".pytest_cache", ".eggs", "target", "vendor", + ".next", ".nuxt", "coverage", +} + + +def should_skip(path: Path) -> bool: + return any(part in SKIP_DIRS for part in path.parts) + + +def analyze_file(filepath: Path) -> dict: + ext = filepath.suffix.lower() + if ext not in LANG_MAP: + return None + + try: + with open(filepath, "r", encoding="utf-8", errors="ignore") as f: + lines = f.readlines() + except (OSError, PermissionError): + return None + + line_comment, block_start, block_end = LANG_MAP[ext] + total = len(lines) + blank = 0 + comment = 0 + in_block = False + + for line in lines: + stripped = line.strip() + if not stripped: + blank += 1 + continue + if in_block: + comment += 1 + if block_end and block_end in stripped: + in_block = False + continue + if block_start and stripped.startswith(block_start): + comment += 1 + if block_end and block_end not in stripped[len(block_start):]: + in_block = True + continue + if line_comment and stripped.startswith(line_comment): + comment += 1 + + return {"ext": ext, "total": total, "code": total - blank - comment, "comment": comment, "blank": blank} + + +def bar(value, max_val, width=25): + if max_val == 0: + return "" + return "█" * int(width * value / max_val) + "░" * (width - int(width * value / max_val)) + + +def stacked_bar(code, comment, blank, total, width=30): + """三段式柱状图:█代码 ▓注释 ░空行""" + if total == 0: + return "░" * width + cw = int(width * code / total) + mw = int(width * comment / total) + bw = width - cw - mw + return "█" * cw + "▓" * mw + "░" * bw + + +def fmt(n): + """数字格式化""" + return f"{n:,}" + + +def print_table(title, rows, headers, col_widths, grand_row=None): + """通用表格打印""" + print(f"\n📁 {title}") + header_line = "".join(h.rjust(w) for h, w in zip(headers, col_widths)) + print(header_line) + print("─" * len(header_line)) + + for row in rows: + print("".join(str(v).rjust(w) for v, w in zip(row, col_widths))) + + if grand_row: + print("─" * len(header_line)) + print("".join(str(v).rjust(w) for v, w in zip(grand_row, col_widths))) + + +def main(): + target = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".") + if not target.is_dir(): + print(f"❌ 不是有效目录: {target}") + sys.exit(1) + + # 收集所有文件的分析结果 + file_results = [] + for filepath in target.rglob("*"): + if not filepath.is_file() or should_skip(filepath): + continue + result = analyze_file(filepath) + if result: + rel = filepath.relative_to(target) + file_results.append({"path": rel, **result}) + + if not file_results: + print("未找到可分析的代码文件。") + return + + # 汇总 + grand = {"files": len(file_results), "total": 0, "code": 0, "comment": 0, "blank": 0} + for r in file_results: + for k in ("total", "code", "comment", "blank"): + grand[k] += r[k] + + comment_rate = grand["comment"] / max(grand["code"] + grand["comment"], 1) * 100 + + print(f"\n📊 代码统计 — {target.resolve()}") + print(f" {grand['files']} 个文件 | {fmt(grand['total'])} 行 (代码 {fmt(grand['code'])} / 注释 {fmt(grand['comment'])} / 空行 {fmt(grand['blank'])})") + print(f" 代码占比 {grand['code']/max(grand['total'],1)*100:.1f}% | 注释率 {comment_rate:.1f}%") + + # ── 按目录分组 ── + dir_stats = defaultdict(lambda: {"files": 0, "total": 0, "code": 0, "comment": 0, "blank": 0}) + for r in file_results: + top_dir = r["path"].parts[0] if len(r["path"].parts) > 1 else "[root]" + bucket = dir_stats[top_dir] + bucket["files"] += 1 + for k in ("total", "code", "comment", "blank"): + bucket[k] += r[k] + + headers = ["目录", "文件数", "代码行", "注释行", "空行", "注释率"] + col_widths = [16, 8, 10, 10, 10, 9] + rows = [] + for d, b in sorted(dir_stats.items(), key=lambda x: x[1]["code"], reverse=True): + cr = b["comment"] / max(b["code"] + b["comment"], 1) * 100 + rows.append([d, str(b["files"]), fmt(b["code"]), fmt(b["comment"]), fmt(b["blank"]), f"{cr:.1f}%"]) + + print_table("按目录分组", rows, headers, col_widths, + grand_row=["合计", str(grand["files"]), fmt(grand["code"]), fmt(grand["comment"]), fmt(grand["blank"]), f"{comment_rate:.1f}%"]) + + # 按目录柱状图 + max_code = max(b["code"] for b in dir_stats.values()) + print(f"\n📊 目录代码量分布") + for d, b in sorted(dir_stats.items(), key=lambda x: x[1]["code"], reverse=True): + pct = b["code"] / max(grand["code"], 1) * 100 + print(f" {d:<14} {bar(b['code'], max_code, 30)} {pct:5.1f}%") + + # ── 按文件类型 ── + ext_stats = defaultdict(lambda: {"files": 0, "total": 0, "code": 0, "comment": 0, "blank": 0}) + for r in file_results: + bucket = ext_stats[r["ext"]] + bucket["files"] += 1 + for k in ("total", "code", "comment", "blank"): + bucket[k] += r[k] + + headers2 = ["类型", "文件数", "代码行", "注释行", "空行", "注释率"] + col_widths2 = [10, 8, 10, 10, 10, 9] + rows2 = [] + for ext, b in sorted(ext_stats.items(), key=lambda x: x[1]["code"], reverse=True): + cr = b["comment"] / max(b["code"] + b["comment"], 1) * 100 + rows2.append([ext, str(b["files"]), fmt(b["code"]), fmt(b["comment"]), fmt(b["blank"]), f"{cr:.1f}%"]) + + print_table("按文件类型", rows2, headers2, col_widths2, + grand_row=["合计", str(grand["files"]), fmt(grand["code"]), fmt(grand["comment"]), fmt(grand["blank"]), f"{comment_rate:.1f}%"]) + + # 按类型柱状图 + max_code_ext = max(b["code"] for b in ext_stats.values()) + print(f"\n📊 类型代码量分布") + for ext, b in sorted(ext_stats.items(), key=lambda x: x[1]["code"], reverse=True): + pct = b["code"] / max(grand["code"], 1) * 100 + print(f" {ext:<8} {bar(b['code'], max_code_ext, 30)} {pct:5.1f}%") + + # ── 综合堆叠柱状图(按目录) ── + print(f"\n📊 目录代码结构(█代码 ▓注释 ░空行)") + for d, b in sorted(dir_stats.items(), key=lambda x: x[1]["code"], reverse=True): + print(f" {d:<14} {stacked_bar(b['code'], b['comment'], b['blank'], b['total'], 40)}") + + print() + + +if __name__ == "__main__": + main() diff --git a/count_lines.py b/count_lines.py new file mode 100644 index 0000000000..ec14b707bf --- /dev/null +++ b/count_lines.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +"""统计当前目录下各类型文件的代码行数""" + +import os +from pathlib import Path +from collections import defaultdict + +# 常见代码文件扩展名 +CODE_EXTENSIONS = { + ".py", ".js", ".ts", ".tsx", ".jsx", ".java", ".c", ".cpp", ".h", ".hpp", + ".go", ".rs", ".rb", ".php", ".swift", ".kt", ".scala", ".sh", ".bash", + ".sql", ".html", ".css", ".scss", ".less", ".vue", ".svelte", + ".yaml", ".yml", ".json", ".xml", ".toml", ".ini", ".cfg", + ".md", ".rst", ".txt", +} + +# 跳过的目录 +SKIP_DIRS = { + "node_modules", ".git", "__pycache__", ".venv", "venv", "env", + ".tox", ".mypy_cache", ".pytest_cache", "dist", "build", ".eggs", + ".idea", ".vscode", "target", "vendor", ".next", ".nuxt", +} + + +def count_lines(path: Path) -> tuple[int, int, int]: + """返回 (总行数, 非空行数, 注释行数)""" + total = blank = comment = 0 + try: + with open(path, "r", encoding="utf-8", errors="ignore") as f: + for line in f: + total += 1 + stripped = line.strip() + if not stripped: + blank += 1 + elif stripped.startswith("#") or stripped.startswith("//"): + comment += 1 + except (PermissionError, OSError): + pass + return total, total - blank, comment + + +def main(): + base = Path.cwd() + stats = defaultdict(lambda: {"files": 0, "total": 0, "code": 0, "comment": 0}) + + for root, dirs, files in os.walk(base): + # 原地删除跳过目录 + dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith(".")] + for fname in files: + fpath = Path(root) / fname + ext = fpath.suffix.lower() + if ext in CODE_EXTENSIONS: + total, code, comment = count_lines(fpath) + if total > 0: + lang = ext.lstrip(".") + stats[lang]["files"] += 1 + stats[lang]["total"] += total + stats[lang]["code"] += code + stats[lang]["comment"] += comment + + if not stats: + print("未找到代码文件") + return + + # 按代码行数降序排列 + sorted_stats = sorted(stats.items(), key=lambda x: x[1]["code"], reverse=True) + + # 表头 + print(f"\n{'语言':<12} {'文件数':>6} {'总行数':>8} {'代码行':>8} {'注释行':>8}") + print("-" * 50) + + sum_files = sum_total = sum_code = sum_comment = 0 + for lang, s in sorted_stats: + print(f".{lang:<11} {s['files']:>6} {s['total']:>8} {s['code']:>8} {s['comment']:>8}") + sum_files += s["files"] + sum_total += s["total"] + sum_code += s["code"] + sum_comment += s["comment"] + + print("-" * 50) + print(f"{'合计':<12} {sum_files:>6} {sum_total:>8} {sum_code:>8} {sum_comment:>8}\n") + + +if __name__ == "__main__": + main() diff --git a/scripts/claude_acp_bridge.py b/scripts/claude_acp_bridge.py new file mode 100755 index 0000000000..3d1d761ca4 --- /dev/null +++ b/scripts/claude_acp_bridge.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 +"""ACP JSON-RPC bridge for Claude Code CLI. + +Translates Hermes CopilotACPClient's ACP JSON-RPC protocol into +`claude -p` CLI calls. This lets Hermes delegate tasks to Claude Code +without modifying any upstream Hermes source code. + +Usage (via HERMES_COPILOT_ACP_COMMAND env var): + HERMES_COPILOT_ACP_COMMAND=python3 scripts/claude_acp_bridge.py + +Protocol flow: + 1. Client sends "initialize" → respond with server info + 2. Client sends "session/new" → respond with sessionId + 3. Client sends "session/prompt" → run `claude -p`, stream back result +""" + +import json +import os +import subprocess +import sys +import uuid + + +def _read_request(): + """Read one JSON-RPC request from stdin (line-delimited).""" + for line in sys.stdin: + line = line.strip() + if not line: + continue + try: + return json.loads(line) + except json.JSONDecodeError: + continue + return None + + +def _write_msg(msg): + """Write a JSON-RPC message to stdout.""" + sys.stdout.write(json.dumps(msg, ensure_ascii=False) + "\n") + sys.stdout.flush() + + +def _send_notification(method, params): + """Send a JSON-RPC notification (no id).""" + _write_msg({"jsonrpc": "2.0", "method": method, "params": params}) + + +def _send_response(request_id, result): + """Send a JSON-RPC success response.""" + _write_msg({"jsonrpc": "2.0", "id": request_id, "result": result}) + + +def _send_error(request_id, code, message): + """Send a JSON-RPC error response.""" + _write_msg({ + "jsonrpc": "2.0", + "id": request_id, + "error": {"code": code, "message": message}, + }) + + +def _handle_initialize(request_id, _params): + _send_response(request_id, { + "protocolVersion": 1, + "serverCapabilities": {}, + "serverInfo": { + "name": "claude-code-bridge", + "title": "Claude Code Bridge", + "version": "0.1.0", + }, + }) + + +def _handle_session_new(request_id, params): + session_id = str(uuid.uuid4()) + cwd = params.get("cwd", os.getcwd()) + _send_response(request_id, {"sessionId": session_id, "cwd": cwd}) + + +def _handle_session_prompt(request_id, params): + # Extract prompt text from ACP format + prompt_parts = params.get("prompt", []) + prompt_text = "" + for part in prompt_parts: + if isinstance(part, dict) and part.get("type") == "text": + prompt_text += part.get("text", "") + + if not prompt_text: + _send_error(request_id, -32602, "Empty prompt") + return + + # Build claude CLI command + cmd = ["claude", "-p", prompt_text, "--output-format", "json", "--dangerously-skip-permissions"] + + # Add model if specified via env var + model = os.getenv("CLAUDE_CODE_BRIDGE_MODEL", "").strip() + if model: + cmd.extend(["--model", model]) + + # Use cwd from session if available + cwd = params.get("cwd") or os.getcwd() + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=900, + cwd=cwd if os.path.isdir(cwd) else None, + ) + if result.returncode != 0: + error_msg = result.stderr.strip() or f"claude exited with code {result.returncode}" + _send_notification("session/update", { + "update": { + "sessionUpdate": "agent_message_chunk", + "content": {"text": f"Error from Claude Code: {error_msg}"}, + } + }) + _send_response(request_id, {"status": "error"}) + return + + response_data = json.loads(result.stdout) + response_text = response_data.get("result", "") + + except subprocess.TimeoutExpired: + response_text = "Error: Claude Code timed out (900s limit)" + except json.JSONDecodeError: + response_text = result.stdout if result.stdout else "Error: Could not parse Claude Code output" + except FileNotFoundError: + response_text = "Error: 'claude' command not found. Install Claude Code CLI first." + except Exception as exc: + response_text = f"Error: {exc}" + + # Send response text as a single agent_message_chunk notification + _send_notification("session/update", { + "update": { + "sessionUpdate": "agent_message_chunk", + "content": {"text": response_text}, + } + }) + + # Send completion response + _send_response(request_id, {"status": "completed"}) + + +_HANDLERS = { + "initialize": _handle_initialize, + "session/new": _handle_session_new, + "session/prompt": _handle_session_prompt, +} + + +def main(): + # Log stderr so stdout stays clean for JSON-RPC + import logging + logging.basicConfig( + stream=sys.stderr, + level=logging.INFO, + format="%(asctime)s [%(levelname)s] claude-acp-bridge: %(message)s", + ) + + while True: + request = _read_request() + if request is None: + break + + request_id = request.get("id") + method = request.get("method", "") + params = request.get("params", {}) + + handler = _HANDLERS.get(method) + if handler: + handler(request_id, params) + else: + _send_error(request_id, -32601, f"Method not found: {method}") + + +if __name__ == "__main__": + main()