chore: ruff auto-fixes — collapsible-else-if, if-stmt-min-max, dict.fromkeys (#23926)

PLR5501 (collapsible-else-if): 28 instances — else: if: → elif:
PLR1730 (if-stmt-min-max):   15 instances — if x<y: x=y → x=max(x,y)
C420   (dict.fromkeys):       2 instances — dictcomp → dict.fromkeys
PLR1704 (redefined-argument): 1 instance — reason → err_msg (shadow fix)
C414   (unnecessary-list):    1 instance — sorted(list(x)) → sorted(x)

28 files, -44 net lines. All mechanical, zero logic changes.
17,211 tests pass, zero regressions.
This commit is contained in:
kshitij 2026-05-11 11:03:29 -07:00 committed by GitHub
parent 8e2eb4b511
commit 657874460f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 223 additions and 267 deletions

View file

@ -1316,8 +1316,7 @@ The user has requested that this compaction PRIORITISE preserving all informatio
# Ensure we protect at least min_tail messages
fallback_cut = n - min_tail
if cut_idx > fallback_cut:
cut_idx = fallback_cut
cut_idx = min(cut_idx, fallback_cut)
# If the token budget would protect everything (small conversations),
# force a cut after the head so compression can still remove middle turns.

55
cli.py
View file

@ -7264,20 +7264,19 @@ class HermesCLI:
_cprint(f" {format_session_db_unavailable()}")
else:
_cprint(" Usage: /title <your session title>")
else:
# Show current title and session ID if no argument given
if self._session_db:
_cprint(f" Session ID: {self.session_id}")
session = self._session_db.get_session(self.session_id)
if session and session.get("title"):
_cprint(f" Title: {session['title']}")
elif self._pending_title:
_cprint(f" Title (pending): {self._pending_title}")
else:
_cprint(" No title set. Usage: /title <your session title>")
# Show current title and session ID if no argument given
elif self._session_db:
_cprint(f" Session ID: {self.session_id}")
session = self._session_db.get_session(self.session_id)
if session and session.get("title"):
_cprint(f" Title: {session['title']}")
elif self._pending_title:
_cprint(f" Title (pending): {self._pending_title}")
else:
from hermes_state import format_session_db_unavailable
_cprint(f" {format_session_db_unavailable()}")
_cprint(" No title set. Usage: /title <your session title>")
else:
from hermes_state import format_session_db_unavailable
_cprint(f" {format_session_db_unavailable()}")
elif canonical == "handoff":
if not self._handle_handoff_command(cmd_original):
return False
@ -11586,16 +11585,15 @@ class HermesCLI:
self._last_ctrl_c_time = now
print("\n⚡ Interrupting agent... (press Ctrl+C again to force exit)")
self.agent.interrupt()
# If there's text or images, clear them (like bash).
# If everything is already empty, exit.
elif event.app.current_buffer.text or self._attached_images:
event.app.current_buffer.reset()
self._attached_images.clear()
event.app.invalidate()
else:
# If there's text or images, clear them (like bash).
# If everything is already empty, exit.
if event.app.current_buffer.text or self._attached_images:
event.app.current_buffer.reset()
self._attached_images.clear()
event.app.invalidate()
else:
self._should_exit = True
event.app.exit()
self._should_exit = True
event.app.exit()
# Ctrl+Shift+C: no binding needed. Terminal emulators (GNOME Terminal,
# iTerm2, kitty, Windows Terminal, etc.) intercept Ctrl+Shift+C before
@ -11680,14 +11678,13 @@ class HermesCLI:
if self._agent_running and self.agent:
print("\n⚡ Interrupting agent...")
self.agent.interrupt()
elif event.app.current_buffer.text or self._attached_images:
event.app.current_buffer.reset()
self._attached_images.clear()
event.app.invalidate()
else:
if event.app.current_buffer.text or self._attached_images:
event.app.current_buffer.reset()
self._attached_images.clear()
event.app.invalidate()
else:
self._should_exit = True
event.app.exit()
self._should_exit = True
event.app.exit()
@kb.add('c-d')
def handle_ctrl_d(event):

View file

@ -1082,9 +1082,8 @@ def rewrite_skill_refs(
new_skills.append(target)
elif name in pruned_set:
dropped.append(name)
else:
if name not in new_skills:
new_skills.append(name)
elif name not in new_skills:
new_skills.append(name)
if not mapped and not dropped:
continue

View file

@ -610,8 +610,7 @@ class GatewayConfig:
try:
session_store_max_age_days = int(data.get("session_store_max_age_days", 90))
if session_store_max_age_days < 0:
session_store_max_age_days = 0
session_store_max_age_days = max(session_store_max_age_days, 0)
except (TypeError, ValueError):
session_store_max_age_days = 90

View file

@ -246,7 +246,7 @@ class ThreadParticipationTracker:
thread_list = list(self._threads)
if len(thread_list) > self._max_tracked:
thread_list = thread_list[-self._max_tracked:]
self._threads = {thread_id: None for thread_id in thread_list}
self._threads = dict.fromkeys(thread_list)
atomic_json_write(path, thread_list, indent=None)
def mark(self, thread_id: str) -> None:

View file

@ -592,8 +592,7 @@ async def _run_with_concurrency(
concurrency: int,
) -> None:
"""Run a list of thunks with a bounded number in flight at once."""
if concurrency < 1:
concurrency = 1
concurrency = max(concurrency, 1)
sem = asyncio.Semaphore(concurrency)
async def _wrap(thunk: Callable[[], Awaitable[None]]) -> None:

View file

@ -268,9 +268,8 @@ def _build_replay_entry(role: str, content: Any, msg: Dict[str, Any]) -> Dict[st
# Preserve empty-string sentinel for thinking-mode replay.
if _rval is None:
continue
else:
if not _rval:
continue
elif not _rval:
continue
entry[_rkey] = _rval
return entry
@ -4503,8 +4502,7 @@ class GatewayRunner:
return
interval = float(kanban_cfg.get("dispatch_interval_seconds", 60) or 60)
if interval < 1.0:
interval = 1.0 # sanity floor — tighter than this is a footgun
interval = max(interval, 1.0) # sanity floor — tighter than this is a footgun
# Read max_spawn config to limit concurrent kanban tasks
max_spawn = kanban_cfg.get("max_spawn", None)
@ -4756,34 +4754,33 @@ class GatewayRunner:
await build_channel_directory(self.adapters)
except Exception:
pass
# Check if the failure is non-retryable
elif adapter.has_fatal_error and not adapter.fatal_error_retryable:
self._update_platform_runtime_status(
platform.value,
platform_state="fatal",
error_code=adapter.fatal_error_code,
error_message=adapter.fatal_error_message,
)
logger.warning(
"Reconnect %s: non-retryable error (%s), removing from retry queue",
platform.value, adapter.fatal_error_message,
)
del self._failed_platforms[platform]
else:
# Check if the failure is non-retryable
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
self._update_platform_runtime_status(
platform.value,
platform_state="fatal",
error_code=adapter.fatal_error_code,
error_message=adapter.fatal_error_message,
)
logger.warning(
"Reconnect %s: non-retryable error (%s), removing from retry queue",
platform.value, adapter.fatal_error_message,
)
del self._failed_platforms[platform]
else:
self._update_platform_runtime_status(
platform.value,
platform_state="retrying",
error_code=adapter.fatal_error_code,
error_message=adapter.fatal_error_message or "failed to reconnect",
)
backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
info["attempts"] = attempt
info["next_retry"] = time.monotonic() + backoff
logger.info(
"Reconnect %s failed, next retry in %ds",
platform.value, backoff,
)
self._update_platform_runtime_status(
platform.value,
platform_state="retrying",
error_code=adapter.fatal_error_code,
error_message=adapter.fatal_error_message or "failed to reconnect",
)
backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
info["attempts"] = attempt
info["next_retry"] = time.monotonic() + backoff
logger.info(
"Reconnect %s failed, next retry in %ds",
platform.value, backoff,
)
except Exception as e:
self._update_platform_runtime_status(
platform.value,
@ -12699,11 +12696,10 @@ class GatewayRunner:
msg = f"✅ Hermes update finished.\n\n```\n{output}\n```"
else:
msg = f"❌ Hermes update failed.\n\n```\n{output}\n```"
elif exit_code == 0:
msg = "✅ Hermes update finished successfully."
else:
if exit_code == 0:
msg = "✅ Hermes update finished successfully."
else:
msg = "❌ Hermes update failed. Check the gateway logs or run `hermes update` manually for details."
msg = "❌ Hermes update failed. Check the gateway logs or run `hermes update` manually for details."
await adapter.send(chat_id, msg, metadata=metadata)
logger.info(
"Sent post-update notification to %s:%s (exit=%s)",

View file

@ -442,22 +442,21 @@ def _parse_systemd_duration_to_us(raw: str) -> Optional[int]:
digits += ch
elif ch.isalpha():
token += ch
else:
if digits and token:
multiplier = units.get(token.lower())
if multiplier is None:
return None
try:
total_us += int(float(digits) * multiplier)
except ValueError:
return None
digits = ""
token = ""
elif digits and not token:
# Bare number = seconds (rare but valid)
try:
total_us += int(float(digits) * 1_000_000)
except ValueError:
return None
digits = ""
elif digits and token:
multiplier = units.get(token.lower())
if multiplier is None:
return None
try:
total_us += int(float(digits) * multiplier)
except ValueError:
return None
digits = ""
token = ""
elif digits and not token:
# Bare number = seconds (rare but valid)
try:
total_us += int(float(digits) * 1_000_000)
except ValueError:
return None
digits = ""
return total_us if total_us > 0 else None

View file

@ -802,8 +802,7 @@ def _prune_pre_update_backups(backup_dir: Path, keep: int) -> int:
Operators who genuinely don't want a backup should set
``updates.pre_update_backup: false`` in config that gates creation.
"""
if keep < 1:
keep = 1
keep = max(keep, 1)
if not backup_dir.exists():
return 0
@ -875,8 +874,7 @@ def _prune_pre_migration_backups(backup_dir: Path, keep: int) -> int:
Only touches files matching ``pre-migration-*.zip`` so other backups in
the same directory are never touched.
"""
if keep < 0:
keep = 0
keep = max(keep, 0)
if not backup_dir.exists():
return 0

View file

@ -670,17 +670,16 @@ def _cmd_cleanup(args):
elif not auto_yes and not sys.stdin.isatty():
print_info(f"Non-interactive session — would archive: {source_dir}")
print_info("To execute, re-run with: hermes claw cleanup --yes")
elif auto_yes or prompt_yes_no(f"Archive {source_dir}?", default=True):
try:
archive_path = _archive_directory(source_dir)
print_success(f"Archived: {source_dir}{archive_path}")
total_archived += 1
except OSError as e:
print_error(f"Could not archive: {e}")
print_info(f"Try manually: mv {source_dir} {source_dir}.pre-migration")
else:
if auto_yes or prompt_yes_no(f"Archive {source_dir}?", default=True):
try:
archive_path = _archive_directory(source_dir)
print_success(f"Archived: {source_dir}{archive_path}")
total_archived += 1
except OSError as e:
print_error(f"Could not archive: {e}")
print_info(f"Try manually: mv {source_dir} {source_dir}.pre-migration")
else:
print_info("Skipped.")
print_info("Skipped.")
# Summary
print()

View file

@ -811,7 +811,7 @@ def discord_skill_commands_by_category(
# names are marked with a sentinel so the warning distinguishes
# "skill collided with a reserved command" from "two skills collided
# on the 32-char clamp" — the latter is the rename-worthy case.
_names_used: dict[str, str] = {n: "<reserved>" for n in reserved_names}
_names_used: dict[str, str] = dict.fromkeys(reserved_names, "<reserved>")
hidden = 0
try:

View file

@ -729,13 +729,12 @@ def run_doctor(args):
hermes_home = HERMES_HOME
if hermes_home.exists():
check_ok(f"{_DHH} directory exists")
elif should_fix:
hermes_home.mkdir(parents=True, exist_ok=True)
check_ok(f"Created {_DHH} directory")
fixed_count += 1
else:
if should_fix:
hermes_home.mkdir(parents=True, exist_ok=True)
check_ok(f"Created {_DHH} directory")
fixed_count += 1
else:
check_warn(f"{_DHH} not found", "(will be created on first use)")
check_warn(f"{_DHH} not found", "(will be created on first use)")
# Check expected subdirectories
expected_subdirs = ["cron", "sessions", "logs", "skills", "memories"]
@ -743,13 +742,12 @@ def run_doctor(args):
subdir_path = hermes_home / subdir_name
if subdir_path.exists():
check_ok(f"{_DHH}/{subdir_name}/ exists")
elif should_fix:
subdir_path.mkdir(parents=True, exist_ok=True)
check_ok(f"Created {_DHH}/{subdir_name}/")
fixed_count += 1
else:
if should_fix:
subdir_path.mkdir(parents=True, exist_ok=True)
check_ok(f"Created {_DHH}/{subdir_name}/")
fixed_count += 1
else:
check_warn(f"{_DHH}/{subdir_name}/ not found", "(will be created on first use)")
check_warn(f"{_DHH}/{subdir_name}/ not found", "(will be created on first use)")
# Check for SOUL.md persona file
soul_path = hermes_home / "SOUL.md"
@ -955,14 +953,12 @@ def run_doctor(args):
else:
check_fail("docker not found", "(required for TERMINAL_ENV=docker)")
issues.append("Install Docker or change TERMINAL_ENV")
elif _safe_which("docker"):
check_ok("docker", "(optional)")
elif _is_termux():
check_info("Docker backend is not available inside Termux (expected on Android)")
else:
if _safe_which("docker"):
check_ok("docker", "(optional)")
else:
if _is_termux():
check_info("Docker backend is not available inside Termux (expected on Android)")
else:
check_warn("docker not found", "(optional)")
check_warn("docker not found", "(optional)")
# SSH (if using ssh backend)
if terminal_env == "ssh":
@ -1058,15 +1054,14 @@ def run_doctor(args):
elif shutil.which("agent-browser"):
check_ok("agent-browser", "(browser automation)")
agent_browser_ok = True
elif _is_termux():
check_info("agent-browser is not installed (expected in the tested Termux path)")
check_info("Install it manually later with: npm install -g agent-browser && agent-browser install")
check_info("Termux browser setup:")
for step in _termux_browser_setup_steps(node_installed=True):
check_info(step)
else:
if _is_termux():
check_info("agent-browser is not installed (expected in the tested Termux path)")
check_info("Install it manually later with: npm install -g agent-browser && agent-browser install")
check_info("Termux browser setup:")
for step in _termux_browser_setup_steps(node_installed=True):
check_info(step)
else:
check_warn("agent-browser not installed", "(run: npm install)")
check_warn("agent-browser not installed", "(run: npm install)")
# Chromium presence — the browser tools silently fail to register when
# agent-browser is found but no Playwright-managed Chromium is on disk
@ -1117,15 +1112,14 @@ def run_doctor(args):
f"Install with: cd {PROJECT_ROOT} && "
"npx playwright install --with-deps chromium"
)
elif _is_termux():
check_info("Node.js not found (browser tools are optional in the tested Termux path)")
check_info("Install Node.js on Termux with: pkg install nodejs")
check_info("Termux browser setup:")
for step in _termux_browser_setup_steps(node_installed=False):
check_info(step)
else:
if _is_termux():
check_info("Node.js not found (browser tools are optional in the tested Termux path)")
check_info("Install Node.js on Termux with: pkg install nodejs")
check_info("Termux browser setup:")
for step in _termux_browser_setup_steps(node_installed=False):
check_info(step)
else:
check_warn("Node.js not found", "(optional, needed for browser tools)")
check_warn("Node.js not found", "(optional, needed for browser tools)")
# npm audit for all Node.js packages
_npm_bin = _safe_which("npm")

View file

@ -4947,15 +4947,14 @@ def gateway_setup():
print_info(" Run in foreground: hermes gateway run")
print_info(" For persistence: tmux new -s hermes 'hermes gateway run'")
print_info(" To enable systemd: add systemd=true to /etc/wsl.conf, then 'wsl --shutdown'")
elif is_termux():
from hermes_constants import display_hermes_home as _dhh
print_info(" Termux does not use systemd/launchd services.")
print_info(" Run in foreground: hermes gateway run")
print_info(f" Or start it manually in the background (best effort): nohup hermes gateway run >{_dhh()}/logs/gateway.log 2>&1 &")
else:
if is_termux():
from hermes_constants import display_hermes_home as _dhh
print_info(" Termux does not use systemd/launchd services.")
print_info(" Run in foreground: hermes gateway run")
print_info(f" Or start it manually in the background (best effort): nohup hermes gateway run >{_dhh()}/logs/gateway.log 2>&1 &")
else:
print_info(" Service install not supported on this platform.")
print_info(" Run in foreground: hermes gateway run")
print_info(" Service install not supported on this platform.")
print_info(" Run in foreground: hermes gateway run")
else:
print()
print_info("No platforms configured. Run 'hermes gateway setup' when ready.")

View file

@ -2096,19 +2096,18 @@ def _cmd_specify(args: argparse.Namespace) -> int:
"reason": outcome.reason,
"new_title": outcome.new_title,
}))
elif outcome.ok:
title_suffix = (
f" — retitled: {outcome.new_title!r}"
if outcome.new_title
else ""
)
print(f"Specified {outcome.task_id} → todo{title_suffix}")
else:
if outcome.ok:
title_suffix = (
f" — retitled: {outcome.new_title!r}"
if outcome.new_title
else ""
)
print(f"Specified {outcome.task_id} → todo{title_suffix}")
else:
print(
f"kanban: specify {outcome.task_id}: {outcome.reason}",
file=sys.stderr,
)
print(
f"kanban: specify {outcome.task_id}: {outcome.reason}",
file=sys.stderr,
)
if not all_flag:
return 0 if ok_count == 1 else 1
# --all: succeed if at least one promotion landed; exit 1 only when

View file

@ -195,8 +195,7 @@ def _latest_clean_event_ts(events: Iterable[Any]) -> int:
for ev in events:
if _event_kind(ev) in ("completed", "edited"):
t = _event_ts(ev)
if t > latest:
latest = t
latest = max(latest, t)
return latest
@ -534,8 +533,7 @@ def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
for ev in events:
if _event_kind(ev) == "blocked":
t = _event_ts(ev)
if t > last_blocked_ts:
last_blocked_ts = t
last_blocked_ts = max(last_blocked_ts, t)
if last_blocked_ts == 0:
return []
age_hours = (now - last_blocked_ts) / 3600.0
@ -626,8 +624,7 @@ def _rule_stranded_in_ready(task, events, runs, now, cfg) -> list[Diagnostic]:
for ev in events:
if _event_kind(ev) in READY_TRANSITION_KINDS:
t = _event_ts(ev)
if t > last_ready_ts:
last_ready_ts = t
last_ready_ts = max(last_ready_ts, t)
# Fallback: if no qualifying event exists (very old task or events
# truncated), fall back to ``created_at`` on the task row. Better

View file

@ -505,8 +505,7 @@ def _session_browse_picker(sessions: list) -> Optional[str]:
# Compute visible area
visible_rows = max_y - 4 # header + col header + blank + footer
if visible_rows < 1:
visible_rows = 1
visible_rows = max(visible_rows, 1)
# Clamp cursor and scroll
if not filtered:
@ -518,8 +517,7 @@ def _session_browse_picker(sessions: list) -> Optional[str]:
else:
if cursor >= len(filtered):
cursor = len(filtered) - 1
if cursor < 0:
cursor = 0
cursor = max(cursor, 0)
if cursor < scroll_offset:
scroll_offset = cursor
elif cursor >= scroll_offset + visible_rows:
@ -5963,8 +5961,8 @@ def _kill_stale_dashboard_processes(
for pid in killed:
print(f" ✓ stopped PID {pid}")
for pid, reason in failed:
print(f" ✗ failed to stop PID {pid}: {reason}")
for pid, err_msg in failed:
print(f" ✗ failed to stop PID {pid}: {err_msg}")
if killed:
print(" Restart the dashboard when you're ready:")

View file

@ -1428,10 +1428,9 @@ def _toggle_plugin_toolset(name: str, *, enable: bool) -> None:
if toolset_key not in ts_list:
ts_list.append(toolset_key)
changed = True
else:
if toolset_key in ts_list:
ts_list.remove(toolset_key)
changed = True
elif toolset_key in ts_list:
ts_list.remove(toolset_key)
changed = True
# If enabling and no platforms have toolset lists yet, add to "cli" at minimum
if enable and not changed and not platform_toolsets:

View file

@ -1355,14 +1355,13 @@ def setup_terminal_backend(config: dict):
existing_sudo = get_env_value("SUDO_PASSWORD")
if existing_sudo:
print_info("Sudo password: configured")
else:
if prompt_yes_no(
"Enable sudo support? (stores password for apt install, etc.)", False
):
sudo_pass = prompt(" Sudo password", password=True)
if sudo_pass:
save_env_value("SUDO_PASSWORD", sudo_pass)
print_success("Sudo password saved")
elif prompt_yes_no(
"Enable sudo support? (stores password for apt install, etc.)", False
):
sudo_pass = prompt(" Sudo password", password=True)
if sudo_pass:
save_env_value("SUDO_PASSWORD", sudo_pass)
print_success("Sudo password saved")
elif selected_backend == "docker":
print_success("Terminal backend: Docker")

View file

@ -1190,14 +1190,13 @@ def _denormalize_config_from_web(config: Dict[str, Any]) -> Dict[str, Any]:
else:
disk_model.pop("context_length", None)
config["model"] = disk_model
else:
# Model was previously a bare string — upgrade to dict if
# user is setting a context_length override
if ctx_override > 0:
config["model"] = {
"default": model_val,
"context_length": ctx_override,
}
# Model was previously a bare string — upgrade to dict if
# user is setting a context_length override
elif ctx_override > 0:
config["model"] = {
"default": model_val,
"context_length": ctx_override,
}
except Exception:
pass # can't read disk config — just use the string form
return config

View file

@ -353,9 +353,8 @@ def _compute_tool_definitions(
tools_to_include.update(legacy_tools)
if not quiet_mode:
print(f"✅ Enabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
else:
if not quiet_mode:
print(f"⚠️ Unknown toolset: {toolset_name}")
elif not quiet_mode:
print(f"⚠️ Unknown toolset: {toolset_name}")
else:
# Default: start with everything
from toolsets import get_all_toolsets
@ -378,9 +377,8 @@ def _compute_tool_definitions(
tools_to_include.difference_update(legacy_tools)
if not quiet_mode:
print(f"🚫 Disabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
else:
if not quiet_mode:
print(f"⚠️ Unknown toolset: {toolset_name}")
elif not quiet_mode:
print(f"⚠️ Unknown toolset: {toolset_name}")
# Plugin-registered tools are now resolved through the normal toolset
# path — validate_toolset() / resolve_toolset() / get_all_toolsets()

View file

@ -1433,19 +1433,18 @@ class AIAgent:
if self.verbose_logging:
setup_verbose_logging()
logger.info("Verbose logging enabled (third-party library logs suppressed)")
else:
if self.quiet_mode:
# In quiet mode (CLI default), keep console output clean —
# but DO NOT raise per-logger levels. Doing so prevents the
# root logger's file handlers (agent.log, errors.log) from
# ever seeing the records, because Python checks
# logger.isEnabledFor() before handler propagation. We rely
# on the fact that hermes_logging.setup_logging() does not
# install a console StreamHandler in quiet mode — so INFO
# records flow to the file handlers but never reach a
# console. Any future noise reduction belongs at the
# handler level inside hermes_logging.py, not here.
pass
elif self.quiet_mode:
# In quiet mode (CLI default), keep console output clean —
# but DO NOT raise per-logger levels. Doing so prevents the
# root logger's file handlers (agent.log, errors.log) from
# ever seeing the records, because Python checks
# logger.isEnabledFor() before handler propagation. We rely
# on the fact that hermes_logging.setup_logging() does not
# install a console StreamHandler in quiet mode — so INFO
# records flow to the file handlers but never reach a
# console. Any future noise reduction belongs at the
# handler level inside hermes_logging.py, not here.
pass
# Internal stream callback (set during streaming TTS).
# Initialized here so _vprint can reference it before run_conversation.
@ -2011,8 +2010,7 @@ class AIAgent:
try:
_raw_api_retries = _agent_section.get("api_max_retries", 3)
_api_retries = int(_raw_api_retries)
if _api_retries < 1:
_api_retries = 1 # 1 = no retry (single attempt)
_api_retries = max(_api_retries, 1) # 1 = no retry (single attempt)
except (TypeError, ValueError):
_api_retries = 3
self._api_max_retries = _api_retries
@ -7728,24 +7726,23 @@ class AIAgent:
_fire_first_delta()
self._fire_stream_delta(delta.content)
deltas_were_sent["yes"] = True
else:
# Tool calls suppress regular content streaming (avoids
# displaying chatty "I'll use the tool..." text alongside
# tool calls). But reasoning tags embedded in suppressed
# content should still reach the display — otherwise the
# reasoning box only appears as a post-response fallback,
# rendering it confusingly after the already-streamed
# response. Route suppressed content through the stream
# delta callback so its tag extraction can fire the
# reasoning display. Non-reasoning text is harmlessly
# suppressed by the CLI's _stream_delta when the stream
# box is already closed (tool boundary flush).
if self.stream_delta_callback:
try:
self.stream_delta_callback(delta.content)
self._record_streamed_assistant_text(delta.content)
except Exception:
pass
# Tool calls suppress regular content streaming (avoids
# displaying chatty "I'll use the tool..." text alongside
# tool calls). But reasoning tags embedded in suppressed
# content should still reach the display — otherwise the
# reasoning box only appears as a post-response fallback,
# rendering it confusingly after the already-streamed
# response. Route suppressed content through the stream
# delta callback so its tag extraction can fire the
# reasoning display. Non-reasoning text is harmlessly
# suppressed by the CLI's _stream_delta when the stream
# box is already closed (tool boundary flush).
elif self.stream_delta_callback:
try:
self.stream_delta_callback(delta.content)
self._record_streamed_assistant_text(delta.content)
except Exception:
pass
# Accumulate tool call deltas — notify display on first name
if delta and delta.tool_calls:
@ -10820,12 +10817,11 @@ class AIAgent:
# Tool blocked by plugin or guardrail policy — skip counters,
# callbacks, checkpointing, activity mutation, and real execution.
pass
else:
# Reset nudge counters when the relevant tool is actually used
if function_name == "memory":
self._turns_since_memory = 0
elif function_name == "skill_manage":
self._iters_since_skill = 0
# Reset nudge counters when the relevant tool is actually used
elif function_name == "memory":
self._turns_since_memory = 0
elif function_name == "skill_manage":
self._iters_since_skill = 0
if not self.quiet_mode:
args_str = json.dumps(function_args, ensure_ascii=False)

View file

@ -1312,8 +1312,7 @@ def prune_checkpoints(
for p in child.rglob("*"):
try:
mt = p.stat().st_mtime
if mt > newest:
newest = mt
newest = max(newest, mt)
except OSError:
continue
except OSError:
@ -1455,8 +1454,7 @@ def prune_checkpoints(
size_after = _dir_size_bytes(base)
delta = size_before - size_after
if delta > result["bytes_freed"]:
result["bytes_freed"] = delta
result["bytes_freed"] = max(result["bytes_freed"], delta)
return result

View file

@ -327,9 +327,8 @@ def cronjob(
"the script is the job.",
success=False,
)
else:
if not prompt and not canonical_skills:
return tool_error("create requires either prompt or at least one skill", success=False)
elif not prompt and not canonical_skills:
return tool_error("create requires either prompt or at least one skill", success=False)
if prompt:
scan_error = _scan_cron_prompt(prompt)
if scan_error:

View file

@ -1239,7 +1239,7 @@ def _dump_subagent_timeout_diagnostic(
if tool_names:
_w(f" loaded tool count: {len(tool_names)}")
try:
_w(f" loaded tools: {sorted(list(tool_names))}")
_w(f" loaded tools: {sorted(tool_names)}")
except Exception:
pass
_w("")

View file

@ -505,8 +505,7 @@ def _calculate_line_positions(content_lines: List[str], start_line: int,
"""
start_pos = sum(len(line) + 1 for line in content_lines[:start_line])
end_pos = sum(len(line) + 1 for line in content_lines[:end_line]) - 1
if end_pos >= content_length:
end_pos = content_length
end_pos = min(content_length, end_pos)
return start_pos, end_pos

View file

@ -466,13 +466,12 @@ def _shell_quote_context(command_template: str, position: int) -> Optional[str]:
escaped = True
elif char == '"':
quote = None
else:
if char == "'":
quote = "'"
elif char == '"':
quote = '"'
elif char == "\\":
i += 1
elif char == "'":
quote = "'"
elif char == '"':
quote = '"'
elif char == "\\":
i += 1
i += 1
return quote

View file

@ -456,8 +456,7 @@ class AudioRecorder:
# Compute RMS for level display and silence detection
rms = int(np.sqrt(np.mean(indata.astype(np.float64) ** 2)))
self._current_rms = rms
if rms > self._peak_rms:
self._peak_rms = rms
self._peak_rms = max(self._peak_rms, rms)
# Silence detection
if self._on_silence_stop is not None:

View file

@ -2130,15 +2130,14 @@ if __name__ == "__main__":
print(" Using Brave Search free tier (search only)")
elif backend == "ddgs":
print(" Using DuckDuckGo via ddgs package (search only)")
elif firecrawl_url_available:
print(f" Using self-hosted Firecrawl: {os.getenv('FIRECRAWL_API_URL').strip().rstrip('/')}")
elif firecrawl_key_available:
print(" Using direct Firecrawl cloud API")
elif tool_gateway_available:
print(f" Using Firecrawl tool-gateway: {_get_firecrawl_gateway_url()}")
else:
if firecrawl_url_available:
print(f" Using self-hosted Firecrawl: {os.getenv('FIRECRAWL_API_URL').strip().rstrip('/')}")
elif firecrawl_key_available:
print(" Using direct Firecrawl cloud API")
elif tool_gateway_available:
print(f" Using Firecrawl tool-gateway: {_get_firecrawl_gateway_url()}")
else:
print(" Firecrawl backend selected but not configured")
print(" Firecrawl backend selected but not configured")
else:
print("❌ No web search backend configured")
print(