feat: devex help, add Makefile, ruff, pre-commit, and modernize CI

This commit is contained in:
Brooklyn Nicholson 2026-03-09 20:36:51 -05:00
parent 172a38c344
commit f4d7e6a29e
111 changed files with 11655 additions and 10200 deletions

View file

@ -34,7 +34,7 @@ import logging
import os
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Tuple
from typing import Any
from openai import OpenAI
@ -43,7 +43,7 @@ from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
# Default auxiliary models for direct API-key providers (cheap/fast for side tasks)
_API_KEY_PROVIDER_AUX_MODELS: Dict[str, str] = {
_API_KEY_PROVIDER_AUX_MODELS: dict[str, str] = {
"zai": "glm-4.5-flash",
"kimi-coding": "kimi-k2-turbo-preview",
"minimax": "MiniMax-M2.5-highspeed",
@ -102,7 +102,7 @@ def _convert_content_for_responses(content: Any) -> Any:
if not isinstance(content, list):
return str(content) if content else ""
converted: List[Dict[str, Any]] = []
converted: list[dict[str, Any]] = []
for part in content:
if not isinstance(part, dict):
continue
@ -113,7 +113,7 @@ def _convert_content_for_responses(content: Any) -> Any:
# chat.completions nests the URL: {"image_url": {"url": "..."}}
image_data = part.get("image_url", {})
url = image_data.get("url", "") if isinstance(image_data, dict) else str(image_data)
entry: Dict[str, Any] = {"type": "input_image", "image_url": url}
entry: dict[str, Any] = {"type": "input_image", "image_url": url}
# Preserve detail if specified
detail = image_data.get("detail") if isinstance(image_data, dict) else None
if detail:
@ -148,19 +148,21 @@ class _CodexCompletionsAdapter:
# Convert chat.completions multimodal content blocks to Responses
# API format (input_text / input_image instead of text / image_url).
instructions = "You are a helpful assistant."
input_msgs: List[Dict[str, Any]] = []
input_msgs: list[dict[str, Any]] = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content") or ""
if role == "system":
instructions = content if isinstance(content, str) else str(content)
else:
input_msgs.append({
"role": role,
"content": _convert_content_for_responses(content),
})
input_msgs.append(
{
"role": role,
"content": _convert_content_for_responses(content),
}
)
resp_kwargs: Dict[str, Any] = {
resp_kwargs: dict[str, Any] = {
"model": model,
"instructions": instructions,
"input": input_msgs or [{"role": "user", "content": ""}],
@ -179,18 +181,20 @@ class _CodexCompletionsAdapter:
name = fn.get("name")
if not name:
continue
converted.append({
"type": "function",
"name": name,
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {}),
})
converted.append(
{
"type": "function",
"name": name,
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {}),
}
)
if converted:
resp_kwargs["tools"] = converted
# Stream and collect the response
text_parts: List[str] = []
tool_calls_raw: List[Any] = []
text_parts: list[str] = []
tool_calls_raw: list[Any] = []
usage = None
try:
@ -208,14 +212,16 @@ class _CodexCompletionsAdapter:
if ptype in ("output_text", "text"):
text_parts.append(getattr(part, "text", ""))
elif item_type == "function_call":
tool_calls_raw.append(SimpleNamespace(
id=getattr(item, "call_id", ""),
type="function",
function=SimpleNamespace(
name=getattr(item, "name", ""),
arguments=getattr(item, "arguments", "{}"),
),
))
tool_calls_raw.append(
SimpleNamespace(
id=getattr(item, "call_id", ""),
type="function",
function=SimpleNamespace(
name=getattr(item, "name", ""),
arguments=getattr(item, "arguments", "{}"),
),
)
)
resp_usage = getattr(final, "usage", None)
if resp_usage:
@ -285,6 +291,7 @@ class _AsyncCodexCompletionsAdapter:
async def create(self, **kwargs) -> Any:
import asyncio
return await asyncio.to_thread(self._sync.create, **kwargs)
@ -304,7 +311,7 @@ class AsyncCodexAuxiliaryClient:
self.base_url = sync_wrapper.base_url
def _read_nous_auth() -> Optional[dict]:
def _read_nous_auth() -> dict | None:
"""Read and validate ~/.hermes/auth.json for an active Nous provider.
Returns the provider state dict if Nous is active with tokens,
@ -336,10 +343,11 @@ def _nous_base_url() -> str:
return os.getenv("NOUS_INFERENCE_BASE_URL", _NOUS_DEFAULT_BASE_URL)
def _read_codex_access_token() -> Optional[str]:
def _read_codex_access_token() -> str | None:
"""Read a valid Codex OAuth access token from Hermes auth store (~/.hermes/auth.json)."""
try:
from hermes_cli.auth import _read_codex_tokens
data = _read_codex_tokens()
tokens = data.get("tokens", {})
access_token = tokens.get("access_token")
@ -351,7 +359,7 @@ def _read_codex_access_token() -> Optional[str]:
return None
def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
def _resolve_api_key_provider() -> tuple[OpenAI | None, str | None]:
"""Try each API-key provider in PROVIDER_REGISTRY order.
Returns (client, model) for the first provider whose env var is set,
@ -398,6 +406,7 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
# ── Provider resolution helpers ─────────────────────────────────────────────
def _get_auxiliary_provider(task: str = "") -> str:
"""Read the provider override for a specific auxiliary task.
@ -413,16 +422,15 @@ def _get_auxiliary_provider(task: str = "") -> str:
return "auto"
def _try_openrouter() -> Tuple[Optional[OpenAI], Optional[str]]:
def _try_openrouter() -> tuple[OpenAI | None, str | None]:
or_key = os.getenv("OPENROUTER_API_KEY")
if not or_key:
return None, None
logger.debug("Auxiliary client: OpenRouter")
return OpenAI(api_key=or_key, base_url=OPENROUTER_BASE_URL,
default_headers=_OR_HEADERS), _OPENROUTER_MODEL
return OpenAI(api_key=or_key, base_url=OPENROUTER_BASE_URL, default_headers=_OR_HEADERS), _OPENROUTER_MODEL
def _try_nous() -> Tuple[Optional[OpenAI], Optional[str]]:
def _try_nous() -> tuple[OpenAI | None, str | None]:
nous = _read_nous_auth()
if not nous:
return None, None
@ -435,7 +443,7 @@ def _try_nous() -> Tuple[Optional[OpenAI], Optional[str]]:
)
def _try_custom_endpoint() -> Tuple[Optional[OpenAI], Optional[str]]:
def _try_custom_endpoint() -> tuple[OpenAI | None, str | None]:
custom_base = os.getenv("OPENAI_BASE_URL")
custom_key = os.getenv("OPENAI_API_KEY")
if not custom_base or not custom_key:
@ -445,7 +453,7 @@ def _try_custom_endpoint() -> Tuple[Optional[OpenAI], Optional[str]]:
return OpenAI(api_key=custom_key, base_url=custom_base), model
def _try_codex() -> Tuple[Optional[Any], Optional[str]]:
def _try_codex() -> tuple[Any | None, str | None]:
codex_token = _read_codex_access_token()
if not codex_token:
return None, None
@ -454,7 +462,7 @@ def _try_codex() -> Tuple[Optional[Any], Optional[str]]:
return CodexAuxiliaryClient(real_client, _CODEX_AUX_MODEL), _CODEX_AUX_MODEL
def _resolve_forced_provider(forced: str) -> Tuple[Optional[OpenAI], Optional[str]]:
def _resolve_forced_provider(forced: str) -> tuple[OpenAI | None, str | None]:
"""Resolve a specific forced provider. Returns (None, None) if creds missing."""
if forced == "openrouter":
client, model = _try_openrouter()
@ -488,10 +496,9 @@ def _resolve_forced_provider(forced: str) -> Tuple[Optional[OpenAI], Optional[st
return None, None
def _resolve_auto() -> Tuple[Optional[OpenAI], Optional[str]]:
def _resolve_auto() -> tuple[OpenAI | None, str | None]:
"""Full auto-detection chain: OpenRouter → Nous → custom → Codex → API-key → None."""
for try_fn in (_try_openrouter, _try_nous, _try_custom_endpoint,
_try_codex, _resolve_api_key_provider):
for try_fn in (_try_openrouter, _try_nous, _try_custom_endpoint, _try_codex, _resolve_api_key_provider):
client, model = try_fn()
if client is not None:
return client, model
@ -501,7 +508,8 @@ def _resolve_auto() -> Tuple[Optional[OpenAI], Optional[str]]:
# ── Public API ──────────────────────────────────────────────────────────────
def get_text_auxiliary_client(task: str = "") -> Tuple[Optional[OpenAI], Optional[str]]:
def get_text_auxiliary_client(task: str = "") -> tuple[OpenAI | None, str | None]:
"""Return (client, default_model_slug) for text-only auxiliary tasks.
Args:
@ -544,7 +552,7 @@ def get_async_text_auxiliary_client(task: str = ""):
return AsyncOpenAI(**async_kwargs), model
def get_vision_auxiliary_client() -> Tuple[Optional[OpenAI], Optional[str]]:
def get_vision_auxiliary_client() -> tuple[OpenAI | None, str | None]:
"""Return (client, default_model_slug) for vision/multimodal auxiliary tasks.
Checks AUXILIARY_VISION_PROVIDER for a forced provider, otherwise
@ -564,8 +572,7 @@ def get_vision_auxiliary_client() -> Tuple[Optional[OpenAI], Optional[str]]:
# back to the user's custom endpoint. Many local models (Qwen-VL,
# LLaVA, Pixtral, etc.) support vision — skipping them entirely
# caused silent failures for local-only users.
for try_fn in (_try_openrouter, _try_nous, _try_codex,
_try_custom_endpoint):
for try_fn in (_try_openrouter, _try_nous, _try_codex, _try_custom_endpoint):
client, model = try_fn()
if client is not None:
return client, model
@ -575,7 +582,7 @@ def get_vision_auxiliary_client() -> Tuple[Optional[OpenAI], Optional[str]]:
def get_auxiliary_extra_body() -> dict:
"""Return extra_body kwargs for auxiliary API calls.
Includes Nous Portal product tags when the auxiliary client is backed
by Nous Portal. Returns empty dict otherwise.
"""
@ -584,7 +591,7 @@ def get_auxiliary_extra_body() -> dict:
def auxiliary_max_tokens_param(value: int) -> dict:
"""Return the correct max tokens kwarg for the auxiliary client's provider.
OpenRouter and local models use 'max_tokens'. Direct OpenAI with newer
models (gpt-4o, o-series, gpt-5+) requires 'max_completion_tokens'.
The Codex adapter translates max_tokens internally, so we use max_tokens
@ -593,8 +600,6 @@ def auxiliary_max_tokens_param(value: int) -> dict:
custom_base = os.getenv("OPENAI_BASE_URL", "")
or_key = os.getenv("OPENROUTER_API_KEY")
# Only use max_completion_tokens for direct OpenAI custom endpoints
if (not or_key
and _read_nous_auth() is None
and "api.openai.com" in custom_base.lower()):
if not or_key and _read_nous_auth() is None and "api.openai.com" in custom_base.lower():
return {"max_completion_tokens": value}
return {"max_tokens": value}

View file

@ -7,12 +7,12 @@ protecting head and tail context.
import logging
import os
from typing import Any, Dict, List, Optional
from typing import Any
from agent.auxiliary_client import get_text_auxiliary_client
from agent.model_metadata import (
get_model_context_length,
estimate_messages_tokens_rough,
get_model_context_length,
)
logger = logging.getLogger(__name__)
@ -56,7 +56,7 @@ class ContextCompressor:
self.client, default_model = get_text_auxiliary_client("compression")
self.summary_model = summary_model_override or default_model
def update_from_response(self, usage: Dict[str, Any]):
def update_from_response(self, usage: dict[str, Any]):
"""Update tracked token usage from API response."""
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
self.last_completion_tokens = usage.get("completion_tokens", 0)
@ -67,12 +67,12 @@ class ContextCompressor:
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
return tokens >= self.threshold_tokens
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
def should_compress_preflight(self, messages: list[dict[str, Any]]) -> bool:
"""Quick pre-flight check using rough estimate (before API call)."""
rough_estimate = estimate_messages_tokens_rough(messages)
return rough_estimate >= self.threshold_tokens
def get_status(self) -> Dict[str, Any]:
def get_status(self) -> dict[str, Any]:
"""Get current compression status for display/logging."""
return {
"last_prompt_tokens": self.last_prompt_tokens,
@ -82,7 +82,7 @@ class ContextCompressor:
"compression_count": self.compression_count,
}
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]:
def _generate_summary(self, turns_to_summarize: list[dict[str, Any]]) -> str | None:
"""Generate a concise summary of conversation turns.
Tries the auxiliary model first, then falls back to the user's main
@ -140,7 +140,9 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
logging.warning(f"Main model summary also failed: {fallback_err}")
# 3. All models failed — return None so the caller drops turns without a summary
logging.warning("Context compression: no model available for summary. Middle turns will be dropped without summary.")
logging.warning(
"Context compression: no model available for summary. Middle turns will be dropped without summary."
)
return None
def _call_summary_model(self, client, model: str, prompt: str) -> str:
@ -186,12 +188,14 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
# Don't fallback to the same provider that just failed
from hermes_constants import OPENROUTER_BASE_URL
if custom_base.rstrip("/") == OPENROUTER_BASE_URL.rstrip("/"):
return None, None
model = os.getenv("LLM_MODEL") or os.getenv("OPENAI_MODEL") or self.model
try:
from openai import OpenAI as _OpenAI
client = _OpenAI(api_key=custom_key, base_url=custom_base)
logger.debug("Built fallback auxiliary client: %s via %s", model, custom_base)
return client, model
@ -210,7 +214,7 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
return tc.get("id", "")
return getattr(tc, "id", "") or ""
def _sanitize_tool_pairs(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def _sanitize_tool_pairs(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Fix orphaned tool_call / tool_result pairs after compression.
Two failure modes:
@ -243,8 +247,7 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
orphaned_results = result_call_ids - surviving_call_ids
if orphaned_results:
messages = [
m for m in messages
if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results)
m for m in messages if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results)
]
if not self.quiet_mode:
logger.info("Compression sanitizer: removed %d orphaned tool result(s)", len(orphaned_results))
@ -252,25 +255,27 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
# 2. Add stub results for assistant tool_calls whose results were dropped
missing_results = surviving_call_ids - result_call_ids
if missing_results:
patched: List[Dict[str, Any]] = []
patched: list[dict[str, Any]] = []
for msg in messages:
patched.append(msg)
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
cid = self._get_tool_call_id(tc)
if cid in missing_results:
patched.append({
"role": "tool",
"content": "[Result from earlier conversation — see context summary above]",
"tool_call_id": cid,
})
patched.append(
{
"role": "tool",
"content": "[Result from earlier conversation — see context summary above]",
"tool_call_id": cid,
}
)
messages = patched
if not self.quiet_mode:
logger.info("Compression sanitizer: added %d stub tool result(s)", len(missing_results))
return messages
def _align_boundary_forward(self, messages: List[Dict[str, Any]], idx: int) -> int:
def _align_boundary_forward(self, messages: list[dict[str, Any]], idx: int) -> int:
"""Push a compress-start boundary forward past any orphan tool results.
If ``messages[idx]`` is a tool result, slide forward until we hit a
@ -280,7 +285,7 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
idx += 1
return idx
def _align_boundary_backward(self, messages: List[Dict[str, Any]], idx: int) -> int:
def _align_boundary_backward(self, messages: list[dict[str, Any]], idx: int) -> int:
"""Pull a compress-end boundary backward to avoid splitting a
tool_call / result group.
@ -298,7 +303,7 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
idx -= 1
return idx
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
def compress(self, messages: list[dict[str, Any]], current_tokens: int = None) -> list[dict[str, Any]]:
"""Compress conversation messages by summarizing middle turns.
Keeps first N + last N turns, summarizes everything in between.
@ -308,7 +313,9 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
n_messages = len(messages)
if n_messages <= self.protect_first_n + self.protect_last_n + 1:
if not self.quiet_mode:
print(f"⚠️ Cannot compress: only {n_messages} messages (need > {self.protect_first_n + self.protect_last_n + 1})")
print(
f"⚠️ Cannot compress: only {n_messages} messages (need > {self.protect_first_n + self.protect_last_n + 1})"
)
return messages
compress_start = self.protect_first_n
@ -323,14 +330,20 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
return messages
turns_to_summarize = messages[compress_start:compress_end]
display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)
display_tokens = (
current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)
)
if not self.quiet_mode:
print(f"\n📦 Context compression triggered ({display_tokens:,} tokens ≥ {self.threshold_tokens:,} threshold)")
print(f" 📊 Model context limit: {self.context_length:,} tokens ({self.threshold_percent*100:.0f}% = {self.threshold_tokens:,})")
print(
f"\n📦 Context compression triggered ({display_tokens:,} tokens ≥ {self.threshold_tokens:,} threshold)"
)
print(
f" 📊 Model context limit: {self.context_length:,} tokens ({self.threshold_percent * 100:.0f}% = {self.threshold_tokens:,})"
)
if not self.quiet_mode:
print(f" 🗜️ Summarizing turns {compress_start+1}-{compress_end} ({len(turns_to_summarize)} turns)")
print(f" 🗜️ Summarizing turns {compress_start + 1}-{compress_end} ({len(turns_to_summarize)} turns)")
summary = self._generate_summary(turns_to_summarize)
@ -338,7 +351,9 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
for i in range(compress_start):
msg = messages[i].copy()
if i == 0 and msg.get("role") == "system" and self.compression_count == 0:
msg["content"] = (msg.get("content") or "") + "\n\n[Note: Some earlier conversation turns may be summarized to preserve context space.]"
msg["content"] = (
msg.get("content") or ""
) + "\n\n[Note: Some earlier conversation turns may be summarized to preserve context space.]"
compressed.append(msg)
if summary:

View file

@ -6,7 +6,6 @@ Used by AIAgent._execute_tool_calls for CLI feedback.
import json
import os
import random
import sys
import threading
import time
@ -20,19 +19,31 @@ _RESET = "\033[0m"
# Tool preview (one-line summary of a tool call's primary argument)
# =========================================================================
def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
"""Build a short preview of a tool call's primary argument for display."""
primary_args = {
"terminal": "command", "web_search": "query", "web_extract": "urls",
"read_file": "path", "write_file": "path", "patch": "path",
"search_files": "pattern", "browser_navigate": "url",
"browser_click": "ref", "browser_type": "text",
"image_generate": "prompt", "text_to_speech": "text",
"vision_analyze": "question", "mixture_of_agents": "user_prompt",
"skill_view": "name", "skills_list": "category",
"terminal": "command",
"web_search": "query",
"web_extract": "urls",
"read_file": "path",
"write_file": "path",
"patch": "path",
"search_files": "pattern",
"browser_navigate": "url",
"browser_click": "ref",
"browser_type": "text",
"image_generate": "prompt",
"text_to_speech": "text",
"vision_analyze": "question",
"mixture_of_agents": "user_prompt",
"skill_view": "name",
"skills_list": "category",
"schedule_cronjob": "name",
"execute_code": "code", "delegate_task": "goal",
"clarify": "question", "skill_manage": "name",
"execute_code": "code",
"delegate_task": "goal",
"clarify": "question",
"skill_manage": "name",
}
if tool_name == "process":
@ -61,18 +72,18 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
if tool_name == "session_search":
query = args.get("query", "")
return f"recall: \"{query[:25]}{'...' if len(query) > 25 else ''}\""
return f'recall: "{query[:25]}{"..." if len(query) > 25 else ""}"'
if tool_name == "memory":
action = args.get("action", "")
target = args.get("target", "")
if action == "add":
content = args.get("content", "")
return f"+{target}: \"{content[:25]}{'...' if len(content) > 25 else ''}\""
return f'+{target}: "{content[:25]}{"..." if len(content) > 25 else ""}"'
elif action == "replace":
return f"~{target}: \"{args.get('old_text', '')[:20]}\""
return f'~{target}: "{args.get("old_text", "")[:20]}"'
elif action == "remove":
return f"-{target}: \"{args.get('old_text', '')[:20]}\""
return f'-{target}: "{args.get("old_text", "")[:20]}"'
return action
if tool_name == "send_message":
@ -80,7 +91,7 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
msg = args.get("message", "")
if len(msg) > 20:
msg = msg[:17] + "..."
return f"to {target}: \"{msg}\""
return f'to {target}: "{msg}"'
if tool_name.startswith("rl_"):
rl_previews = {
@ -115,7 +126,7 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
if not preview:
return None
if len(preview) > max_len:
preview = preview[:max_len - 3] + "..."
preview = preview[: max_len - 3] + "..."
return preview
@ -123,41 +134,74 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
# KawaiiSpinner
# =========================================================================
class KawaiiSpinner:
"""Animated spinner with kawaii faces for CLI feedback during tool execution."""
SPINNERS = {
'dots': ['', '', '', '', '', '', '', '', '', ''],
'bounce': ['', '', '', '', '', '', '', ''],
'grow': ['', '', '', '', '', '', '', '', '', '', '', '', '', ''],
'arrows': ['', '', '', '', '', '', '', ''],
'star': ['', '', '', '', '', '', '', ''],
'moon': ['🌑', '🌒', '🌓', '🌔', '🌕', '🌖', '🌗', '🌘'],
'pulse': ['', '', '', '', '', ''],
'brain': ['🧠', '💭', '💡', '', '💫', '🌟', '💡', '💭'],
'sparkle': ['', '˚', '*', '', '', '', '*', '˚'],
"dots": ["", "", "", "", "", "", "", "", "", ""],
"bounce": ["", "", "", "", "", "", "", ""],
"grow": ["", "", "", "", "", "", "", "", "", "", "", "", "", ""],
"arrows": ["", "", "", "", "", "", "", ""],
"star": ["", "", "", "", "", "", "", ""],
"moon": ["🌑", "🌒", "🌓", "🌔", "🌕", "🌖", "🌗", "🌘"],
"pulse": ["", "", "", "", "", ""],
"brain": ["🧠", "💭", "💡", "", "💫", "🌟", "💡", "💭"],
"sparkle": ["", "˚", "*", "", "", "", "*", "˚"],
}
KAWAII_WAITING = [
"(。◕‿◕。)", "(◕‿◕✿)", "٩(◕‿◕。)۶", "(✿◠‿◠)", "( ˘▽˘)っ",
"♪(´ε` )", "(◕ᴗ◕✿)", "ヾ(^∇^)", "(≧◡≦)", "(★ω★)",
"(。◕‿◕。)",
"(◕‿◕✿)",
"٩(◕‿◕。)۶",
"(✿◠‿◠)",
"( ˘▽˘)っ",
"♪(´ε` )",
"(◕ᴗ◕✿)",
"ヾ(^∇^)",
"(≧◡≦)",
"(★ω★)",
]
KAWAII_THINKING = [
"(。•́︿•̀。)", "(◔_◔)", "(¬‿¬)", "( •_•)>⌐■-■", "(⌐■_■)",
"(´・_・`)", "◉_◉", "(°ロ°)", "( ˘⌣˘)♡", "ヽ(>∀<☆)☆",
"٩(๑❛ᴗ❛๑)۶", "(⊙_⊙)", "(¬_¬)", "( ͡° ͜ʖ ͡°)", "ಠ_ಠ",
"(。•́︿•̀。)",
"(◔_◔)",
"(¬‿¬)",
"( •_•)>⌐■-■",
"(⌐■_■)",
"(´・_・`)",
"◉_◉",
"(°ロ°)",
"( ˘⌣˘)♡",
"ヽ(>∀<☆)☆",
"٩(๑❛ᴗ❛๑)۶",
"(⊙_⊙)",
"(¬_¬)",
"( ͡° ͜ʖ ͡°)",
"ಠ_ಠ",
]
THINKING_VERBS = [
"pondering", "contemplating", "musing", "cogitating", "ruminating",
"deliberating", "mulling", "reflecting", "processing", "reasoning",
"analyzing", "computing", "synthesizing", "formulating", "brainstorming",
"pondering",
"contemplating",
"musing",
"cogitating",
"ruminating",
"deliberating",
"mulling",
"reflecting",
"processing",
"reasoning",
"analyzing",
"computing",
"synthesizing",
"formulating",
"brainstorming",
]
def __init__(self, message: str = "", spinner_type: str = 'dots'):
def __init__(self, message: str = "", spinner_type: str = "dots"):
self.message = message
self.spinner_frames = self.SPINNERS.get(spinner_type, self.SPINNERS['dots'])
self.spinner_frames = self.SPINNERS.get(spinner_type, self.SPINNERS["dots"])
self.running = False
self.thread = None
self.frame_idx = 0
@ -167,7 +211,7 @@ class KawaiiSpinner:
# child agents can replace sys.stdout with a black hole.
self._out = sys.stdout
def _write(self, text: str, end: str = '\n', flush: bool = False):
def _write(self, text: str, end: str = "\n", flush: bool = False):
"""Write to the stdout captured at spinner creation time."""
try:
self._out.write(text + end)
@ -185,7 +229,7 @@ class KawaiiSpinner:
elapsed = time.time() - self.start_time
line = f" {frame} {self.message} ({elapsed:.1f}s)"
pad = max(self.last_line_len - len(line), 0)
self._write(f"\r{line}{' ' * pad}", end='', flush=True)
self._write(f"\r{line}{' ' * pad}", end="", flush=True)
self.last_line_len = len(line)
self.frame_idx += 1
time.sleep(0.12)
@ -216,7 +260,7 @@ class KawaiiSpinner:
# Clear spinner line with spaces (not \033[K) to avoid garbled escape
# codes when prompt_toolkit's patch_stdout is active — same approach
# as stop(). Then print text; spinner redraws on next tick.
blanks = ' ' * max(self.last_line_len + 5, 40)
blanks = " " * max(self.last_line_len + 5, 40)
self._write(f"\r{blanks}\r {text}", flush=True)
def stop(self, final_message: str = None):
@ -225,8 +269,8 @@ class KawaiiSpinner:
self.thread.join(timeout=0.5)
# Clear the spinner line with spaces instead of \033[K to avoid
# garbled escape codes when prompt_toolkit's patch_stdout is active.
blanks = ' ' * max(self.last_line_len + 5, 40)
self._write(f"\r{blanks}\r", end='', flush=True)
blanks = " " * max(self.last_line_len + 5, 40)
self._write(f"\r{blanks}\r", end="", flush=True)
if final_message:
self._write(f" {final_message}", flush=True)
@ -244,38 +288,110 @@ class KawaiiSpinner:
# =========================================================================
KAWAII_SEARCH = [
"♪(´ε` )", "(。◕‿◕。)", "ヾ(^∇^)", "(◕ᴗ◕✿)", "( ˘▽˘)っ",
"٩(◕‿◕。)۶", "(✿◠‿◠)", "♪~(´ε` )", "(ノ´ヮ`)*:・゚✧", "(◎o◎)",
"♪(´ε` )",
"(。◕‿◕。)",
"ヾ(^∇^)",
"(◕ᴗ◕✿)",
"( ˘▽˘)っ",
"٩(◕‿◕。)۶",
"(✿◠‿◠)",
"♪~(´ε` )",
"(ノ´ヮ`)*:・゚✧",
"(◎o◎)",
]
KAWAII_READ = [
"φ(゜▽゜*)♪", "( ˘▽˘)っ", "(⌐■_■)", "٩(。•́‿•̀。)۶", "(◕‿◕✿)",
"ヾ(@⌒ー⌒@)", "(✧ω✧)", "♪(๑ᴖ◡ᴖ๑)♪", "(≧◡≦)", "( ´ ▽ ` )",
"φ(゜▽゜*)♪",
"( ˘▽˘)っ",
"(⌐■_■)",
"٩(。•́‿•̀。)۶",
"(◕‿◕✿)",
"ヾ(@⌒ー⌒@)",
"(✧ω✧)",
"♪(๑ᴖ◡ᴖ๑)♪",
"(≧◡≦)",
"( ´ ▽ ` )",
]
KAWAII_TERMINAL = [
"ヽ(>∀<☆)", "(ノ°∀°)", "٩(^ᴗ^)۶", "ヾ(⌐■_■)ノ♪", "(•̀ᴗ•́)و",
"┗(0)┓", "(`・ω・´)", "( ̄▽ ̄)", "(ง •̀_•́)ง", "ヽ(´▽`)/",
"ヽ(>∀<☆)",
"(ノ°∀°)",
"٩(^ᴗ^)۶",
"ヾ(⌐■_■)ノ♪",
"(•̀ᴗ•́)و",
"┗(0)┓",
"(`・ω・´)",
"( ̄▽ ̄)",
"(ง •̀_•́)ง",
"ヽ(´▽`)/",
]
KAWAII_BROWSER = [
"(ノ°∀°)", "(☞゚ヮ゚)☞", "( ͡° ͜ʖ ͡°)", "┌( ಠ_ಠ)┘", "(⊙_⊙)",
"ヾ(•ω•`)o", "( ̄ω ̄)", "( ˇωˇ )", "(ᵔᴥᵔ)", "(◎o◎)",
"(ノ°∀°)",
"(☞゚ヮ゚)☞",
"( ͡° ͜ʖ ͡°)",
"┌( ಠ_ಠ)┘",
"(⊙_⊙)",
"ヾ(•ω•`)o",
"( ̄ω ̄)",
"( ˇωˇ )",
"(ᵔᴥᵔ)",
"(◎o◎)",
]
KAWAII_CREATE = [
"✧*。٩(ˊᗜˋ*)و✧", "(ノ◕ヮ◕)ノ*:・゚✧", "ヽ(>∀<☆)", "٩(♡ε♡)۶", "(◕‿◕)♡",
"✿◕ ‿ ◕✿", "(*≧▽≦)", "ヾ(-)", "(☆▽☆)", "°˖✧◝(⁰▿⁰)◜✧˖°",
"✧*。٩(ˊᗜˋ*)و✧",
"(ノ◕ヮ◕)ノ*:・゚✧",
"ヽ(>∀<☆)",
"٩(♡ε♡)۶",
"(◕‿◕)♡",
"✿◕ ‿ ◕✿",
"(*≧▽≦)",
"ヾ(-)",
"(☆▽☆)",
"°˖✧◝(⁰▿⁰)◜✧˖°",
]
KAWAII_SKILL = [
"ヾ(@⌒ー⌒@)", "(๑˃ᴗ˂)ﻭ", "٩(◕‿◕。)۶", "(✿╹◡╹)", "ヽ(・∀・)",
"(ノ´ヮ`)*:・゚✧", "♪(๑ᴖ◡ᴖ๑)♪", "(◠‿◠)", "٩(ˊᗜˋ*)و", "(^▽^)",
"ヾ(^∇^)", "(★ω★)/", "٩(。•́‿•̀。)۶", "(◕ᴗ◕✿)", "(◎o◎)",
"(✧ω✧)", "ヽ(>∀<☆)", "( ˘▽˘)っ", "(≧◡≦) ♡", "ヾ( ̄▽ ̄)",
"ヾ(@⌒ー⌒@)",
"(๑˃ᴗ˂)ﻭ",
"٩(◕‿◕。)۶",
"(✿╹◡╹)",
"ヽ(・∀・)",
"(ノ´ヮ`)*:・゚✧",
"♪(๑ᴖ◡ᴖ๑)♪",
"(◠‿◠)",
"٩(ˊᗜˋ*)و",
"(^▽^)",
"ヾ(^∇^)",
"(★ω★)/",
"٩(。•́‿•̀。)۶",
"(◕ᴗ◕✿)",
"(◎o◎)",
"(✧ω✧)",
"ヽ(>∀<☆)",
"( ˘▽˘)っ",
"(≧◡≦) ♡",
"ヾ( ̄▽ ̄)",
]
KAWAII_THINK = [
"(っ°Д°;)っ", "(;′⌒`)", "(・_・ヾ", "( ´_ゝ`)", "( ̄ヘ ̄)",
"(。-`ω´-)", "( ˘︹˘ )", "(¬_¬)", "ヽ(ー_ー )", "(一_一)",
"(っ°Д°;)っ",
"(;′⌒`)",
"(・_・ヾ",
"( ´_ゝ`)",
"( ̄ヘ ̄)",
"(。-`ω´-)",
"( ˘︹˘ )",
"(¬_¬)",
"ヽ(ー_ー )",
"(一_一)",
]
KAWAII_GENERIC = [
"♪(´ε` )", "(◕‿◕✿)", "ヾ(^∇^)", "٩(◕‿◕。)۶", "(✿◠‿◠)",
"(ノ´ヮ`)*:・゚✧", "ヽ(>∀<☆)", "(☆▽☆)", "( ˘▽˘)っ", "(≧◡≦)",
"♪(´ε` )",
"(◕‿◕✿)",
"ヾ(^∇^)",
"٩(◕‿◕。)۶",
"(✿◠‿◠)",
"(ノ´ヮ`)*:・゚✧",
"ヽ(>∀<☆)",
"(☆▽☆)",
"( ˘▽˘)っ",
"(≧◡≦)",
]
@ -283,6 +399,7 @@ KAWAII_GENERIC = [
# Cute tool message (completion line that replaces the spinner)
# =========================================================================
def _detect_tool_failure(tool_name: str, result: str | None) -> tuple[bool, str]:
"""Inspect a tool result string for signs of failure.
@ -321,7 +438,10 @@ def _detect_tool_failure(tool_name: str, result: str | None) -> tuple[bool, str]
def get_cute_tool_message(
tool_name: str, args: dict, duration: float, result: str | None = None,
tool_name: str,
args: dict,
duration: float,
result: str | None = None,
) -> str:
"""Generate a formatted tool completion line for CLI quiet mode.
@ -335,11 +455,11 @@ def get_cute_tool_message(
def _trunc(s, n=40):
s = str(s)
return (s[:n-3] + "...") if len(s) > n else s
return (s[: n - 3] + "...") if len(s) > n else s
def _path(p, n=35):
p = str(p)
return ("..." + p[-(n-3):]) if len(p) > n else p
return ("..." + p[-(n - 3) :]) if len(p) > n else p
def _wrap(line: str) -> str:
"""Append failure suffix when the tool failed."""
@ -354,7 +474,7 @@ def get_cute_tool_message(
if urls:
url = urls[0] if isinstance(urls, list) else str(urls)
domain = url.replace("https://", "").replace("http://", "").split("/")[0]
extra = f" +{len(urls)-1}" if len(urls) > 1 else ""
extra = f" +{len(urls) - 1}" if len(urls) > 1 else ""
return _wrap(f"┊ 📄 fetch {_trunc(domain, 35)}{extra} {dur}")
return _wrap(f"┊ 📄 fetch pages {dur}")
if tool_name == "web_crawl":
@ -366,8 +486,15 @@ def get_cute_tool_message(
if tool_name == "process":
action = args.get("action", "?")
sid = args.get("session_id", "")[:12]
labels = {"list": "ls processes", "poll": f"poll {sid}", "log": f"log {sid}",
"wait": f"wait {sid}", "kill": f"kill {sid}", "write": f"write {sid}", "submit": f"submit {sid}"}
labels = {
"list": "ls processes",
"poll": f"poll {sid}",
"log": f"log {sid}",
"wait": f"wait {sid}",
"kill": f"kill {sid}",
"write": f"write {sid}",
"submit": f"submit {sid}",
}
return _wrap(f"┊ ⚙️ proc {labels.get(action, f'{action} {sid}')} {dur}")
if tool_name == "read_file":
return _wrap(f"┊ 📖 read {_path(args.get('path', ''))} {dur}")
@ -390,7 +517,7 @@ def get_cute_tool_message(
if tool_name == "browser_click":
return _wrap(f"┊ 👆 click {args.get('ref', '?')} {dur}")
if tool_name == "browser_type":
return _wrap(f"┊ ⌨️ type \"{_trunc(args.get('text', ''), 30)}\" {dur}")
return _wrap(f'┊ ⌨️ type "{_trunc(args.get("text", ""), 30)}" {dur}')
if tool_name == "browser_scroll":
d = args.get("direction", "down")
arrow = {"down": "", "up": "", "right": "", "left": ""}.get(d, "")
@ -415,16 +542,16 @@ def get_cute_tool_message(
else:
return _wrap(f"┊ 📋 plan {len(todos_arg)} task(s) {dur}")
if tool_name == "session_search":
return _wrap(f"┊ 🔍 recall \"{_trunc(args.get('query', ''), 35)}\" {dur}")
return _wrap(f'┊ 🔍 recall "{_trunc(args.get("query", ""), 35)}" {dur}')
if tool_name == "memory":
action = args.get("action", "?")
target = args.get("target", "")
if action == "add":
return _wrap(f"┊ 🧠 memory +{target}: \"{_trunc(args.get('content', ''), 30)}\" {dur}")
return _wrap(f'┊ 🧠 memory +{target}: "{_trunc(args.get("content", ""), 30)}" {dur}')
elif action == "replace":
return _wrap(f"┊ 🧠 memory ~{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}")
return _wrap(f'┊ 🧠 memory ~{target}: "{_trunc(args.get("old_text", ""), 20)}" {dur}')
elif action == "remove":
return _wrap(f"┊ 🧠 memory -{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}")
return _wrap(f'┊ 🧠 memory -{target}: "{_trunc(args.get("old_text", ""), 20)}" {dur}')
return _wrap(f"┊ 🧠 memory {action} {dur}")
if tool_name == "skills_list":
return _wrap(f"┊ 📚 skills list {args.get('category', 'all')} {dur}")
@ -439,7 +566,7 @@ def get_cute_tool_message(
if tool_name == "mixture_of_agents":
return _wrap(f"┊ 🧠 reason {_trunc(args.get('user_prompt', ''), 30)} {dur}")
if tool_name == "send_message":
return _wrap(f"┊ 📨 send {args.get('target', '?')}: \"{_trunc(args.get('message', ''), 25)}\" {dur}")
return _wrap(f'┊ 📨 send {args.get("target", "?")}: "{_trunc(args.get("message", ""), 25)}" {dur}')
if tool_name == "schedule_cronjob":
return _wrap(f"┊ ⏰ schedule {_trunc(args.get('name', args.get('prompt', 'task')), 30)} {dur}")
if tool_name == "list_cronjobs":
@ -448,11 +575,16 @@ def get_cute_tool_message(
return _wrap(f"┊ ⏰ remove job {args.get('job_id', '?')} {dur}")
if tool_name.startswith("rl_"):
rl = {
"rl_list_environments": "list envs", "rl_select_environment": f"select {args.get('name', '')}",
"rl_get_current_config": "get config", "rl_edit_config": f"set {args.get('field', '?')}",
"rl_start_training": "start training", "rl_check_status": f"status {args.get('run_id', '?')[:12]}",
"rl_stop_training": f"stop {args.get('run_id', '?')[:12]}", "rl_get_results": f"results {args.get('run_id', '?')[:12]}",
"rl_list_runs": "list runs", "rl_test_inference": "test inference",
"rl_list_environments": "list envs",
"rl_select_environment": f"select {args.get('name', '')}",
"rl_get_current_config": "get config",
"rl_edit_config": f"set {args.get('field', '?')}",
"rl_start_training": "start training",
"rl_check_status": f"status {args.get('run_id', '?')[:12]}",
"rl_stop_training": f"stop {args.get('run_id', '?')[:12]}",
"rl_get_results": f"results {args.get('run_id', '?')[:12]}",
"rl_list_runs": "list runs",
"rl_test_inference": "test inference",
}
return _wrap(f"┊ 🧪 rl {rl.get(tool_name, tool_name.replace('rl_', ''))} {dur}")
if tool_name == "execute_code":

View file

@ -20,7 +20,7 @@ import json
import time
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any
# =========================================================================
# Model pricing (USD per million tokens) — approximate as of early 2026
@ -81,7 +81,7 @@ def _has_known_pricing(model_name: str) -> bool:
return _get_pricing(model_name) is not _DEFAULT_PRICING
def _get_pricing(model_name: str) -> Dict[str, float]:
def _get_pricing(model_name: str) -> dict[str, float]:
"""Look up pricing for a model. Uses fuzzy matching on model name.
Returns _DEFAULT_PRICING (zero cost) for unknown/custom models
@ -150,7 +150,7 @@ def _format_duration(seconds: float) -> str:
return f"{days:.1f}d"
def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
def _bar_chart(values: list[int], max_width: int = 20) -> list[str]:
"""Create simple horizontal bar chart strings from values."""
peak = max(values) if values else 1
if peak == 0:
@ -176,7 +176,7 @@ class InsightsEngine:
self.db = db
self._conn = db._conn
def generate(self, days: int = 30, source: str = None) -> Dict[str, Any]:
def generate(self, days: int = 30, source: str = None) -> dict[str, Any]:
"""
Generate a complete insights report.
@ -233,10 +233,11 @@ class InsightsEngine:
# =========================================================================
# Columns we actually need (skip system_prompt, model_config blobs)
_SESSION_COLS = ("id, source, model, started_at, ended_at, "
"message_count, tool_call_count, input_tokens, output_tokens")
_SESSION_COLS = (
"id, source, model, started_at, ended_at, message_count, tool_call_count, input_tokens, output_tokens"
)
def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
def _get_sessions(self, cutoff: float, source: str = None) -> list[dict]:
"""Fetch sessions within the time window."""
if source:
cursor = self._conn.execute(
@ -254,7 +255,7 @@ class InsightsEngine:
)
return [dict(row) for row in cursor.fetchall()]
def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]:
def _get_tool_usage(self, cutoff: float, source: str = None) -> list[dict]:
"""Get tool call counts from messages.
Uses two sources:
@ -341,12 +342,9 @@ class InsightsEngine:
tool_counts = merged
# Convert to the expected format
return [
{"tool_name": name, "count": count}
for name, count in tool_counts.most_common()
]
return [{"tool_name": name, "count": count} for name, count in tool_counts.most_common()]
def _get_message_stats(self, cutoff: float, source: str = None) -> Dict:
def _get_message_stats(self, cutoff: float, source: str = None) -> dict:
"""Get aggregate message statistics."""
if source:
cursor = self._conn.execute(
@ -373,16 +371,22 @@ class InsightsEngine:
(cutoff,),
)
row = cursor.fetchone()
return dict(row) if row else {
"total_messages": 0, "user_messages": 0,
"assistant_messages": 0, "tool_messages": 0,
}
return (
dict(row)
if row
else {
"total_messages": 0,
"user_messages": 0,
"assistant_messages": 0,
"tool_messages": 0,
}
)
# =========================================================================
# Computation
# =========================================================================
def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
def _compute_overview(self, sessions: list[dict], message_stats: dict) -> dict:
"""Compute high-level overview statistics."""
total_input = sum(s.get("input_tokens") or 0 for s in sessions)
total_output = sum(s.get("output_tokens") or 0 for s in sessions)
@ -442,12 +446,18 @@ class InsightsEngine:
"models_without_pricing": sorted(models_without_pricing),
}
def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
def _compute_model_breakdown(self, sessions: list[dict]) -> list[dict]:
"""Break down usage by model."""
model_data = defaultdict(lambda: {
"sessions": 0, "input_tokens": 0, "output_tokens": 0,
"total_tokens": 0, "tool_calls": 0, "cost": 0.0,
})
model_data = defaultdict(
lambda: {
"sessions": 0,
"input_tokens": 0,
"output_tokens": 0,
"total_tokens": 0,
"tool_calls": 0,
"cost": 0.0,
}
)
for s in sessions:
model = s.get("model") or "unknown"
@ -464,20 +474,23 @@ class InsightsEngine:
d["cost"] += _estimate_cost(model, inp, out)
d["has_pricing"] = _has_known_pricing(model)
result = [
{"model": model, **data}
for model, data in model_data.items()
]
result = [{"model": model, **data} for model, data in model_data.items()]
# Sort by tokens first, fall back to session count when tokens are 0
result.sort(key=lambda x: (x["total_tokens"], x["sessions"]), reverse=True)
return result
def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
def _compute_platform_breakdown(self, sessions: list[dict]) -> list[dict]:
"""Break down usage by platform/source."""
platform_data = defaultdict(lambda: {
"sessions": 0, "messages": 0, "input_tokens": 0,
"output_tokens": 0, "total_tokens": 0, "tool_calls": 0,
})
platform_data = defaultdict(
lambda: {
"sessions": 0,
"messages": 0,
"input_tokens": 0,
"output_tokens": 0,
"total_tokens": 0,
"tool_calls": 0,
}
)
for s in sessions:
source = s.get("source") or "unknown"
@ -491,27 +504,26 @@ class InsightsEngine:
d["total_tokens"] += inp + out
d["tool_calls"] += s.get("tool_call_count") or 0
result = [
{"platform": platform, **data}
for platform, data in platform_data.items()
]
result = [{"platform": platform, **data} for platform, data in platform_data.items()]
result.sort(key=lambda x: x["sessions"], reverse=True)
return result
def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
def _compute_tool_breakdown(self, tool_usage: list[dict]) -> list[dict]:
"""Process tool usage data into a ranked list with percentages."""
total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
result = []
for t in tool_usage:
pct = (t["count"] / total_calls * 100) if total_calls else 0
result.append({
"tool": t["tool_name"],
"count": t["count"],
"percentage": pct,
})
result.append(
{
"tool": t["tool_name"],
"count": t["count"],
"percentage": pct,
}
)
return result
def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
def _compute_activity_patterns(self, sessions: list[dict]) -> dict:
"""Analyze activity patterns by day of week and hour."""
day_counts = Counter() # 0=Monday ... 6=Sunday
hour_counts = Counter()
@ -527,15 +539,9 @@ class InsightsEngine:
daily_counts[dt.strftime("%Y-%m-%d")] += 1
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
day_breakdown = [
{"day": day_names[i], "count": day_counts.get(i, 0)}
for i in range(7)
]
day_breakdown = [{"day": day_names[i], "count": day_counts.get(i, 0)} for i in range(7)]
hour_breakdown = [
{"hour": i, "count": hour_counts.get(i, 0)}
for i in range(24)
]
hour_breakdown = [{"hour": i, "count": hour_counts.get(i, 0)} for i in range(24)]
# Busiest day and hour
busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
@ -569,37 +575,40 @@ class InsightsEngine:
"max_streak": max_streak,
}
def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
def _compute_top_sessions(self, sessions: list[dict]) -> list[dict]:
"""Find notable sessions (longest, most messages, most tokens)."""
top = []
# Longest by duration
sessions_with_duration = [
s for s in sessions
if s.get("started_at") and s.get("ended_at")
]
sessions_with_duration = [s for s in sessions if s.get("started_at") and s.get("ended_at")]
if sessions_with_duration:
longest = max(
sessions_with_duration,
key=lambda s: (s["ended_at"] - s["started_at"]),
key=lambda s: s["ended_at"] - s["started_at"],
)
dur = longest["ended_at"] - longest["started_at"]
top.append({
"label": "Longest session",
"session_id": longest["id"][:16],
"value": _format_duration(dur),
"date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
})
top.append(
{
"label": "Longest session",
"session_id": longest["id"][:16],
"value": _format_duration(dur),
"date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
}
)
# Most messages
most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
if (most_msgs.get("message_count") or 0) > 0:
top.append({
"label": "Most messages",
"session_id": most_msgs["id"][:16],
"value": f"{most_msgs['message_count']} msgs",
"date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
})
top.append(
{
"label": "Most messages",
"session_id": most_msgs["id"][:16],
"value": f"{most_msgs['message_count']} msgs",
"date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d")
if most_msgs.get("started_at")
else "?",
}
)
# Most tokens
most_tokens = max(
@ -608,22 +617,30 @@ class InsightsEngine:
)
token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
if token_total > 0:
top.append({
"label": "Most tokens",
"session_id": most_tokens["id"][:16],
"value": f"{token_total:,} tokens",
"date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
})
top.append(
{
"label": "Most tokens",
"session_id": most_tokens["id"][:16],
"value": f"{token_total:,} tokens",
"date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d")
if most_tokens.get("started_at")
else "?",
}
)
# Most tool calls
most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
if (most_tools.get("tool_call_count") or 0) > 0:
top.append({
"label": "Most tool calls",
"session_id": most_tools["id"][:16],
"value": f"{most_tools['tool_call_count']} calls",
"date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
})
top.append(
{
"label": "Most tool calls",
"session_id": most_tools["id"][:16],
"value": f"{most_tools['tool_call_count']} calls",
"date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d")
if most_tools.get("started_at")
else "?",
}
)
return top
@ -631,7 +648,7 @@ class InsightsEngine:
# Formatting
# =========================================================================
def format_terminal(self, report: Dict) -> str:
def format_terminal(self, report: dict) -> str:
"""Format the insights report for terminal display (CLI)."""
if report.get("empty"):
days = report.get("days", 30)
@ -669,13 +686,17 @@ class InsightsEngine:
lines.append(" " + "" * 56)
lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
lines.append(
f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}"
)
cost_str = f"${o['estimated_cost']:.2f}"
if o.get("models_without_pricing"):
cost_str += " *"
lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: {cost_str}")
if o["total_hours"] > 0:
lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
lines.append(
f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}"
)
lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}")
lines.append("")
@ -692,7 +713,7 @@ class InsightsEngine:
cost_cell = " N/A"
lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} {cost_cell}")
if o.get("models_without_pricing"):
lines.append(f" * Cost N/A for custom/self-hosted models")
lines.append(" * Cost N/A for custom/self-hosted models")
lines.append("")
# Platform breakdown
@ -758,7 +779,7 @@ class InsightsEngine:
return "\n".join(lines)
def format_gateway(self, report: Dict) -> str:
def format_gateway(self, report: dict) -> str:
"""Format the insights report for gateway/messaging (shorter)."""
if report.get("empty"):
days = report.get("days", 30)
@ -771,14 +792,20 @@ class InsightsEngine:
lines.append(f"📊 **Hermes Insights** — Last {days} days\n")
# Overview
lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
lines.append(
f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}"
)
lines.append(
f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})"
)
cost_note = ""
if o.get("models_without_pricing"):
cost_note = " _(excludes custom/self-hosted models)_"
lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}{cost_note}")
if o["total_hours"] > 0:
lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
lines.append(
f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}"
)
lines.append("")
# Models (top 5)
@ -786,7 +813,9 @@ class InsightsEngine:
lines.append("**🤖 Models:**")
for m in report["models"][:5]:
cost_str = f"${m['cost']:.2f}" if m.get("has_pricing") else "N/A"
lines.append(f" {m['model'][:25]}{m['sessions']} sessions, {m['total_tokens']:,} tokens, {cost_str}")
lines.append(
f" {m['model'][:25]}{m['sessions']} sessions, {m['total_tokens']:,} tokens, {cost_str}"
)
lines.append("")
# Platforms (if multi-platform)
@ -809,9 +838,13 @@ class InsightsEngine:
hr = act["busiest_hour"]["hour"]
ampm = "AM" if hr < 12 else "PM"
display_hr = hr % 12 or 12
lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
lines.append(
f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)"
)
if act.get("active_days"):
lines.append(f"**Active days:** {act['active_days']}", )
lines.append(
f"**Active days:** {act['active_days']}",
)
if act.get("max_streak", 0) > 1:
lines.append(f"**Best streak:** {act['max_streak']} consecutive days")

View file

@ -9,7 +9,7 @@ import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any
import requests
import yaml
@ -18,7 +18,7 @@ from hermes_constants import OPENROUTER_MODELS_URL
logger = logging.getLogger(__name__)
_model_metadata_cache: Dict[str, Dict[str, Any]] = {}
_model_metadata_cache: dict[str, dict[str, Any]] = {}
_model_metadata_cache_time: float = 0
_MODEL_CACHE_TTL = 3600
@ -63,7 +63,7 @@ DEFAULT_CONTEXT_LENGTHS = {
}
def fetch_model_metadata(force_refresh: bool = False) -> Dict[str, Dict[str, Any]]:
def fetch_model_metadata(force_refresh: bool = False) -> dict[str, dict[str, Any]]:
"""Fetch model metadata from OpenRouter (cached for 1 hour)."""
global _model_metadata_cache, _model_metadata_cache_time
@ -104,7 +104,7 @@ def _get_context_cache_path() -> Path:
return hermes_home / "context_length_cache.yaml"
def _load_context_cache() -> Dict[str, int]:
def _load_context_cache() -> dict[str, int]:
"""Load the model+provider → context_length cache from disk."""
path = _get_context_cache_path()
if not path.exists():
@ -139,14 +139,14 @@ def save_context_length(model: str, base_url: str, length: int) -> None:
logger.debug("Failed to save context length cache: %s", e)
def get_cached_context_length(model: str, base_url: str) -> Optional[int]:
def get_cached_context_length(model: str, base_url: str) -> int | None:
"""Look up a previously discovered context length for model+provider."""
key = f"{model}@{base_url}"
cache = _load_context_cache()
return cache.get(key)
def get_next_probe_tier(current_length: int) -> Optional[int]:
def get_next_probe_tier(current_length: int) -> int | None:
"""Return the next lower probe tier, or None if already at minimum."""
for tier in CONTEXT_PROBE_TIERS:
if tier < current_length:
@ -154,7 +154,7 @@ def get_next_probe_tier(current_length: int) -> Optional[int]:
return None
def parse_context_limit_from_error(error_msg: str) -> Optional[int]:
def parse_context_limit_from_error(error_msg: str) -> int | None:
"""Try to extract the actual context limit from an API error message.
Many providers include the limit in their error text, e.g.:
@ -166,11 +166,11 @@ def parse_context_limit_from_error(error_msg: str) -> Optional[int]:
error_lower = error_msg.lower()
# Pattern: look for numbers near context-related keywords
patterns = [
r'(?:max(?:imum)?|limit)\s*(?:context\s*)?(?:length|size|window)?\s*(?:is|of|:)?\s*(\d{4,})',
r'context\s*(?:length|size|window)\s*(?:is|of|:)?\s*(\d{4,})',
r'(\d{4,})\s*(?:token)?\s*(?:context|limit)',
r'>\s*(\d{4,})\s*(?:max|limit|token)', # "250000 tokens > 200000 maximum"
r'(\d{4,})\s*(?:max(?:imum)?)\b', # "200000 maximum"
r"(?:max(?:imum)?|limit)\s*(?:context\s*)?(?:length|size|window)?\s*(?:is|of|:)?\s*(\d{4,})",
r"context\s*(?:length|size|window)\s*(?:is|of|:)?\s*(\d{4,})",
r"(\d{4,})\s*(?:token)?\s*(?:context|limit)",
r">\s*(\d{4,})\s*(?:max|limit|token)", # "250000 tokens > 200000 maximum"
r"(\d{4,})\s*(?:max(?:imum)?)\b", # "200000 maximum"
]
for pattern in patterns:
match = re.search(pattern, error_lower)
@ -218,7 +218,7 @@ def estimate_tokens_rough(text: str) -> int:
return len(text) // 4
def estimate_messages_tokens_rough(messages: List[Dict[str, Any]]) -> int:
def estimate_messages_tokens_rough(messages: list[dict[str, Any]]) -> int:
"""Rough token estimate for a message list (pre-flight only)."""
total_chars = sum(len(str(msg)) for msg in messages)
return total_chars // 4

View file

@ -8,7 +8,6 @@ import logging
import os
import re
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
@ -18,21 +17,29 @@ logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
_CONTEXT_THREAT_PATTERNS = [
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
(r'system\s+prompt\s+override', "sys_prompt_override"),
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
(r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
(r'<!--[^>]*(?:ignore|override|system|secret|hidden)[^>]*-->', "html_comment_injection"),
(r"ignore\s+(previous|all|above|prior)\s+instructions", "prompt_injection"),
(r"do\s+not\s+tell\s+the\s+user", "deception_hide"),
(r"system\s+prompt\s+override", "sys_prompt_override"),
(r"disregard\s+(your|all|any)\s+(instructions|rules|guidelines)", "disregard_rules"),
(r"act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)", "bypass_restrictions"),
(r"<!--[^>]*(?:ignore|override|system|secret|hidden)[^>]*-->", "html_comment_injection"),
(r'<\s*div\s+style\s*=\s*["\'].*display\s*:\s*none', "hidden_div"),
(r'translate\s+.*\s+into\s+.*\s+and\s+(execute|run|eval)', "translate_execute"),
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"),
(r"translate\s+.*\s+into\s+.*\s+and\s+(execute|run|eval)", "translate_execute"),
(r"curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)", "exfil_curl"),
(r"cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)", "read_secrets"),
]
_CONTEXT_INVISIBLE_CHARS = {
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
"\u200b",
"\u200c",
"\u200d",
"\u2060",
"\ufeff",
"\u202a",
"\u202b",
"\u202c",
"\u202d",
"\u202e",
}
@ -52,10 +59,13 @@ def _scan_context_content(content: str, filename: str) -> str:
if findings:
logger.warning("Context file %s blocked: %s", filename, ", ".join(findings))
return f"[BLOCKED: {filename} contained potential prompt injection ({', '.join(findings)}). Content not loaded.]"
return (
f"[BLOCKED: {filename} contained potential prompt injection ({', '.join(findings)}). Content not loaded.]"
)
return content
# =========================================================================
# Constants
# =========================================================================
@ -131,10 +141,7 @@ PLATFORM_HINTS = {
"files arrive as downloadable documents. You can also include image "
"URLs in markdown format ![alt](url) and they will be sent as photos."
),
"cli": (
"You are a CLI AI Agent. Try not to use markdown but simple text "
"renderable inside a terminal."
),
"cli": ("You are a CLI AI Agent. Try not to use markdown but simple text renderable inside a terminal."),
}
CONTEXT_FILE_MAX_CHARS = 20_000
@ -146,18 +153,20 @@ CONTEXT_TRUNCATE_TAIL_RATIO = 0.2
# Skills index
# =========================================================================
def _read_skill_description(skill_file: Path, max_chars: int = 60) -> str:
"""Read the description from a SKILL.md frontmatter, capped at max_chars."""
try:
raw = skill_file.read_text(encoding="utf-8")[:2000]
match = re.search(
r"^---\s*\n.*?description:\s*(.+?)\s*\n.*?^---",
raw, re.MULTILINE | re.DOTALL,
raw,
re.MULTILINE | re.DOTALL,
)
if match:
desc = match.group(1).strip().strip("'\"")
if len(desc) > max_chars:
desc = desc[:max_chars - 3] + "..."
desc = desc[: max_chars - 3] + "..."
return desc
except Exception:
pass
@ -172,6 +181,7 @@ def _skill_is_platform_compatible(skill_file: Path) -> bool:
"""
try:
from tools.skills_tool import _parse_frontmatter, skill_matches_platform
raw = skill_file.read_text(encoding="utf-8")[:2000]
frontmatter, _ = _parse_frontmatter(raw)
return skill_matches_platform(frontmatter)
@ -260,8 +270,7 @@ def build_skills_system_prompt() -> str:
"load it with skill_view(name) and follow its instructions. "
"If a skill has issues, fix it with skill_manage(action='patch').\n"
"\n"
"<available_skills>\n"
+ "\n".join(index_lines) + "\n"
"<available_skills>\n" + "\n".join(index_lines) + "\n"
"</available_skills>\n"
"\n"
"If none match, proceed normally without loading a skill."
@ -272,6 +281,7 @@ def build_skills_system_prompt() -> str:
# Context files (SOUL.md, AGENTS.md, .cursorrules)
# =========================================================================
def _truncate_content(content: str, filename: str, max_chars: int = CONTEXT_FILE_MAX_CHARS) -> str:
"""Head/tail truncation with a marker in the middle."""
if len(content) <= max_chars:
@ -284,7 +294,7 @@ def _truncate_content(content: str, filename: str, max_chars: int = CONTEXT_FILE
return head + marker + tail
def build_context_files_prompt(cwd: Optional[str] = None) -> str:
def build_context_files_prompt(cwd: str | None = None) -> str:
"""Discover and load context files for the system prompt.
Discovery: AGENTS.md (recursive), .cursorrules / .cursor/rules/*.mdc,
@ -307,7 +317,9 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
if top_level_agents:
agents_files = []
for root, dirs, files in os.walk(cwd_path):
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('node_modules', '__pycache__', 'venv', '.venv')]
dirs[:] = [
d for d in dirs if not d.startswith(".") and d not in ("node_modules", "__pycache__", "venv", ".venv")
]
for f in files:
if f.lower() == "agents.md":
agents_files.append(Path(root) / f)
@ -384,4 +396,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
if not sections:
return ""
return "# Project Context\n\nThe following project context files have been loaded and should be followed:\n\n" + "\n".join(sections)
return (
"# Project Context\n\nThe following project context files have been loaded and should be followed:\n\n"
+ "\n".join(sections)
)

View file

@ -9,7 +9,7 @@ Pure functions -- no class state, no AIAgent dependency.
"""
import copy
from typing import Any, Dict, List
from typing import Any
def _apply_cache_marker(msg: dict, cache_marker: dict) -> None:
@ -36,9 +36,9 @@ def _apply_cache_marker(msg: dict, cache_marker: dict) -> None:
def apply_anthropic_cache_control(
api_messages: List[Dict[str, Any]],
api_messages: list[dict[str, Any]],
cache_ttl: str = "5m",
) -> List[Dict[str, Any]]:
) -> list[dict[str, Any]]:
"""Apply system_and_3 caching strategy to messages for Anthropic models.
Places up to 4 cache_control breakpoints: system prompt + last 3 non-system messages.

View file

@ -10,34 +10,33 @@ the first 6 and last 4 characters for debuggability.
import logging
import os
import re
from typing import Optional
logger = logging.getLogger(__name__)
# Known API key prefixes -- match the prefix + contiguous token chars
_PREFIX_PATTERNS = [
r"sk-[A-Za-z0-9_-]{10,}", # OpenAI / OpenRouter / Anthropic (sk-ant-*)
r"ghp_[A-Za-z0-9]{10,}", # GitHub PAT (classic)
r"github_pat_[A-Za-z0-9_]{10,}", # GitHub PAT (fine-grained)
r"xox[baprs]-[A-Za-z0-9-]{10,}", # Slack tokens
r"AIza[A-Za-z0-9_-]{30,}", # Google API keys
r"pplx-[A-Za-z0-9]{10,}", # Perplexity
r"fal_[A-Za-z0-9_-]{10,}", # Fal.ai
r"fc-[A-Za-z0-9]{10,}", # Firecrawl
r"bb_live_[A-Za-z0-9_-]{10,}", # BrowserBase
r"gAAAA[A-Za-z0-9_=-]{20,}", # Codex encrypted tokens
r"AKIA[A-Z0-9]{16}", # AWS Access Key ID
r"sk_live_[A-Za-z0-9]{10,}", # Stripe secret key (live)
r"sk_test_[A-Za-z0-9]{10,}", # Stripe secret key (test)
r"rk_live_[A-Za-z0-9]{10,}", # Stripe restricted key
r"SG\.[A-Za-z0-9_-]{10,}", # SendGrid API key
r"hf_[A-Za-z0-9]{10,}", # HuggingFace token
r"r8_[A-Za-z0-9]{10,}", # Replicate API token
r"npm_[A-Za-z0-9]{10,}", # npm access token
r"pypi-[A-Za-z0-9_-]{10,}", # PyPI API token
r"dop_v1_[A-Za-z0-9]{10,}", # DigitalOcean PAT
r"doo_v1_[A-Za-z0-9]{10,}", # DigitalOcean OAuth
r"am_[A-Za-z0-9_-]{10,}", # AgentMail API key
r"sk-[A-Za-z0-9_-]{10,}", # OpenAI / OpenRouter / Anthropic (sk-ant-*)
r"ghp_[A-Za-z0-9]{10,}", # GitHub PAT (classic)
r"github_pat_[A-Za-z0-9_]{10,}", # GitHub PAT (fine-grained)
r"xox[baprs]-[A-Za-z0-9-]{10,}", # Slack tokens
r"AIza[A-Za-z0-9_-]{30,}", # Google API keys
r"pplx-[A-Za-z0-9]{10,}", # Perplexity
r"fal_[A-Za-z0-9_-]{10,}", # Fal.ai
r"fc-[A-Za-z0-9]{10,}", # Firecrawl
r"bb_live_[A-Za-z0-9_-]{10,}", # BrowserBase
r"gAAAA[A-Za-z0-9_=-]{20,}", # Codex encrypted tokens
r"AKIA[A-Z0-9]{16}", # AWS Access Key ID
r"sk_live_[A-Za-z0-9]{10,}", # Stripe secret key (live)
r"sk_test_[A-Za-z0-9]{10,}", # Stripe secret key (test)
r"rk_live_[A-Za-z0-9]{10,}", # Stripe restricted key
r"SG\.[A-Za-z0-9_-]{10,}", # SendGrid API key
r"hf_[A-Za-z0-9]{10,}", # HuggingFace token
r"r8_[A-Za-z0-9]{10,}", # Replicate API token
r"npm_[A-Za-z0-9]{10,}", # npm access token
r"pypi-[A-Za-z0-9_-]{10,}", # PyPI API token
r"dop_v1_[A-Za-z0-9]{10,}", # DigitalOcean PAT
r"doo_v1_[A-Za-z0-9]{10,}", # DigitalOcean OAuth
r"am_[A-Za-z0-9_-]{10,}", # AgentMail API key
]
# ENV assignment patterns: KEY=value where KEY contains a secret-like name
@ -66,9 +65,7 @@ _TELEGRAM_RE = re.compile(
)
# Private key blocks: -----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----
_PRIVATE_KEY_RE = re.compile(
r"-----BEGIN[A-Z ]*PRIVATE KEY-----[\s\S]*?-----END[A-Z ]*PRIVATE KEY-----"
)
_PRIVATE_KEY_RE = re.compile(r"-----BEGIN[A-Z ]*PRIVATE KEY-----[\s\S]*?-----END[A-Z ]*PRIVATE KEY-----")
# Database connection strings: protocol://user:PASSWORD@host
# Catches postgres, mysql, mongodb, redis, amqp URLs and redacts the password
@ -82,9 +79,7 @@ _DB_CONNSTR_RE = re.compile(
_SIGNAL_PHONE_RE = re.compile(r"(\+[1-9]\d{6,14})(?![A-Za-z0-9])")
# Compile known prefix patterns into one alternation
_PREFIX_RE = re.compile(
r"(?<![A-Za-z0-9_-])(" + "|".join(_PREFIX_PATTERNS) + r")(?![A-Za-z0-9_-])"
)
_PREFIX_RE = re.compile(r"(?<![A-Za-z0-9_-])(" + "|".join(_PREFIX_PATTERNS) + r")(?![A-Za-z0-9_-])")
def _mask_token(token: str) -> str:
@ -112,12 +107,14 @@ def redact_sensitive_text(text: str) -> str:
def _redact_env(m):
name, quote, value = m.group(1), m.group(2), m.group(3)
return f"{name}={quote}{_mask_token(value)}{quote}"
text = _ENV_ASSIGN_RE.sub(_redact_env, text)
# JSON fields: "apiKey": "value"
def _redact_json(m):
key, value = m.group(1), m.group(2)
return f'{key}: "{_mask_token(value)}"'
text = _JSON_FIELD_RE.sub(_redact_json, text)
# Authorization headers
@ -131,6 +128,7 @@ def redact_sensitive_text(text: str) -> str:
prefix = m.group(1) or ""
digits = m.group(2)
return f"{prefix}{digits}:***"
text = _TELEGRAM_RE.sub(_redact_telegram, text)
# Private key blocks
@ -145,6 +143,7 @@ def redact_sensitive_text(text: str) -> str:
if len(phone) <= 8:
return phone[:2] + "****" + phone[-2:]
return phone[:4] + "****" + phone[-4:]
text = _SIGNAL_PHONE_RE.sub(_redact_phone, text)
return text
@ -153,7 +152,7 @@ def redact_sensitive_text(text: str) -> str:
class RedactingFormatter(logging.Formatter):
"""Log formatter that redacts secrets from all log messages."""
def __init__(self, fmt=None, datefmt=None, style='%', **kwargs):
def __init__(self, fmt=None, datefmt=None, style="%", **kwargs):
super().__init__(fmt, datefmt, style, **kwargs)
def format(self, record: logging.LogRecord) -> str:

View file

@ -6,14 +6,14 @@ can invoke skills via /skill-name commands.
import logging
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any
logger = logging.getLogger(__name__)
_skill_commands: Dict[str, Dict[str, Any]] = {}
_skill_commands: dict[str, dict[str, Any]] = {}
def scan_skill_commands() -> Dict[str, Dict[str, Any]]:
def scan_skill_commands() -> dict[str, dict[str, Any]]:
"""Scan ~/.hermes/skills/ and return a mapping of /command -> skill info.
Returns:
@ -23,26 +23,27 @@ def scan_skill_commands() -> Dict[str, Dict[str, Any]]:
_skill_commands = {}
try:
from tools.skills_tool import SKILLS_DIR, _parse_frontmatter, skill_matches_platform
if not SKILLS_DIR.exists():
return _skill_commands
for skill_md in SKILLS_DIR.rglob("SKILL.md"):
if any(part in ('.git', '.github', '.hub') for part in skill_md.parts):
if any(part in (".git", ".github", ".hub") for part in skill_md.parts):
continue
try:
content = skill_md.read_text(encoding='utf-8')
content = skill_md.read_text(encoding="utf-8")
frontmatter, body = _parse_frontmatter(content)
# Skip skills incompatible with the current OS platform
if not skill_matches_platform(frontmatter):
continue
name = frontmatter.get('name', skill_md.parent.name)
description = frontmatter.get('description', '')
name = frontmatter.get("name", skill_md.parent.name)
description = frontmatter.get("description", "")
if not description:
for line in body.strip().split('\n'):
for line in body.strip().split("\n"):
line = line.strip()
if line and not line.startswith('#'):
if line and not line.startswith("#"):
description = line[:80]
break
cmd_name = name.lower().replace(' ', '-').replace('_', '-')
cmd_name = name.lower().replace(" ", "-").replace("_", "-")
_skill_commands[f"/{cmd_name}"] = {
"name": name,
"description": description or f"Invoke the {name} skill",
@ -56,14 +57,14 @@ def scan_skill_commands() -> Dict[str, Dict[str, Any]]:
return _skill_commands
def get_skill_commands() -> Dict[str, Dict[str, Any]]:
def get_skill_commands() -> dict[str, dict[str, Any]]:
"""Return the current skill commands mapping (scan first if empty)."""
if not _skill_commands:
scan_skill_commands()
return _skill_commands
def build_skill_invocation_message(cmd_key: str, user_instruction: str = "") -> Optional[str]:
def build_skill_invocation_message(cmd_key: str, user_instruction: str = "") -> str | None:
"""Build the user message content for a skill slash command invocation.
Args:
@ -83,7 +84,7 @@ def build_skill_invocation_message(cmd_key: str, user_instruction: str = "") ->
skill_name = skill_info["name"]
try:
content = skill_md_path.read_text(encoding='utf-8')
content = skill_md_path.read_text(encoding="utf-8")
except Exception:
return f"[Failed to load skill: {skill_name}]"
@ -111,6 +112,8 @@ def build_skill_invocation_message(cmd_key: str, user_instruction: str = "") ->
if user_instruction:
parts.append("")
parts.append(f"The user has provided the following instruction alongside the skill invocation: {user_instruction}")
parts.append(
f"The user has provided the following instruction alongside the skill invocation: {user_instruction}"
)
return "\n".join(parts)

View file

@ -8,7 +8,7 @@ the file-write logic live here.
import json
import logging
from datetime import datetime
from typing import Any, Dict, List
from typing import Any
logger = logging.getLogger(__name__)
@ -27,8 +27,7 @@ def has_incomplete_scratchpad(content: str) -> bool:
return "<REASONING_SCRATCHPAD>" in content and "</REASONING_SCRATCHPAD>" not in content
def save_trajectory(trajectory: List[Dict[str, Any]], model: str,
completed: bool, filename: str = None):
def save_trajectory(trajectory: list[dict[str, Any]], model: str, completed: bool, filename: str = None):
"""Append a trajectory entry to a JSONL file.
Args: