chore: fix 154 f-strings, simplify getattr/URL patterns, remove dead code (#3119)

Three categories of cleanup, all with zero behavioral change:

1. F-strings without placeholders (154 fixes across 29 files)
   - Converted f'...' to '...' where no {expression} was present (see the
     first sketch after this list)
   - Heaviest files: honcho_integration/cli.py (34), run_agent.py (24), cli.py (20)

2. Simplify defensive patterns in run_agent.py
   - Added explicit self._is_anthropic_oauth = False in __init__ (before
     the api_mode branch that conditionally sets it)
   - Replaced 7x getattr(self, '_is_anthropic_oauth', False) with direct
     self._is_anthropic_oauth (attribute always initialized now)
   - Added _is_openrouter_url() and _is_anthropic_url() helper methods
   - Replaced 3 inline 'openrouter' in self._base_url_lower checks with
     _is_openrouter_url() calls (see the second sketch below)

3. Remove dead code in small files
   - hermes_cli/claw.py: removed unused 'total' computation
   - tools/fuzzy_match.py: removed unused strip_indent() function and
     pattern_stripped variable
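
A condensed before/after for category 1, using a line from the cli.py hunk
below; pyflakes-based linters (flake8/ruff) report the "before" form as
F541, which makes future regressions easy to catch automatically:

    # Before: the f-prefix does nothing without a {placeholder} (linted as F541).
    print(f" Run: hermes setup")
    # After: identical output, no f-string machinery.
    print(" Run: hermes setup")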
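
A minimal, runnable sketch of the category 2 pattern. Names follow the diff,
but the class is heavily condensed and the OAuth decision below is a
stand-in for run_agent.py's real credential-resolution logic:

    class AgentSketch:
        """Illustration only; the real class is AIAgent in run_agent.py."""

        def __init__(self, api_mode: str, base_url: str):
            self._base_url_lower = (base_url or "").lower()
            # Initialized unconditionally, so call sites can read the attribute
            # directly instead of getattr(self, "_is_anthropic_oauth", False).
            self._is_anthropic_oauth = False
            if api_mode == "anthropic_messages":
                # Stand-in for the real OAuth/credential check.
                self._is_anthropic_oauth = True

        def _is_openrouter_url(self) -> bool:
            return "openrouter" in self._base_url_lower

        def _is_anthropic_url(self) -> bool:
            url = self._base_url_lower.rstrip("/")
            return "api.anthropic.com" in url or url.endswith("/anthropic")

    agent = AgentSketch("anthropic_messages", "https://api.anthropic.com")
    assert agent._is_anthropic_oauth and agent._is_anthropic_url()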

Full test suite: 6184 passed, 0 failures
E2E PTY: banner clean, tool calls work, zero garbled ANSI
Teknium 2026-03-25 19:47:58 -07:00 committed by GitHub
parent 08d3be0412
commit cbf195e806
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 173 additions and 170 deletions


@ -666,7 +666,7 @@ class InsightsEngine:
cost_cell = " N/A"
lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} {cost_cell}")
if o.get("models_without_pricing"):
lines.append(f" * Cost N/A for custom/self-hosted models")
lines.append(" * Cost N/A for custom/self-hosted models")
lines.append("")
# Platform breakdown

cli.py (40 changed lines)

@ -2414,13 +2414,13 @@ class HermesCLI:
print(f" ✅ Restored {file_path} from checkpoint {result['restored_to']}: {result['reason']}")
else:
print(f" ✅ Restored to checkpoint {result['restored_to']}: {result['reason']}")
print(f" A pre-rollback snapshot was saved automatically.")
print(" A pre-rollback snapshot was saved automatically.")
# Also undo the last conversation turn so the agent's context
# matches the restored filesystem state
if self.conversation_history:
self.undo_last()
print(f" Chat turn undone to match restored file state.")
print(" Chat turn undone to match restored file state.")
else:
print(f"{result['error']}")
@ -3067,15 +3067,15 @@ class HermesCLI:
print(f" endpoint: {custom_url}")
if is_active:
print(f" model: {self.model} ← current")
print(f" (use hermes model to change)")
print(" (use hermes model to change)")
else:
print(f" (use hermes model to change)")
print(" (use hermes model to change)")
print()
if unauthed:
names = ", ".join(p["label"] for p in unauthed)
print(f" Not configured: {names}")
print(f" Run: hermes setup")
print(" Run: hermes setup")
print()
print(" To change model or provider, use: hermes model")
@ -3099,9 +3099,9 @@ class HermesCLI:
self.system_prompt = new_prompt
self.agent = None # Force re-init
if save_config_value("agent.system_prompt", new_prompt):
print(f"(^_^)b System prompt set (saved to config)")
print("(^_^)b System prompt set (saved to config)")
else:
print(f"(^_^) System prompt set (session only)")
print("(^_^) System prompt set (session only)")
print(f" \"{new_prompt[:60]}{'...' if len(new_prompt) > 60 else ''}\"")
else:
# Show current prompt
@ -3642,7 +3642,7 @@ class HermesCLI:
elif self._pending_title:
_cprint(f" Title (pending): {self._pending_title}")
else:
_cprint(f" No title set. Usage: /title <your session title>")
_cprint(" No title set. Usage: /title <your session title>")
else:
_cprint(" Session database not available.")
elif canonical == "new":
@ -3701,7 +3701,7 @@ class HermesCLI:
plugins = mgr.list_plugins()
if not plugins:
print("No plugins installed.")
print(f"Drop plugin directories into ~/.hermes/plugins/ to get started.")
print("Drop plugin directories into ~/.hermes/plugins/ to get started.")
else:
print(f"Plugins ({len(plugins)}):")
for p in plugins:
@ -3894,7 +3894,7 @@ class HermesCLI:
_cprint(f" 🔄 Background task #{task_num} started: \"{prompt[:60]}{'...' if len(prompt) > 60 else ''}\"")
_cprint(f" Task ID: {task_id}")
_cprint(f" You can continue chatting — results will appear when done.\n")
_cprint(" You can continue chatting — results will appear when done.\n")
turn_route = self._resolve_turn_agent_config(prompt)
@ -4104,7 +4104,7 @@ class HermesCLI:
print(f" ⚠ Chrome launched but port {_port} isn't responding yet")
print(" You may need to close existing Chrome windows first and retry")
else:
print(f" ⚠ Could not auto-launch Chrome")
print(" ⚠ Could not auto-launch Chrome")
# Show manual instructions as fallback
sys_name = _plat.system()
if sys_name == "Darwin":
@ -4161,7 +4161,7 @@ class HermesCLI:
elif sub == "status":
print()
if current:
print(f"🌐 Browser: connected to live Chrome via CDP")
print("🌐 Browser: connected to live Chrome via CDP")
print(f" Endpoint: {current}")
_port = 9222
@ -4175,9 +4175,9 @@ class HermesCLI:
s.settimeout(1)
s.connect(("127.0.0.1", _port))
s.close()
print(f" Status: ✓ reachable")
print(" Status: ✓ reachable")
except (OSError, Exception):
print(f" Status: ⚠ not reachable (Chrome may not be running)")
print(" Status: ⚠ not reachable (Chrome may not be running)")
elif os.environ.get("BROWSERBASE_API_KEY"):
print("🌐 Browser: Browserbase (cloud)")
else:
@ -4210,13 +4210,13 @@ class HermesCLI:
current = get_active_skin_name()
skins = list_skins()
print(f"\n Current skin: {current}")
print(f" Available skins:")
print(" Available skins:")
for s in skins:
marker = "" if s["name"] == current else " "
source = f" ({s['source']})" if s["source"] == "user" else ""
print(f" {marker} {s['name']}{source}{s['description']}")
print(f"\n Usage: /skin <name>")
print(f" Custom skins: drop a YAML file in ~/.hermes/skins/\n")
print("\n Usage: /skin <name>")
print(" Custom skins: drop a YAML file in ~/.hermes/skins/\n")
return
new_skin = parts[1].strip().lower()
@ -4413,7 +4413,7 @@ class HermesCLI:
)
elapsed = format_duration_compact((datetime.now() - self.session_start).total_seconds())
print(f" 📊 Session Token Usage")
print(" 📊 Session Token Usage")
print(f" {'' * 40}")
print(f" Model: {agent.model}")
print(f" Input tokens: {input_tokens:>10,}")
@ -5549,7 +5549,7 @@ class HermesCLI:
# But if it does (race condition), don't interrupt.
if self._clarify_state or self._clarify_freetext:
continue
print(f"\n⚡ New message detected, interrupting...")
print("\n⚡ New message detected, interrupting...")
# Signal TTS to stop on interrupt
if stop_event is not None:
stop_event.set()
@ -5762,7 +5762,7 @@ class HermesCLI:
else:
duration_str = f"{seconds}s"
print(f"Resume this session with:")
print("Resume this session with:")
print(f" hermes --resume {self.session_id}")
print()
print(f"Session: {self.session_id}")


@ -2885,7 +2885,7 @@ class GatewayRunner:
else:
preview = prompt[:50] + "..." if len(prompt) > 50 else prompt
lines.append(f"• `{name}` — {preview}")
lines.append(f"\nUsage: `/personality <name>`")
lines.append("\nUsage: `/personality <name>`")
return "\n".join(lines)
def _resolve_prompt(value):


@ -2012,7 +2012,7 @@ def _login_openai_codex(args, pconfig: ProviderConfig) -> None:
config_path = _update_config_for_provider("openai-codex", creds.get("base_url", DEFAULT_CODEX_BASE_URL))
print()
print("Login successful!")
print(f" Auth state: ~/.hermes/auth.json")
print(" Auth state: ~/.hermes/auth.json")
print(f" Config updated: {config_path} (model.provider=openai-codex)")
@ -2056,9 +2056,9 @@ def _codex_device_code_login() -> Dict[str, Any]:
# Step 2: Show user the code
print("To continue, follow these steps:\n")
print(f" 1. Open this URL in your browser:")
print(" 1. Open this URL in your browser:")
print(f" \033[94m{issuer}/codex/device\033[0m\n")
print(f" 2. Enter this code:")
print(" 2. Enter this code:")
print(f" \033[94m{user_code}\033[0m\n")
print("Waiting for sign-in... (press Ctrl+C to cancel)")


@ -125,7 +125,7 @@ def _cmd_migrate(args):
print()
print_error(f"OpenClaw directory not found: {source_dir}")
print_info("Make sure your OpenClaw installation is at the expected path.")
print_info(f"You can specify a custom path: hermes claw migrate --source /path/to/.openclaw")
print_info("You can specify a custom path: hermes claw migrate --source /path/to/.openclaw")
return
# Find the migration script
@ -206,7 +206,6 @@ def _print_migration_report(report: dict, dry_run: bool):
skipped = summary.get("skipped", 0)
conflicts = summary.get("conflict", 0)
errors = summary.get("error", 0)
total = migrated + skipped + conflicts + errors
print()
if dry_run:
@ -240,7 +239,7 @@ def _print_migration_report(report: dict, dry_run: bool):
print()
if conflict_items:
print(color(f" ⚠ Conflicts (skipped — use --overwrite to force):", Colors.YELLOW))
print(color(" ⚠ Conflicts (skipped — use --overwrite to force):", Colors.YELLOW))
for item in conflict_items:
kind = item.get("kind", "unknown")
reason = item.get("reason", "already exists")
@ -248,7 +247,7 @@ def _print_migration_report(report: dict, dry_run: bool):
print()
if skipped_items:
print(color(f" ─ Skipped:", Colors.DIM))
print(color(" ─ Skipped:", Colors.DIM))
for item in skipped_items:
kind = item.get("kind", "unknown")
reason = item.get("reason", "")
@ -256,7 +255,7 @@ def _print_migration_report(report: dict, dry_run: bool):
print()
if error_items:
print(color(f" ✗ Errors:", Colors.RED))
print(color(" ✗ Errors:", Colors.RED))
for item in error_items:
kind = item.get("kind", "unknown")
reason = item.get("reason", "unknown error")


@ -705,7 +705,7 @@ def run_doctor(args):
_honcho_cfg_path = resolve_config_path()
if not _honcho_cfg_path.exists():
check_warn("Honcho config not found", f"run: hermes honcho setup")
check_warn("Honcho config not found", "run: hermes honcho setup")
elif not hcfg.enabled:
check_info(f"Honcho disabled (set enabled: true in {_honcho_cfg_path} to activate)")
elif not hcfg.api_key:


@ -1332,9 +1332,9 @@ def _setup_standard_platform(platform: dict):
# Allowlist fields get special handling for the deny-by-default security model
if var.get("is_allowlist"):
print_info(f" The gateway DENIES all users by default for security.")
print_info(f" Enter user IDs to create an allowlist, or leave empty")
print_info(f" and you'll be asked about open access next.")
print_info(" The gateway DENIES all users by default for security.")
print_info(" Enter user IDs to create an allowlist, or leave empty")
print_info(" and you'll be asked about open access next.")
value = prompt(f" {var['prompt']}", password=False)
if value:
cleaned = value.replace(" ", "")
@ -1351,7 +1351,7 @@ def _setup_standard_platform(platform: dict):
parts.append(uid)
cleaned = ",".join(parts)
save_env_value(var["name"], cleaned)
print_success(f" Saved — only these users can interact with the bot.")
print_success(" Saved — only these users can interact with the bot.")
allowed_val_set = cleaned
else:
# No allowlist — ask about open access vs DM pairing
@ -1380,7 +1380,7 @@ def _setup_standard_platform(platform: dict):
print_warning(f" Skipped — {label} won't work without this.")
return
else:
print_info(f" Skipped (can configure later)")
print_info(" Skipped (can configure later)")
# If an allowlist was set and home channel wasn't, offer to reuse
# the first user ID (common for Telegram DMs).
@ -1556,7 +1556,7 @@ def _setup_signal():
print_success("Signal configured!")
print_info(f" URL: {url}")
print_info(f" Account: {account}")
print_info(f" DM auth: via SIGNAL_ALLOWED_USERS + DM pairing")
print_info(" DM auth: via SIGNAL_ALLOWED_USERS + DM pairing")
print_info(f" Groups: {'enabled' if get_env_value('SIGNAL_GROUP_ALLOWED_USERS') else 'disabled'}")


@ -390,7 +390,7 @@ def _session_browse_picker(sessions: list) -> Optional[str]:
return sessions[idx]["id"]
print(f" Invalid selection. Enter 1-{len(sessions)} or q to cancel.")
except ValueError:
print(f" Invalid input. Enter a number or q to cancel.")
print(" Invalid input. Enter a number or q to cancel.")
except (KeyboardInterrupt, EOFError):
print()
return None
@ -2038,8 +2038,8 @@ def _model_flow_api_key_provider(config, provider_id, current_model=""):
else:
model_list = _PROVIDER_MODELS.get(provider_id, [])
if model_list:
print(f" ⚠ Could not auto-detect models from API — showing defaults.")
print(f" Use \"Enter custom model name\" if you don't see your model.")
print(" ⚠ Could not auto-detect models from API — showing defaults.")
print(" Use \"Enter custom model name\" if you don't see your model.")
# else: no defaults either, will fall through to raw input
if model_list:


@ -72,10 +72,10 @@ def _cmd_approve(store, platform: str, code: str):
name = result.get("user_name", "")
display = f"{name} ({uid})" if name else uid
print(f"\n Approved! User {display} on {platform} can now use the bot~")
print(f" They'll be recognized automatically on their next message.\n")
print(" They'll be recognized automatically on their next message.\n")
else:
print(f"\n Code '{code}' not found or expired for platform '{platform}'.")
print(f" Run 'hermes pairing list' to see pending codes.\n")
print(" Run 'hermes pairing list' to see pending codes.\n")
def _cmd_revoke(store, platform: str, user_id: str):


@ -390,7 +390,7 @@ def cmd_list() -> None:
dirs = sorted(d for d in plugins_dir.iterdir() if d.is_dir())
if not dirs:
console.print("[dim]No plugins installed.[/dim]")
console.print(f"[dim]Install with:[/dim] hermes plugins install owner/repo")
console.print("[dim]Install with:[/dim] hermes plugins install owner/repo")
return
table = Table(title="Installed Plugins", show_lines=False)


@ -548,9 +548,9 @@ def _prompt_api_key(var: dict):
if value:
save_env_value(var["name"], value)
print_success(f" ✓ Saved")
print_success(" ✓ Saved")
else:
print_warning(f" Skipped (configure later with 'hermes setup')")
print_warning(" Skipped (configure later with 'hermes setup')")
def _print_setup_summary(config: dict, hermes_home):
@ -725,9 +725,9 @@ def _print_setup_summary(config: dict, hermes_home):
f" {color('hermes config edit', Colors.GREEN)} Open config in your editor"
)
print(f" {color('hermes config set <key> <value>', Colors.GREEN)}")
print(f" Set a specific value")
print(" Set a specific value")
print()
print(f" Or edit the files directly:")
print(" Or edit the files directly:")
print(f" {color(f'nano {get_config_path()}', Colors.DIM)}")
print(f" {color(f'nano {get_env_path()}', Colors.DIM)}")
print()
@ -755,13 +755,13 @@ def _prompt_container_resources(config: dict):
print_info(" Persistent filesystem keeps files between sessions.")
print_info(" Set to 'no' for ephemeral sandboxes that reset each time.")
persist_str = prompt(
f" Persist filesystem across sessions? (yes/no)", persist_label
" Persist filesystem across sessions? (yes/no)", persist_label
)
terminal["container_persistent"] = persist_str.lower() in ("yes", "true", "y", "1")
# CPU
current_cpu = terminal.get("container_cpu", 1)
cpu_str = prompt(f" CPU cores", str(current_cpu))
cpu_str = prompt(" CPU cores", str(current_cpu))
try:
terminal["container_cpu"] = float(cpu_str)
except ValueError:
@ -769,7 +769,7 @@ def _prompt_container_resources(config: dict):
# Memory
current_mem = terminal.get("container_memory", 5120)
mem_str = prompt(f" Memory in MB (5120 = 5GB)", str(current_mem))
mem_str = prompt(" Memory in MB (5120 = 5GB)", str(current_mem))
try:
terminal["container_memory"] = int(mem_str)
except ValueError:
@ -777,7 +777,7 @@ def _prompt_container_resources(config: dict):
# Disk
current_disk = terminal.get("container_disk", 51200)
disk_str = prompt(f" Disk in MB (51200 = 50GB)", str(current_disk))
disk_str = prompt(" Disk in MB (51200 = 50GB)", str(current_disk))
try:
terminal["container_disk"] = int(disk_str)
except ValueError:
@ -3441,9 +3441,9 @@ def _run_quick_setup(config: dict, hermes_home):
value = prompt(f" {var.get('prompt', var['name'])}")
if value:
save_env_value(var["name"], value)
print_success(f" ✓ Saved")
print_success(" ✓ Saved")
else:
print_warning(f" Skipped")
print_warning(" Skipped")
print()
# Handle missing config fields


@ -289,7 +289,7 @@ def show_status(args):
)
is_active = result.stdout.strip() == "active"
print(f" Status: {check_mark(is_active)} {'running' if is_active else 'stopped'}")
print(f" Manager: systemd (user)")
print(" Manager: systemd (user)")
elif sys.platform == 'darwin':
result = subprocess.run(
@ -299,10 +299,10 @@ def show_status(args):
)
is_loaded = result.returncode == 0
print(f" Status: {check_mark(is_loaded)} {'loaded' if is_loaded else 'not loaded'}")
print(f" Manager: launchd")
print(" Manager: launchd")
else:
print(f" Status: {color('N/A', Colors.DIM)}")
print(f" Manager: (not supported on this platform)")
print(" Manager: (not supported on this platform)")
# =========================================================================
# Cron Jobs
@ -320,9 +320,9 @@ def show_status(args):
enabled_jobs = [j for j in jobs if j.get("enabled", True)]
print(f" Jobs: {len(enabled_jobs)} active, {len(jobs)} total")
except Exception:
print(f" Jobs: (error reading jobs file)")
print(" Jobs: (error reading jobs file)")
else:
print(f" Jobs: 0")
print(" Jobs: 0")
# =========================================================================
# Sessions
@ -338,9 +338,9 @@ def show_status(args):
data = json.load(f)
print(f" Active: {len(data)} session(s)")
except Exception:
print(f" Active: (error reading sessions file)")
print(" Active: (error reading sessions file)")
else:
print(f" Active: 0")
print(" Active: 0")
# =========================================================================
# Deep checks


@ -659,7 +659,7 @@ def _configure_tool_category(ts_key: str, cat: dict, config: dict):
# Multiple providers - let user choose
print()
# Use custom title if provided (e.g. "Select Search Provider")
title = cat.get("setup_title", f"Choose a provider")
title = cat.get("setup_title", "Choose a provider")
print(color(f" --- {icon} {name} - {title} ---", Colors.CYAN))
if cat.get("setup_note"):
_print_info(f" {cat['setup_note']}")
@ -768,9 +768,9 @@ def _configure_provider(provider: dict, config: dict):
if value:
save_env_value(var["key"], value)
_print_success(f" Saved")
_print_success(" Saved")
else:
_print_warning(f" Skipped")
_print_warning(" Skipped")
all_configured = False
# Run post-setup hooks if needed
@ -834,9 +834,9 @@ def _configure_simple_requirements(ts_key: str):
value = _prompt(f" {var}", password=True)
if value and value.strip():
save_env_value(var, value.strip())
_print_success(f" Saved")
_print_success(" Saved")
else:
_print_warning(f" Skipped")
_print_warning(" Skipped")
def _reconfigure_tool(config: dict):
@ -924,7 +924,7 @@ def _reconfigure_provider(provider: dict, config: dict):
_print_success(f" Browser cloud provider set to: {bp}")
else:
config.get("browser", {}).pop("cloud_provider", None)
_print_success(f" Browser set to local mode")
_print_success(" Browser set to local mode")
# Set web search backend in config if applicable
if provider.get("web_backend"):
@ -946,9 +946,9 @@ def _reconfigure_provider(provider: dict, config: dict):
value = _prompt(f" {var.get('prompt', var['key'])} (Enter to keep current)", password=not default_val)
if value and value.strip():
save_env_value(var["key"], value.strip())
_print_success(f" Updated")
_print_success(" Updated")
else:
_print_info(f" Kept current")
_print_info(" Kept current")
def _reconfigure_simple_requirements(ts_key: str):
@ -970,9 +970,9 @@ def _reconfigure_simple_requirements(ts_key: str):
value = _prompt(f" {var} (Enter to keep current)", password=True)
if value and value.strip():
save_env_value(var, value.strip())
_print_success(f" Updated")
_print_success(" Updated")
else:
_print_info(f" Kept current")
_print_info(" Kept current")
# ─── Main Entry Point ─────────────────────────────────────────────────────────


@ -273,7 +273,7 @@ def run_uninstall(args):
log_info("No wrapper script found")
# 4. Remove installation directory (code)
log_info(f"Removing installation directory...")
log_info("Removing installation directory...")
# Check if we're running from within the install dir
# We need to be careful here


@ -141,7 +141,7 @@ def cmd_setup(args) -> None:
# Memory mode
current_mode = hermes_host.get("memoryMode") or cfg.get("memoryMode", "hybrid")
print(f"\n Memory mode options:")
print("\n Memory mode options:")
print(" hybrid — write to both Honcho and local MEMORY.md (default)")
print(" honcho — Honcho only, skip MEMORY.md writes")
new_mode = _prompt("Memory mode", default=current_mode)
@ -152,7 +152,7 @@ def cmd_setup(args) -> None:
# Write frequency
current_wf = str(hermes_host.get("writeFrequency") or cfg.get("writeFrequency", "async"))
print(f"\n Write frequency options:")
print("\n Write frequency options:")
print(" async — background thread, no token cost (recommended)")
print(" turn — sync write after every turn")
print(" session — batch write at session end only")
@ -166,7 +166,7 @@ def cmd_setup(args) -> None:
# Recall mode
_raw_recall = hermes_host.get("recallMode") or cfg.get("recallMode", "hybrid")
current_recall = "hybrid" if _raw_recall not in ("hybrid", "context", "tools") else _raw_recall
print(f"\n Recall mode options:")
print("\n Recall mode options:")
print(" hybrid — auto-injected context + Honcho tools available (default)")
print(" context — auto-injected context only, Honcho tools hidden")
print(" tools — Honcho tools only, no auto-injected context")
@ -176,7 +176,7 @@ def cmd_setup(args) -> None:
# Session strategy
current_strat = hermes_host.get("sessionStrategy") or cfg.get("sessionStrategy", "per-directory")
print(f"\n Session strategy options:")
print("\n Session strategy options:")
print(" per-directory — one session per working directory (default)")
print(" per-session — new Honcho session each run, named by Hermes session ID")
print(" per-repo — one session per git repository (uses repo root name)")
@ -203,7 +203,7 @@ def cmd_setup(args) -> None:
print(f"FAILED\n Error: {e}")
return
print(f"\n Honcho is ready.")
print("\n Honcho is ready.")
print(f" Session: {hcfg.resolve_session_name()}")
print(f" Workspace: {hcfg.workspace_id}")
print(f" Peer: {hcfg.peer_name}")
@ -213,17 +213,17 @@ def cmd_setup(args) -> None:
_mode_str = f"{hcfg.memory_mode} (peers: {overrides})"
print(f" Mode: {_mode_str}")
print(f" Frequency: {hcfg.write_frequency}")
print(f"\n Honcho tools available in chat:")
print(f" honcho_context — ask Honcho a question about you (LLM-synthesized)")
print(f" honcho_search — semantic search over your history (no LLM)")
print(f" honcho_profile — your peer card, key facts (no LLM)")
print(f" honcho_conclude — persist a user fact to Honcho memory (no LLM)")
print(f"\n Other commands:")
print(f" hermes honcho status — show full config")
print(f" hermes honcho mode — show or change memory mode")
print(f" hermes honcho tokens — show or set token budgets")
print(f" hermes honcho identity — seed or show AI peer identity")
print(f" hermes honcho map <name> — map this directory to a session name\n")
print("\n Honcho tools available in chat:")
print(" honcho_context — ask Honcho a question about you (LLM-synthesized)")
print(" honcho_search — semantic search over your history (no LLM)")
print(" honcho_profile — your peer card, key facts (no LLM)")
print(" honcho_conclude — persist a user fact to Honcho memory (no LLM)")
print("\n Other commands:")
print(" hermes honcho status — show full config")
print(" hermes honcho mode — show or change memory mode")
print(" hermes honcho tokens — show or set token budgets")
print(" hermes honcho identity — seed or show AI peer identity")
print(" hermes honcho map <name> — map this directory to a session name\n")
def cmd_status(args) -> None:
@ -253,7 +253,7 @@ def cmd_status(args) -> None:
api_key = hcfg.api_key or ""
masked = f"...{api_key[-8:]}" if len(api_key) > 8 else ("set" if api_key else "not set")
print(f"\nHoncho status\n" + "" * 40)
print("\nHoncho status\n" + "" * 40)
print(f" Enabled: {hcfg.enabled}")
print(f" API key: {masked}")
print(f" Workspace: {hcfg.workspace_id}")
@ -265,7 +265,7 @@ def cmd_status(args) -> None:
print(f" Recall mode: {hcfg.recall_mode}")
print(f" Memory mode: {hcfg.memory_mode}")
if hcfg.peer_memory_modes:
print(f" Per-peer modes:")
print(" Per-peer modes:")
for peer, mode in hcfg.peer_memory_modes.items():
print(f" {peer}: {mode}")
print(f" Write freq: {hcfg.write_frequency}")
@ -345,12 +345,12 @@ def cmd_peer(args) -> None:
ai = hermes.get('aiPeer') or cfg.get('aiPeer') or HOST
lvl = hermes.get("dialecticReasoningLevel") or cfg.get("dialecticReasoningLevel") or "low"
max_chars = hermes.get("dialecticMaxChars") or cfg.get("dialecticMaxChars") or 600
print(f"\nHoncho peers\n" + "" * 40)
print("\nHoncho peers\n" + "" * 40)
print(f" User peer: {user}")
print(f" Your identity in Honcho. Messages you send build this peer's card.")
print(" Your identity in Honcho. Messages you send build this peer's card.")
print(f" AI peer: {ai}")
print(f" Hermes' identity in Honcho. Seed with 'hermes honcho identity <file>'.")
print(f" Dialectic calls ask this peer questions to warm session context.")
print(" Hermes' identity in Honcho. Seed with 'hermes honcho identity <file>'.")
print(" Dialectic calls ask this peer questions to warm session context.")
print()
print(f" Dialectic reasoning: {lvl} ({', '.join(REASONING_LEVELS)})")
print(f" Dialectic cap: {max_chars} chars\n")
@ -394,11 +394,11 @@ def cmd_mode(args) -> None:
or cfg.get("memoryMode")
or "hybrid"
)
print(f"\nHoncho memory mode\n" + "" * 40)
print("\nHoncho memory mode\n" + "" * 40)
for m, desc in MODES.items():
marker = "" if m == current else ""
print(f" {m:<8} {desc}{marker}")
print(f"\n Set with: hermes honcho mode [hybrid|honcho]\n")
print("\n Set with: hermes honcho mode [hybrid|honcho]\n")
return
if mode_arg not in MODES:
@ -423,18 +423,18 @@ def cmd_tokens(args) -> None:
ctx_tokens = hermes.get("contextTokens") or cfg.get("contextTokens") or "(Honcho default)"
d_chars = hermes.get("dialecticMaxChars") or cfg.get("dialecticMaxChars") or 600
d_level = hermes.get("dialecticReasoningLevel") or cfg.get("dialecticReasoningLevel") or "low"
print(f"\nHoncho budgets\n" + "" * 40)
print("\nHoncho budgets\n" + "" * 40)
print()
print(f" Context {ctx_tokens} tokens")
print(f" Raw memory retrieval. Honcho returns stored facts/history about")
print(f" the user and session, injected directly into the system prompt.")
print(" Raw memory retrieval. Honcho returns stored facts/history about")
print(" the user and session, injected directly into the system prompt.")
print()
print(f" Dialectic {d_chars} chars, reasoning: {d_level}")
print(f" AI-to-AI inference. Hermes asks Honcho's AI peer a question")
print(f" (e.g. \"what were we working on?\") and Honcho runs its own model")
print(f" to synthesize an answer. Used for first-turn session continuity.")
print(f" Level controls how much reasoning Honcho spends on the answer.")
print(f"\n Set with: hermes honcho tokens [--context N] [--dialectic N]\n")
print(" AI-to-AI inference. Hermes asks Honcho's AI peer a question")
print(" (e.g. \"what were we working on?\") and Honcho runs its own model")
print(" to synthesize an answer. Used for first-turn session continuity.")
print(" Level controls how much reasoning Honcho spends on the answer.")
print("\n Set with: hermes honcho tokens [--context N] [--dialectic N]\n")
return
changed = False
@ -523,7 +523,7 @@ def cmd_identity(args) -> None:
print(f" Seeded AI peer identity from {p.name} into session '{session_key}'")
print(f" Honcho will incorporate this into {hcfg.ai_peer}'s representation over time.\n")
else:
print(f" Failed to seed identity. Check logs for details.\n")
print(" Failed to seed identity. Check logs for details.\n")
def cmd_migrate(args) -> None:
@ -623,7 +623,7 @@ def cmd_migrate(args) -> None:
print()
print(" If you want to migrate them now without starting a session:")
for f in user_files:
print(f" hermes honcho migrate — this step handles it interactively")
print(" hermes honcho migrate — this step handles it interactively")
if has_key:
answer = _prompt(" Upload user memory files to Honcho now?", default="y")
if answer.lower() in ("y", "yes"):


@ -217,7 +217,7 @@ class MiniSWERunner:
# Tool definition
self.tools = [TERMINAL_TOOL_DEFINITION]
print(f"🤖 Mini-SWE Runner initialized")
print("🤖 Mini-SWE Runner initialized")
print(f" Model: {self.model}")
print(f" Environment: {self.env_type}")
if self.env_type != "local":
@ -233,7 +233,7 @@ class MiniSWERunner:
cwd=self.cwd,
timeout=self.command_timeout
)
print(f"✅ Environment ready")
print("✅ Environment ready")
def _cleanup_env(self):
"""Cleanup the execution environment."""
@ -365,7 +365,7 @@ class MiniSWERunner:
except (json.JSONDecodeError, AttributeError):
pass
tool_response = f"<tool_response>\n"
tool_response = "<tool_response>\n"
tool_response += json.dumps({
"tool_call_id": tool_msg.get("tool_call_id", ""),
"name": msg["tool_calls"][len(tool_responses)]["function"]["name"] \
@ -505,7 +505,7 @@ Complete the user's task step by step."""
# Check for task completion signal
if "MINI_SWE_AGENT_FINAL_OUTPUT" in result["output"]:
print(f" ✅ Task completion signal detected!")
print(" ✅ Task completion signal detected!")
completed = True
# Add tool response
@ -530,7 +530,7 @@ Complete the user's task step by step."""
"content": final_response
})
completed = True
print(f"🎉 Agent finished (no more tool calls)")
print("🎉 Agent finished (no more tool calls)")
break
if api_call_count >= self.max_iterations:


@ -524,7 +524,7 @@ class AIAgent:
# Pre-warm OpenRouter model metadata cache in a background thread.
# fetch_model_metadata() is cached for 1 hour; this avoids a blocking
# HTTP request on the first API response when pricing is estimated.
if self.provider == "openrouter" or "openrouter" in self._base_url_lower:
if self.provider == "openrouter" or self._is_openrouter_url():
threading.Thread(
target=lambda: fetch_model_metadata(),
daemon=True,
@ -574,7 +574,7 @@ class AIAgent:
# Anthropic prompt caching: auto-enabled for Claude models via OpenRouter.
# Reduces input costs by ~75% on multi-turn conversations by caching the
# conversation prefix. Uses system_and_3 strategy (4 breakpoints).
is_openrouter = "openrouter" in self._base_url_lower
is_openrouter = self._is_openrouter_url()
is_claude = "claude" in self.model.lower()
is_native_anthropic = self.api_mode == "anthropic_messages"
self._use_prompt_caching = (is_openrouter and is_claude) or is_native_anthropic
@ -694,6 +694,7 @@ class AIAgent:
# raw_codex=True because the main agent needs direct responses.stream()
# access for Codex Responses API streaming.
self._anthropic_client = None
self._is_anthropic_oauth = False
if self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_client, resolve_anthropic_token
@ -1178,6 +1179,14 @@ class AIAgent:
url = (base_url or self._base_url_lower).lower()
return "api.openai.com" in url and "openrouter" not in url
def _is_openrouter_url(self) -> bool:
"""Return True when the base URL targets OpenRouter."""
return "openrouter" in self._base_url_lower
def _is_anthropic_url(self) -> bool:
"""Return True when the base URL targets Anthropic (native or /anthropic proxy path)."""
return "api.anthropic.com" in self._base_url_lower or self._base_url_lower.rstrip("/").endswith("/anthropic")
def _max_tokens_param(self, value: int) -> dict:
"""Return the correct max tokens kwarg for the current provider.
@ -1731,7 +1740,7 @@ class AIAgent:
while j < len(messages) and messages[j]["role"] == "tool":
tool_msg = messages[j]
# Format tool response with XML tags
tool_response = f"<tool_response>\n"
tool_response = "<tool_response>\n"
# Try to parse tool content as JSON if it looks like JSON
tool_content = tool_msg["content"]
@ -2064,7 +2073,7 @@ class AIAgent:
except Exception as e:
logger.debug("Failed to propagate interrupt to child agent: %s", e)
if not self.quiet_mode:
print(f"\n⚡ Interrupt requested" + (f": '{message[:40]}...'" if message and len(message) > 40 else f": '{message}'" if message else ""))
print("\n⚡ Interrupt requested" + (f": '{message[:40]}...'" if message and len(message) > 40 else f": '{message}'" if message else ""))
def clear_interrupt(self) -> None:
"""Clear any pending interrupt request and the global tool interrupt signal."""
@ -4257,7 +4266,7 @@ class AIAgent:
tools=self.tools,
max_tokens=self.max_tokens,
reasoning_config=self.reasoning_config,
is_oauth=getattr(self, "_is_anthropic_oauth", False),
is_oauth=self._is_anthropic_oauth,
preserve_dots=self._anthropic_preserve_dots(),
)
@ -4378,7 +4387,7 @@ class AIAgent:
extra_body = {}
_is_openrouter = "openrouter" in self._base_url_lower
_is_openrouter = self._is_openrouter_url()
_is_github_models = (
"models.github.ai" in self._base_url_lower
or "api.githubcopilot.com" in self._base_url_lower
@ -4747,7 +4756,7 @@ class AIAgent:
tool_calls = assistant_msg.tool_calls
elif self.api_mode == "anthropic_messages" and not _aux_available:
from agent.anthropic_adapter import normalize_anthropic_response as _nar_flush
_flush_msg, _ = _nar_flush(response, strip_tool_prefix=getattr(self, '_is_anthropic_oauth', False))
_flush_msg, _ = _nar_flush(response, strip_tool_prefix=self._is_anthropic_oauth)
if _flush_msg and _flush_msg.tool_calls:
tool_calls = _flush_msg.tool_calls
elif hasattr(response, "choices") and response.choices:
@ -5548,10 +5557,10 @@ class AIAgent:
from agent.anthropic_adapter import build_anthropic_kwargs as _bak, normalize_anthropic_response as _nar
_ant_kw = _bak(model=self.model, messages=api_messages, tools=None,
max_tokens=self.max_tokens, reasoning_config=self.reasoning_config,
is_oauth=getattr(self, '_is_anthropic_oauth', False),
is_oauth=self._is_anthropic_oauth,
preserve_dots=self._anthropic_preserve_dots())
summary_response = self._anthropic_messages_create(_ant_kw)
_msg, _ = _nar(summary_response, strip_tool_prefix=getattr(self, '_is_anthropic_oauth', False))
_msg, _ = _nar(summary_response, strip_tool_prefix=self._is_anthropic_oauth)
final_response = (_msg.content or "").strip()
else:
summary_response = self._ensure_primary_openai_client(reason="iteration_limit_summary").chat.completions.create(**summary_kwargs)
@ -5579,11 +5588,11 @@ class AIAgent:
elif self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_kwargs as _bak2, normalize_anthropic_response as _nar2
_ant_kw2 = _bak2(model=self.model, messages=api_messages, tools=None,
is_oauth=getattr(self, '_is_anthropic_oauth', False),
is_oauth=self._is_anthropic_oauth,
max_tokens=self.max_tokens, reasoning_config=self.reasoning_config,
preserve_dots=self._anthropic_preserve_dots())
retry_response = self._anthropic_messages_create(_ant_kw2)
_retry_msg, _ = _nar2(retry_response, strip_tool_prefix=getattr(self, '_is_anthropic_oauth', False))
_retry_msg, _ = _nar2(retry_response, strip_tool_prefix=self._is_anthropic_oauth)
final_response = (_retry_msg.content or "").strip()
else:
summary_kwargs = {
@ -5845,7 +5854,7 @@ class AIAgent:
if self._interrupt_requested:
interrupted = True
if not self.quiet_mode:
self._safe_print(f"\n⚡ Breaking out of tool loop due to interrupt...")
self._safe_print("\n⚡ Breaking out of tool loop due to interrupt...")
break
api_call_count += 1
@ -6074,7 +6083,7 @@ class AIAgent:
if response_invalid:
# Stop spinner before printing error messages
if thinking_spinner:
thinking_spinner.stop(f"(´;ω;`) oops, retrying...")
thinking_spinner.stop("(´;ω;`) oops, retrying...")
thinking_spinner = None
if self.thinking_callback:
self.thinking_callback("")
@ -6356,7 +6365,7 @@ class AIAgent:
except Exception as api_error:
# Stop spinner before printing error messages
if thinking_spinner:
thinking_spinner.stop(f"(╥_╥) error, retrying...")
thinking_spinner.stop("(╥_╥) error, retrying...")
thinking_spinner = None
if self.thinking_callback:
self.thinking_callback("")
@ -6764,7 +6773,7 @@ class AIAgent:
elif self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import normalize_anthropic_response
assistant_message, finish_reason = normalize_anthropic_response(
response, strip_tool_prefix=getattr(self, "_is_anthropic_oauth", False)
response, strip_tool_prefix=self._is_anthropic_oauth
)
else:
assistant_message = response.choices[0].message
@ -6952,7 +6961,7 @@ class AIAgent:
if tc.function.name not in self.valid_tool_names:
content = f"Tool '{tc.function.name}' does not exist. Available tools: {available}"
else:
content = f"Skipped: another tool call in this turn used an invalid name. Please retry this tool call."
content = "Skipped: another tool call in this turn used an invalid name. Please retry this tool call."
messages.append({
"role": "tool",
"tool_call_id": tc.id,
@ -7547,20 +7556,20 @@ def main(
toolset = get_toolset_for_tool(tool_name)
print(f" 📌 {tool_name} (from {toolset})")
print(f"\n💡 Usage Examples:")
print(f" # Use predefined toolsets")
print(f" python run_agent.py --enabled_toolsets=research --query='search for Python news'")
print(f" python run_agent.py --enabled_toolsets=development --query='debug this code'")
print(f" python run_agent.py --enabled_toolsets=safe --query='analyze without terminal'")
print(f" ")
print(f" # Combine multiple toolsets")
print(f" python run_agent.py --enabled_toolsets=web,vision --query='analyze website'")
print(f" ")
print(f" # Disable toolsets")
print(f" python run_agent.py --disabled_toolsets=terminal --query='no command execution'")
print(f" ")
print(f" # Run with trajectory saving enabled")
print(f" python run_agent.py --save_trajectories --query='your question here'")
print("\n💡 Usage Examples:")
print(" # Use predefined toolsets")
print(" python run_agent.py --enabled_toolsets=research --query='search for Python news'")
print(" python run_agent.py --enabled_toolsets=development --query='debug this code'")
print(" python run_agent.py --enabled_toolsets=safe --query='analyze without terminal'")
print(" ")
print(" # Combine multiple toolsets")
print(" python run_agent.py --enabled_toolsets=web,vision --query='analyze website'")
print(" ")
print(" # Disable toolsets")
print(" python run_agent.py --disabled_toolsets=terminal --query='no command execution'")
print(" ")
print(" # Run with trajectory saving enabled")
print(" python run_agent.py --save_trajectories --query='your question here'")
return
# Parse toolset selection arguments
@ -7576,9 +7585,9 @@ def main(
print(f"🚫 Disabled toolsets: {disabled_toolsets_list}")
if save_trajectories:
print(f"💾 Trajectory saving: ENABLED")
print(f" - Successful conversations → trajectory_samples.jsonl")
print(f" - Failed conversations → failed_trajectories.jsonl")
print("💾 Trajectory saving: ENABLED")
print(" - Successful conversations → trajectory_samples.jsonl")
print(" - Failed conversations → failed_trajectories.jsonl")
# Initialize agent with provided parameters
try:
@ -7620,7 +7629,7 @@ def main(
print(f"💬 Messages: {len(result['messages'])}")
if result['final_response']:
print(f"\n🎯 FINAL RESPONSE:")
print("\n🎯 FINAL RESPONSE:")
print("-" * 30)
print(result['final_response'])


@ -1073,7 +1073,7 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
_run_browser_command(effective_task_id, "open", ["about:blank"], timeout=10)
return json.dumps({
"success": False,
"error": f"Blocked: redirect landed on a private/internal address",
"error": "Blocked: redirect landed on a private/internal address",
})
response = {


@ -505,7 +505,7 @@ class CheckpointManager:
# Get the hash of the commit at the cutoff point
ok, cutoff_hash, _ = _run_git(
["rev-list", "--reverse", "HEAD", "--skip=0",
f"--max-count=1"],
"--max-count=1"],
shadow_repo, working_dir,
)
@ -542,7 +542,7 @@ def format_checkpoint_list(checkpoints: List[Dict], directory: str) -> str:
lines.append(f" {i}. {cp['short_hash']} {ts} {cp['reason']}{stat}")
lines.append(f"\n /rollback <N> restore to checkpoint N")
lines.append(f" /rollback diff <N> preview changes since checkpoint N")
lines.append(f" /rollback <N> <file> restore a single file from checkpoint N")
lines.append("\n /rollback <N> restore to checkpoint N")
lines.append(" /rollback diff <N> preview changes since checkpoint N")
lines.append(" /rollback <N> <file> restore a single file from checkpoint N")
return "\n".join(lines)


@ -187,11 +187,6 @@ def _strategy_indentation_flexible(content: str, pattern: str) -> List[Tuple[int
Strips all leading whitespace from lines before matching.
"""
def strip_indent(s):
return '\n'.join(line.lstrip() for line in s.split('\n'))
pattern_stripped = strip_indent(pattern)
content_lines = content.split('\n')
content_stripped_lines = [line.lstrip() for line in content_lines]
pattern_lines = [line.lstrip() for line in pattern.split('\n')]


@ -156,7 +156,7 @@ async def _redirect_to_browser(auth_url: str) -> None:
try:
if _can_open_browser():
webbrowser.open(auth_url)
print(f" Opened browser for authorization...")
print(" Opened browser for authorization...")
else:
print(f"\n Open this URL to authorize:\n {auth_url}\n")
except Exception:


@ -466,7 +466,7 @@ if __name__ == "__main__":
# Show current configuration
config = get_moa_configuration()
print(f"\n⚙️ Current Configuration:")
print("\n⚙️ Current Configuration:")
print(f" 🤖 Reference models ({len(config['reference_models'])}): {', '.join(config['reference_models'])}")
print(f" 🧠 Aggregator model: {config['aggregator_model']}")
print(f" 🌡️ Reference temperature: {config['reference_temperature']}")
@ -506,7 +506,7 @@ if __name__ == "__main__":
print(f" - Optimized temperatures: {REFERENCE_TEMPERATURE} for reference models, {AGGREGATOR_TEMPERATURE} for aggregation")
print(" - Token-efficient: only returns final aggregated response")
print(" - Resilient: continues with partial model failures")
print(f" - Configurable: easy to modify models and settings at top of file")
print(" - Configurable: easy to modify models and settings at top of file")
print(" - State-of-the-art results on challenging benchmarks")
print("\nDebug mode:")


@ -456,7 +456,7 @@ async def _monitor_training_run(run_state: RunState):
if run_state.api_process and run_state.api_process.poll() is not None:
run_state.status = "failed"
run_state.error_message = f"API server exited unexpectedly"
run_state.error_message = "API server exited unexpectedly"
_stop_training_run(run_state)
break
@ -1233,11 +1233,11 @@ async def rl_test_inference(
print(f"\n ❌ Error: {model_results['error']}")
# Print last few lines of stderr for debugging
if stderr_lines:
print(f" Last errors:")
print(" Last errors:")
for line in stderr_lines[-5:]:
print(f" {line}")
else:
print(f"\n ✅ Process completed successfully")
print("\n ✅ Process completed successfully")
print(f" Output file: {output_file}")
print(f" File exists: {output_file.exists()}")
@ -1272,7 +1272,7 @@ async def rl_test_inference(
except asyncio.TimeoutError:
model_results["error"] = "Process timed out after 10 minutes"
print(f" Timeout!")
print(" Timeout!")
except Exception as e:
model_results["error"] = str(e)
print(f" Error: {e}")


@ -2021,8 +2021,8 @@ class LobeHubSource(SkillSource):
"metadata:",
" hermes:",
f" tags: [{', '.join(str(t) for t in tag_list)}]",
f" lobehub:",
f" source: lobehub",
" lobehub:",
" source: lobehub",
"---",
]


@ -1252,7 +1252,7 @@ if __name__ == "__main__":
print("=" * 50)
config = _get_env_config()
print(f"\nCurrent Configuration:")
print("\nCurrent Configuration:")
print(f" Environment type: {config['env_type']}")
print(f" Docker image: {config['docker_image']}")
print(f" Modal image: {config['modal_image']}")


@ -634,7 +634,7 @@ def check_command_security(command: str) -> dict:
logger.warning("tirith timed out after %ds", timeout)
if fail_open:
return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"}
return {"action": "block", "findings": [], "summary": f"tirith timed out (fail-closed)"}
return {"action": "block", "findings": [], "summary": "tirith timed out (fail-closed)"}
# Map exit code to action
exit_code = result.returncode


@ -797,7 +797,7 @@ if __name__ == "__main__":
except ImportError:
return False
print(f"\nProvider availability:")
print("\nProvider availability:")
print(f" Edge TTS: {'installed' if _check(_import_edge_tts, 'edge') else 'not installed (pip install edge-tts)'}")
print(f" ElevenLabs: {'installed' if _check(_import_elevenlabs, 'el') else 'not installed (pip install elevenlabs)'}")
print(f" API Key: {'set' if os.getenv('ELEVENLABS_API_KEY') else 'not set'}")


@ -1665,7 +1665,7 @@ if __name__ == "__main__":
print(" # - Final processed results")
print(" # Logs saved to: ./logs/web_tools_debug_UUID.json")
print(f"\n📝 Run 'python test_web_tools_llm.py' to test LLM processing capabilities")
print("\n📝 Run 'python test_web_tools_llm.py' to test LLM processing capabilities")
# ---------------------------------------------------------------------------


@ -315,7 +315,7 @@ def print_distribution_info(distribution_name: str) -> None:
print(f"\n📊 Distribution: {distribution_name}")
print(f" Description: {dist['description']}")
print(f" Toolsets:")
print(" Toolsets:")
for toolset, prob in sorted(dist["toolsets"].items(), key=lambda x: x[1], reverse=True):
print(f"{toolset:15} : {prob:3}% chance")


@ -558,7 +558,7 @@ if __name__ == "__main__":
print("\nMultiple Toolset Resolution:")
print("-" * 40)
combined = resolve_multiple_toolsets(["web", "vision", "terminal"])
print(f" Combining ['web', 'vision', 'terminal']:")
print(" Combining ['web', 'vision', 'terminal']:")
print(f" Result: {', '.join(sorted(combined))}")
print("\nCustom Toolset Creation:")
@ -570,6 +570,6 @@ if __name__ == "__main__":
includes=["terminal", "vision"]
)
custom_info = get_toolset_info("my_custom")
print(f" Created 'my_custom' toolset:")
print(" Created 'my_custom' toolset:")
print(f" Description: {custom_info['description']}")
print(f" Resolved tools: {', '.join(custom_info['resolved_tools'])}")