chore: ruff auto-fix PLR6201 — tuple → set in membership tests (#23937)

Replace  with  for all literal-tuple
membership tests. Set lookup is O(1) vs O(n) for tuple — consistent
micro-optimization across the codebase.

608 instances fixed via `ruff --fix --unsafe-fixes`, 0 remaining.
133 files, +626/-626 (net zero).
This commit is contained in:
kshitij 2026-05-11 11:13:25 -07:00 committed by GitHub
parent 8c11710314
commit 2ec8d2b42f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
133 changed files with 626 additions and 626 deletions

View file

@ -1450,7 +1450,7 @@ def resolve_provider(
# whose availability isn't implied by LM_API_KEY presence (it may be
# offline, and the no-auth setup uses a placeholder value), so it
# also requires explicit selection.
if pid in ("copilot", "lmstudio"):
if pid in {"copilot", "lmstudio"}:
continue
for env_var in pconfig.api_key_env_vars:
if has_usable_secret(os.getenv(env_var, "")):
@ -2541,7 +2541,7 @@ def refresh_codex_oauth_pure(
# A 401/403 from the token endpoint always means the refresh token
# is invalid/expired — force relogin even if the body error code
# wasn't one of the known strings above.
if response.status_code in (401, 403) and not relogin_required:
if response.status_code in {401, 403} and not relogin_required:
relogin_required = True
raise AuthError(
message,
@ -2947,7 +2947,7 @@ def _merge_shared_nous_oauth_state(state: Dict[str, Any]) -> bool:
"expires_at",
):
value = shared.get(key)
if value not in (None, ""):
if value not in {None, ""}:
state[key] = value
return True
@ -3986,7 +3986,7 @@ def get_api_key_provider_status(provider_id: str) -> Dict[str, Any]:
if pconfig.base_url_env_var:
env_url = os.getenv(pconfig.base_url_env_var, "").strip()
if provider_id in ("kimi-coding", "kimi-coding-cn"):
if provider_id in {"kimi-coding", "kimi-coding-cn"}:
base_url = _resolve_kimi_base_url(api_key, pconfig.inference_base_url, env_url)
elif env_url:
base_url = env_url
@ -4090,7 +4090,7 @@ def resolve_api_key_provider_credentials(provider_id: str) -> Dict[str, Any]:
if pconfig.base_url_env_var:
env_url = os.getenv(pconfig.base_url_env_var, "").strip()
if provider_id in ("kimi-coding", "kimi-coding-cn"):
if provider_id in {"kimi-coding", "kimi-coding-cn"}:
base_url = _resolve_kimi_base_url(api_key, pconfig.inference_base_url, env_url)
elif provider_id == "zai":
base_url = _resolve_zai_base_url(api_key, pconfig.inference_base_url, env_url)
@ -4510,7 +4510,7 @@ def _login_openai_codex(
reuse = input("Use existing credentials? [Y/n]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
reuse = "y"
if reuse in ("", "y", "yes"):
if reuse in {"", "y", "yes"}:
config_path = _update_config_for_provider("openai-codex", existing.get("base_url", DEFAULT_CODEX_BASE_URL))
print()
print("Login successful!")
@ -4531,7 +4531,7 @@ def _login_openai_codex(
do_import = input("Import these credentials? (a separate login is recommended) [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
do_import = "n"
if do_import in ("y", "yes"):
if do_import in {"y", "yes"}:
_save_codex_tokens(cli_tokens)
base_url = os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/") or DEFAULT_CODEX_BASE_URL
config_path = _update_config_for_provider("openai-codex", base_url)
@ -4623,7 +4623,7 @@ def _codex_device_code_login() -> Dict[str, Any]:
if poll_resp.status_code == 200:
code_resp = poll_resp.json()
break
elif poll_resp.status_code in (403, 404):
elif poll_resp.status_code in {403, 404}:
continue # User hasn't completed login yet
else:
raise AuthError(
@ -5188,7 +5188,7 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
do_import = input("Import these credentials? [Y/n]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
do_import = "y"
if do_import in ("", "y", "yes"):
if do_import in {"", "y", "yes"}:
print("Rehydrating Nous session from shared credentials...")
auth_state = _try_import_shared_nous_state(
timeout_seconds=timeout_seconds,

View file

@ -266,7 +266,7 @@ def auth_add_command(args) -> None:
do_import = input("Import these credentials? [Y/n]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
do_import = "y"
if do_import in ("", "y", "yes"):
if do_import in {"", "y", "yes"}:
print("Rehydrating Nous session from shared credentials...")
rehydrated = auth_mod._try_import_shared_nous_state(
timeout_seconds=getattr(args, "timeout", None) or 15.0,

View file

@ -298,7 +298,7 @@ def _detect_prefix(zf: zipfile.ZipFile) -> str:
if len(first_parts) == 1:
prefix = first_parts.pop()
# Only strip if it looks like a hermes dir name
if prefix in (".hermes", "hermes"):
if prefix in {".hermes", "hermes"}:
return prefix + "/"
return ""
@ -349,7 +349,7 @@ def run_import(args) -> None:
except (EOFError, KeyboardInterrupt):
print("\nAborted.")
sys.exit(1)
if answer not in ("y", "yes"):
if answer not in {"y", "yes"}:
print("Aborted.")
return

View file

@ -139,7 +139,7 @@ def _confirm(prompt: str) -> bool:
except (EOFError, KeyboardInterrupt):
print()
return False
return resp in ("y", "yes")
return resp in {"y", "yes"}
def cmd_clear(args: argparse.Namespace) -> int:

View file

@ -298,7 +298,7 @@ def claw_command(args):
if action == "migrate":
_cmd_migrate(args)
elif action in ("cleanup", "clean"):
elif action in {"cleanup", "clean"}:
_cmd_cleanup(args)
else:
print("Usage: hermes claw <command> [options]")

View file

@ -101,7 +101,7 @@ def _fetch_models_from_api(access_token: str) -> List[str]:
# Some valid Codex CLI models (for example gpt-5.3-codex-spark) are
# marked false here but are still accepted by the Codex route.
visibility = item.get("visibility", "")
if isinstance(visibility, str) and visibility.strip().lower() in ("hide", "hidden"):
if isinstance(visibility, str) and visibility.strip().lower() in {"hide", "hidden"}:
continue
priority = item.get("priority")
rank = int(priority) if isinstance(priority, (int, float)) else 10_000
@ -152,7 +152,7 @@ def _read_cache_models(codex_home: Path) -> List[str]:
# public OpenAI API, while Hermes openai-codex talks to the same
# OAuth-backed Codex backend as Codex CLI.
visibility = item.get("visibility")
if isinstance(visibility, str) and visibility.strip().lower() in ("hide", "hidden"):
if isinstance(visibility, str) and visibility.strip().lower() in {"hide", "hidden"}:
continue
priority = item.get("priority")
rank = int(priority) if isinstance(priority, (int, float)) else 10_000

View file

@ -3202,7 +3202,7 @@ def warn_deprecated_cwd_env_vars(config: Optional[Dict[str, Any]] = None) -> Non
terminal_cfg = config.get("terminal", {})
config_cwd = terminal_cfg.get("cwd", ".") if isinstance(terminal_cfg, dict) else "."
# Only warn if config.yaml doesn't have an explicit path
config_has_explicit_cwd = config_cwd not in (".", "auto", "cwd", "")
config_has_explicit_cwd = config_cwd not in {".", "auto", "cwd", ""}
lines: list[str] = []
if messaging_cwd:
@ -3262,10 +3262,10 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
if "tool_progress" not in display:
old_enabled = get_env_value("HERMES_TOOL_PROGRESS")
old_mode = get_env_value("HERMES_TOOL_PROGRESS_MODE")
if old_enabled and old_enabled.lower() in ("false", "0", "no"):
if old_enabled and old_enabled.lower() in {"false", "0", "no"}:
display["tool_progress"] = "off"
results["config_added"].append("display.tool_progress=off (from HERMES_TOOL_PROGRESS=false)")
elif old_mode and old_mode.lower() in ("new", "all"):
elif old_mode and old_mode.lower() in {"new", "all"}:
display["tool_progress"] = old_mode.lower()
results["config_added"].append(f"display.tool_progress={old_mode.lower()} (from HERMES_TOOL_PROGRESS_MODE)")
else:
@ -3344,7 +3344,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
new_entry = {"api": old_url}
if old_name:
new_entry["name"] = old_name
if old_key and old_key not in ("no-key", "no-key-required", ""):
if old_key and old_key not in {"no-key", "no-key-required", ""}:
new_entry["api_key"] = old_key
# Carry over model and api_mode if present
@ -3402,7 +3402,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
stt.pop("model", None)
# Place it in the appropriate provider section only if the
# user didn't already set a model there
if provider in ("local", "local_command"):
if provider in {"local", "local_command"}:
# Don't migrate an OpenAI model name into the local section
_local_models = {
"tiny.en", "tiny", "base.en", "base", "small.en", "small",
@ -3486,7 +3486,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
if not aux_comp.get("model"):
aux_comp["model"] = str(s_model).strip()
migrated_keys.append(f"model={s_model}")
if s_provider and str(s_provider).strip() not in ("", "auto"):
if s_provider and str(s_provider).strip() not in {"", "auto"}:
aux = config.setdefault("auxiliary", {})
aux_comp = aux.setdefault("compression", {})
if not aux_comp.get("provider") or aux_comp.get("provider") == "auto":
@ -3717,7 +3717,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
except (EOFError, KeyboardInterrupt):
answer = "n"
if answer in ("y", "yes"):
if answer in {"y", "yes"}:
print()
for name, info in new_and_unset:
if info.get("url"):
@ -3778,7 +3778,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
except (EOFError, KeyboardInterrupt):
answer = "n"
if answer in ("y", "yes"):
if answer in {"y", "yes"}:
print()
config = load_config()
try:
@ -4860,9 +4860,9 @@ def set_config_value(key: str, value: str):
# inline navigation here silently overwrote lists with dicts.
# Convert value to appropriate type
if value.lower() in ('true', 'yes', 'on'):
if value.lower() in {'true', 'yes', 'on'}:
value = True
elif value.lower() in ('false', 'no', 'off'):
elif value.lower() in {'false', 'no', 'off'}:
value = False
elif value.isdigit():
value = int(value)
@ -5067,7 +5067,7 @@ def _inject_profile_env_vars() -> None:
try:
from providers import list_providers
for _pp in list_providers():
if _pp.auth_type not in ("api_key",):
if _pp.auth_type not in {"api_key",}:
continue
for _var in _pp.env_vars:
if _var in OPTIONAL_ENV_VARS:

View file

@ -128,7 +128,7 @@ def _try_gh_cli_token() -> Optional[str]:
# Build a clean env so gh doesn't short-circuit on GITHUB_TOKEN / GH_TOKEN
clean_env = {k: v for k, v in os.environ.items()
if k not in ("GITHUB_TOKEN", "GH_TOKEN")}
if k not in {"GITHUB_TOKEN", "GH_TOKEN"}}
for gh_path in _gh_cli_candidates():
cmd = [gh_path, "auth", "token"]

View file

@ -347,7 +347,7 @@ def _cmd_prune(args) -> int:
except (EOFError, KeyboardInterrupt):
print("\ncurator: aborted")
return 1
if reply not in ("y", "yes"):
if reply not in {"y", "yes"}:
print("curator: aborted")
return 1
@ -449,7 +449,7 @@ def _cmd_rollback(args) -> int:
except (EOFError, KeyboardInterrupt):
print("\ncancelled")
return 1
if ans not in ("y", "yes"):
if ans not in {"y", "yes"}:
print("cancelled")
return 1

View file

@ -139,16 +139,16 @@ def curses_checklist(
stdscr.refresh()
key = stdscr.getch()
if key in (curses.KEY_UP, ord("k")):
if key in {curses.KEY_UP, ord("k")}:
cursor = (cursor - 1) % len(items)
elif key in (curses.KEY_DOWN, ord("j")):
elif key in {curses.KEY_DOWN, ord("j")}:
cursor = (cursor + 1) % len(items)
elif key == ord(" "):
chosen.symmetric_difference_update({cursor})
elif key in (curses.KEY_ENTER, 10, 13):
elif key in {curses.KEY_ENTER, 10, 13}:
result_holder[0] = set(chosen)
return
elif key in (27, ord("q")):
elif key in {27, ord("q")}:
result_holder[0] = cancel_returns
return
@ -265,14 +265,14 @@ def curses_radiolist(
stdscr.refresh()
key = stdscr.getch()
if key in (curses.KEY_UP, ord("k")):
if key in {curses.KEY_UP, ord("k")}:
cursor = (cursor - 1) % len(items)
elif key in (curses.KEY_DOWN, ord("j")):
elif key in {curses.KEY_DOWN, ord("j")}:
cursor = (cursor + 1) % len(items)
elif key in (ord(" "), curses.KEY_ENTER, 10, 13):
elif key in {ord(" "), curses.KEY_ENTER, 10, 13}:
result_holder[0] = cursor
return
elif key in (27, ord("q")):
elif key in {27, ord("q")}:
result_holder[0] = cancel_returns
return
@ -388,14 +388,14 @@ def curses_single_select(
stdscr.refresh()
key = stdscr.getch()
if key in (curses.KEY_UP, ord("k")):
if key in {curses.KEY_UP, ord("k")}:
cursor = (cursor - 1) % len(all_items)
elif key in (curses.KEY_DOWN, ord("j")):
elif key in {curses.KEY_DOWN, ord("j")}:
cursor = (cursor + 1) % len(all_items)
elif key in (curses.KEY_ENTER, 10, 13):
elif key in {curses.KEY_ENTER, 10, 13}:
result_holder[0] = cursor
return
elif key in (27, ord("q")):
elif key in {27, ord("q")}:
result_holder[0] = None
return

View file

@ -93,7 +93,7 @@ def poll_registration(device_code: str) -> dict:
"""
data = _api_post("/app/registration/poll", {"device_code": device_code})
status_raw = str(data.get("status", "")).strip().upper()
if status_raw not in ("WAITING", "SUCCESS", "FAIL", "EXPIRED"):
if status_raw not in {"WAITING", "SUCCESS", "FAIL", "EXPIRED"}:
status_raw = "UNKNOWN"
return {
"status": status_raw,

View file

@ -473,7 +473,7 @@ def run_doctor(args):
if (
provider
and _resolve_auth_provider is not None
and provider not in ("auto", "custom")
and provider not in {"auto", "custom"}
):
try:
runtime_provider = _resolve_auth_provider(provider)
@ -485,7 +485,7 @@ def run_doctor(args):
if (
provider
and _resolve_provider_full is not None
and provider not in ("auto", "custom")
and provider not in {"auto", "custom"}
):
provider_def = _resolve_provider_full(provider, user_providers, custom_providers)
catalog_provider = provider_def.id if provider_def is not None else None
@ -542,7 +542,7 @@ def run_doctor(args):
# own env-var checks elsewhere in doctor, and get_auth_status()
# returns a bare {logged_in: False} for anything it doesn't
# explicitly dispatch, which would produce false positives.
if runtime_provider and runtime_provider not in ("auto", "custom", "openrouter"):
if runtime_provider and runtime_provider not in {"auto", "custom", "openrouter"}:
try:
from hermes_cli.auth import PROVIDER_REGISTRY, get_auth_status
pconfig = PROVIDER_REGISTRY.get(runtime_provider)
@ -1010,7 +1010,7 @@ def run_doctor(args):
issues.append(f"Set TERMINAL_VERCEL_RUNTIME to one of: {supported}")
disk = os.getenv("TERMINAL_CONTAINER_DISK", "51200").strip()
if disk in ("", "0", "51200"):
if disk in {"", "0", "51200"}:
check_ok("Vercel disk setting", "(uses platform default)")
else:
check_fail("Vercel custom disk unsupported", "(reset terminal.container_disk to 51200)")
@ -1036,7 +1036,7 @@ def run_doctor(args):
for line in auth_status.detail_lines:
check_info(f"Vercel auth {line}")
persistent = os.getenv("TERMINAL_CONTAINER_PERSISTENT", "true").lower() in ("1", "true", "yes", "on")
persistent = os.getenv("TERMINAL_CONTAINER_PERSISTENT", "true").lower() in {"1", "true", "yes", "on"}
if persistent:
check_info("Vercel persistence: snapshot filesystem only; live processes do not survive sandbox recreation")
else:

View file

@ -307,7 +307,7 @@ def cmd_fallback_clear(args) -> None: # noqa: ARG001
print()
print(" Cancelled.")
return
if resp not in ("y", "yes"):
if resp not in {"y", "yes"}:
print(" Cancelled — no change.")
return
@ -347,11 +347,11 @@ def _numbered_pick(question: str, choices: List[str]) -> Optional[int]:
def cmd_fallback(args) -> None:
"""Top-level dispatcher for ``hermes fallback [subcommand]``."""
sub = getattr(args, "fallback_command", None)
if sub in (None, "", "list", "ls"):
if sub in {None, "", "list", "ls"}:
cmd_fallback_list(args)
elif sub == "add":
cmd_fallback_add(args)
elif sub in ("remove", "rm"):
elif sub in {"remove", "rm"}:
cmd_fallback_remove(args)
elif sub == "clear":
cmd_fallback_clear(args)

View file

@ -1194,7 +1194,7 @@ def _systemd_operational(system: bool = False) -> bool:
)
# "running", "degraded", "starting" all mean systemd is PID 1
status = result.stdout.strip().lower()
return status in ("running", "degraded", "starting", "initializing")
return status in {"running", "degraded", "starting", "initializing"}
except (RuntimeError, subprocess.TimeoutExpired, OSError):
return False
@ -2915,7 +2915,7 @@ def launchd_start():
try:
subprocess.run(["launchctl", "kickstart", f"{_launchd_domain()}/{label}"], check=True, timeout=30)
except subprocess.CalledProcessError as e:
if e.returncode not in (3, 113):
if e.returncode not in {3, 113}:
raise
print("↻ launchd job was unloaded; reloading service definition")
subprocess.run(["launchctl", "bootstrap", _launchd_domain(), str(plist_path)], check=True, timeout=30)
@ -2939,7 +2939,7 @@ def launchd_stop():
try:
subprocess.run(["launchctl", "bootout", target], check=True, timeout=90)
except subprocess.CalledProcessError as e:
if e.returncode in (3, 113):
if e.returncode in {3, 113}:
pass # Already unloaded — nothing to stop.
else:
raise
@ -3011,7 +3011,7 @@ def launchd_restart():
subprocess.run(["launchctl", "kickstart", "-k", target], check=True, timeout=90)
print("✓ Service restarted")
except subprocess.CalledProcessError as e:
if e.returncode not in (3, 113):
if e.returncode not in {3, 113}:
raise
# Job not loaded — bootstrap and start fresh
print("↻ launchd job was unloaded; reloading")
@ -3749,7 +3749,7 @@ def _platform_status(platform: dict) -> str:
password = get_env_value("MATRIX_PASSWORD")
if (val or password) and homeserver:
e2ee = get_env_value("MATRIX_ENCRYPTION")
suffix = " + E2EE" if e2ee and e2ee.lower() in ("true", "1", "yes") else ""
suffix = " + E2EE" if e2ee and e2ee.lower() in {"true", "1", "yes"} else ""
return f"configured{suffix}"
if val or password or homeserver:
return "partially configured"

View file

@ -270,7 +270,7 @@ def _parse_judge_response(raw: str) -> Tuple[bool, str, bool]:
done_val = data.get("done")
if isinstance(done_val, str):
done = done_val.strip().lower() in ("true", "yes", "1", "done")
done = done_val.strip().lower() in {"true", "yes", "1", "done"}
else:
done = bool(done_val)
reason = str(data.get("reason") or "").strip()
@ -389,11 +389,11 @@ class GoalManager:
return self._state is not None and self._state.status == "active"
def has_goal(self) -> bool:
return self._state is not None and self._state.status in ("active", "paused")
return self._state is not None and self._state.status in {"active", "paused"}
def status_line(self) -> str:
s = self._state
if s is None or s.status in ("cleared",):
if s is None or s.status in {"cleared",}:
return "No active goal. Set one with /goal <text>."
turns = f"{s.turns_used}/{s.max_turns} turns"
if s.status == "active":

View file

@ -32,11 +32,11 @@ def hooks_command(args) -> None:
print("Run 'hermes hooks --help' for details.")
return
if sub in ("list", "ls"):
if sub in {"list", "ls"}:
_cmd_list(args)
elif sub == "test":
_cmd_test(args)
elif sub in ("revoke", "remove", "rm"):
elif sub in {"revoke", "remove", "rm"}:
_cmd_revoke(args)
elif sub == "doctor":
_cmd_doctor(args)
@ -220,7 +220,7 @@ def _cmd_test(args) -> None:
if getattr(args, "for_tool", None):
specs = [
s for s in specs
if s.event not in ("pre_tool_call", "post_tool_call")
if s.event not in {"pre_tool_call", "post_tool_call"}
or s.matches_tool(args.for_tool)
]

View file

@ -82,7 +82,7 @@ def _parse_workspace_flag(value: str) -> tuple[str, Optional[str]]:
if not value:
return ("scratch", None)
v = value.strip()
if v in ("scratch", "worktree"):
if v in {"scratch", "worktree"}:
return (v, None)
if v.startswith("dir:"):
path = v[len("dir:"):].strip()
@ -788,15 +788,15 @@ def _dispatch_boards(args: argparse.Namespace) -> int:
can still run ``boards create`` / ``boards list``.
"""
sub = getattr(args, "boards_action", None) or "list"
if sub in ("list", "ls"):
if sub in {"list", "ls"}:
return _cmd_boards_list(args)
if sub in ("create", "new"):
if sub in {"create", "new"}:
return _cmd_boards_create(args)
if sub in ("rm", "remove", "delete"):
if sub in {"rm", "remove", "delete"}:
return _cmd_boards_rm(args)
if sub in ("switch", "use"):
if sub in {"switch", "use"}:
return _cmd_boards_switch(args)
if sub in ("show", "current"):
if sub in {"show", "current"}:
return _cmd_boards_show(args)
if sub == "rename":
return _cmd_boards_rename(args)
@ -1301,7 +1301,7 @@ def _cmd_show(args: argparse.Namespace) -> int:
def _cmd_assign(args: argparse.Namespace) -> int:
profile = None if args.profile.lower() in ("none", "-", "null") else args.profile
profile = None if args.profile.lower() in {"none", "-", "null"} else args.profile
with kb.connect() as conn:
ok = kb.assign_task(conn, args.task_id, profile)
if not ok:
@ -1328,7 +1328,7 @@ def _cmd_reclaim(args: argparse.Namespace) -> int:
def _cmd_reassign(args: argparse.Namespace) -> int:
profile = None if args.profile.lower() in ("none", "-", "null") else args.profile
profile = None if args.profile.lower() in {"none", "-", "null"} else args.profile
with kb.connect() as conn:
ok = kb.reassign_task(
conn, args.task_id, profile,
@ -2230,7 +2230,7 @@ def run_slash(rest: str) -> str:
out = buf_out.getvalue().rstrip()
err = buf_err.getvalue().rstrip()
# Help dump (exit 0) → return the captured help text directly.
if exc.code in (0, None) and out:
if exc.code in {0, None} and out:
return out
body = err or out
return f"⚠ /kanban usage error\n{body}" if body else "⚠ /kanban usage error"

View file

@ -1844,7 +1844,7 @@ def recompute_ready(conn: sqlite3.Connection) -> int:
"WHERE l.child_id = ?",
(task_id,),
).fetchall()
if all(p["status"] in ("done", "archived") for p in parents):
if all(p["status"] in {"done", "archived"} for p in parents):
conn.execute(
"UPDATE tasks SET status = 'ready' WHERE id = ? AND status = 'todo'",
(task_id,),

View file

@ -177,7 +177,7 @@ def _active_hallucination_events(
active: list[Any] = []
for ev in events:
k = _event_kind(ev)
if k in ("completed", "edited"):
if k in {"completed", "edited"}:
active.clear()
elif k == kind:
active.append(ev)
@ -193,7 +193,7 @@ def _latest_clean_event_ts(events: Iterable[Any]) -> int:
"""
latest = 0
for ev in events:
if _event_kind(ev) in ("completed", "edited"):
if _event_kind(ev) in {"completed", "edited"}:
t = _event_ts(ev)
latest = max(latest, t)
return latest
@ -355,7 +355,7 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
most_recent_outcome = None
for r in reversed(ordered_runs):
oc = _task_field(r, "outcome")
if oc in ("spawn_failed", "timed_out", "crashed"):
if oc in {"spawn_failed", "timed_out", "crashed"}:
most_recent_outcome = oc
break
@ -373,7 +373,7 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
label=f"Fix profile auth: hermes -p {assignee} auth",
payload={"command": f"hermes -p {assignee} auth"},
))
elif most_recent_outcome in ("timed_out", "crashed"):
elif most_recent_outcome in {"timed_out", "crashed"}:
# Worker got off the ground but died. Logs are the right place
# to diagnose; reclaim/reassign are the recovery levers.
task_id = _task_field(task, "id")
@ -466,7 +466,7 @@ def _rule_repeated_crashes(task, events, runs, now, cfg) -> list[Diagnostic]:
consecutive += 1
if last_err is None:
last_err = _task_field(r, "error")
elif outcome in ("completed", "reclaimed"):
elif outcome in {"completed", "reclaimed"}:
# A success (or manual reclaim) breaks the streak.
break
else:
@ -541,7 +541,7 @@ def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
return []
# Any comment / unblock after the block breaks the "stale" signal.
for ev in events:
if _event_kind(ev) in ("commented", "unblocked") and _event_ts(ev) > last_blocked_ts:
if _event_kind(ev) in {"commented", "unblocked"} and _event_ts(ev) > last_blocked_ts:
return []
actions: list[DiagnosticAction] = [
DiagnosticAction(

View file

@ -124,7 +124,7 @@ def _apply_profile_override() -> None:
# 1. Check for explicit -p / --profile flag
for i, arg in enumerate(argv):
if arg in ("--profile", "-p") and i + 1 < len(argv):
if arg in {"--profile", "-p"} and i + 1 < len(argv):
profile_name = argv[i + 1]
consume = 2
break
@ -192,7 +192,7 @@ def _apply_profile_override() -> None:
# Strip the flag from argv so argparse doesn't choke
if consume > 0:
for i, arg in enumerate(argv):
if arg in ("--profile", "-p"):
if arg in {"--profile", "-p"}:
start = i + 1 # +1 because argv is sys.argv[1:]
sys.argv = sys.argv[:start] + sys.argv[start + consume :]
break
@ -567,13 +567,13 @@ def _session_browse_picker(sessions: list) -> Optional[str]:
stdscr.refresh()
key = stdscr.getch()
if key in (curses.KEY_UP,):
if key in {curses.KEY_UP,}:
if filtered:
cursor = (cursor - 1) % len(filtered)
elif key in (curses.KEY_DOWN,):
elif key in {curses.KEY_DOWN,}:
if filtered:
cursor = (cursor + 1) % len(filtered)
elif key in (curses.KEY_ENTER, 10, 13):
elif key in {curses.KEY_ENTER, 10, 13}:
if filtered:
result_holder[0] = filtered[cursor]["id"]
return
@ -587,7 +587,7 @@ def _session_browse_picker(sessions: list) -> Optional[str]:
else:
# Second Esc exits
return
elif key in (curses.KEY_BACKSPACE, 127, 8):
elif key in {curses.KEY_BACKSPACE, 127, 8}:
if search_text:
search_text = search_text[:-1]
if search_text:
@ -626,7 +626,7 @@ def _session_browse_picker(sessions: list) -> Optional[str]:
while True:
try:
val = input(f"\n Select [1-{len(sessions)}]: ").strip()
if not val or val.lower() in ("q", "quit", "exit"):
if not val or val.lower() in {"q", "quit", "exit"}:
return None
idx = int(val) - 1
if 0 <= idx < len(sessions):
@ -1303,7 +1303,7 @@ def _launch_tui(
except KeyboardInterrupt:
code = 130
if code in (0, 130):
if code in {0, 130}:
_print_tui_exit_summary(resume_session_id, active_session_file)
finally:
try:
@ -1403,7 +1403,7 @@ def cmd_chat(args):
reply = input("Run setup now? [Y/n] ").strip().lower()
except (EOFError, KeyboardInterrupt):
reply = "n"
if reply in ("", "y", "yes"):
if reply in {"", "y", "yes"}:
cmd_setup(args)
return
print()
@ -1583,7 +1583,7 @@ def cmd_whatsapp(args):
response = input("\n Update allowed users? [y/N] ").strip()
except (EOFError, KeyboardInterrupt):
response = "n"
if response.lower() in ("y", "yes"):
if response.lower() in {"y", "yes"}:
if wa_mode == "bot":
phone = input(
" Phone numbers that can message the bot (comma-separated): "
@ -1658,7 +1658,7 @@ def cmd_whatsapp(args):
).strip()
except (EOFError, KeyboardInterrupt):
response = "n"
if response.lower() in ("y", "yes"):
if response.lower() in {"y", "yes"}:
shutil.rmtree(session_dir, ignore_errors=True)
session_dir.mkdir(parents=True, exist_ok=True)
print(" ✓ Session cleared")
@ -2012,7 +2012,7 @@ def select_provider_and_model(args=None):
_model_flow_bedrock(config, current_model)
elif selected_provider == "azure-foundry":
_model_flow_azure_foundry(config, current_model)
elif selected_provider in (
elif selected_provider in {
"gemini",
"deepseek",
"xai",
@ -2032,18 +2032,18 @@ def select_provider_and_model(args=None):
"ollama-cloud",
"tencent-tokenhub",
"lmstudio",
) or _is_profile_api_key_provider(selected_provider):
} or _is_profile_api_key_provider(selected_provider):
_model_flow_api_key_provider(config, selected_provider, current_model)
# ── Post-switch cleanup: clear stale OPENAI_BASE_URL ──────────────
# When the user switches to a named provider (anything except "custom"),
# a leftover OPENAI_BASE_URL in ~/.hermes/.env can poison auxiliary
# clients that use provider:auto. Clear it proactively. (#5161)
if selected_provider not in (
if selected_provider not in {
"custom",
"cancel",
"remove-custom",
) and not selected_provider.startswith("custom:"):
} and not selected_provider.startswith("custom:"):
_clear_stale_openai_base_url()
@ -2169,7 +2169,7 @@ def _reset_aux_to_auto() -> int:
entry = {}
aux[task] = entry
changed = False
if entry.get("provider") not in (None, "", "auto"):
if entry.get("provider") not in {None, "", "auto"}:
entry["provider"] = "auto"
changed = True
for field in ("model", "base_url", "api_key"):
@ -3080,7 +3080,7 @@ def _model_flow_custom(config):
_add_v1 = input(" Add /v1? [Y/n]: ").strip().lower()
except (KeyboardInterrupt, EOFError):
_add_v1 = "n"
if _add_v1 in ("", "y", "yes"):
if _add_v1 in {"", "y", "yes"}:
effective_url = effective_url.rstrip("/") + "/v1"
if base_url:
base_url = effective_url
@ -3124,7 +3124,7 @@ def _model_flow_custom(config):
if len(detected_models) == 1:
print(f" Detected model: {detected_models[0]}")
confirm = input(" Use this model? [Y/n]: ").strip().lower()
if confirm in ("", "y", "yes"):
if confirm in {"", "y", "yes"}:
model_name = detected_models[0]
else:
model_name = input("Model name (e.g. gpt-4, llama-3-70b): ").strip()
@ -3957,7 +3957,7 @@ def _model_flow_copilot(config, current_model=""):
api_key = creds.get("api_key", "")
source = creds.get("source", "")
else:
if source in ("GITHUB_TOKEN", "GH_TOKEN"):
if source in {"GITHUB_TOKEN", "GH_TOKEN"}:
print(f" GitHub token: {api_key[:8]}... ✓ ({source})")
elif source == "gh auth token":
print(" GitHub token: ✓ (from `gh auth token`)")
@ -5277,7 +5277,7 @@ def cmd_slack(args):
command registered as a first-class slash.
"""
sub = getattr(args, "slack_command", None)
if sub in (None, ""):
if sub in {None, ""}:
# No subcommand — print usage hint.
print(
"usage: hermes slack <subcommand>\n"
@ -5424,7 +5424,7 @@ def _clear_bytecode_cache(root: Path) -> int:
dirnames[:] = [
d
for d in dirnames
if d not in ("venv", ".venv", "node_modules", ".git", ".worktrees")
if d not in {"venv", ".venv", "node_modules", ".git", ".worktrees"}
]
if os.path.basename(dirpath) == "__pycache__":
try:
@ -6219,7 +6219,7 @@ def _restore_stashed_changes(
response = input_fn("Restore local changes now? [Y/n]", "y")
else:
response = input().strip().lower()
if response not in ("", "y", "yes"):
if response not in {"", "y", "yes"}:
print("Skipped restoring local changes.")
print("Your changes are still preserved in git stash.")
print(f"Restore manually with: git stash apply {stash_ref}")
@ -6462,7 +6462,7 @@ def _sync_with_upstream_if_needed(git_cmd: list[str], cwd: Path) -> None:
print()
response = "n"
if response in ("", "y", "yes"):
if response in {"", "y", "yes"}:
print("→ Adding upstream remote...")
if _add_upstream_remote(git_cmd, cwd):
print(
@ -7521,7 +7521,7 @@ def _cmd_update_impl(args, gateway_mode: bool):
prompt_user=prompt_for_restore,
input_fn=gw_input_fn,
)
if current_branch not in ("main", "HEAD"):
if current_branch not in {"main", "HEAD"}:
subprocess.run(
git_cmd + ["checkout", current_branch],
cwd=PROJECT_ROOT,
@ -7805,7 +7805,7 @@ def _cmd_update_impl(args, gateway_mode: bool):
except EOFError:
response = "n"
if response in ("", "y", "yes", "auto"):
if response in {"", "y", "yes", "auto"}:
print()
# Gateway mode, --yes, and non-interactive update contexts
# (dashboard / web server actions) cannot prompt for API keys.
@ -8866,7 +8866,7 @@ def cmd_profile(args):
answer = input("\nProceed with install? [y/N] ").strip().lower()
except (EOFError, KeyboardInterrupt):
answer = ""
if answer not in ("y", "yes"):
if answer not in {"y", "yes"}:
print("Install cancelled.")
return
@ -8925,7 +8925,7 @@ def cmd_profile(args):
answer = input("\nProceed? [y/N] ").strip().lower()
except (EOFError, KeyboardInterrupt):
answer = ""
if answer not in ("y", "yes"):
if answer not in {"y", "yes"}:
print("Update cancelled.")
return
@ -10713,9 +10713,9 @@ Examples:
mem_dir = get_hermes_home() / "memories"
target = getattr(args, "target", "all")
files_to_reset = []
if target in ("all", "memory"):
if target in {"all", "memory"}:
files_to_reset.append(("MEMORY.md", "agent notes"))
if target in ("all", "user"):
if target in {"all", "user"}:
files_to_reset.append(("USER.md", "user profile"))
# Check what exists
@ -10826,7 +10826,7 @@ Examples:
def cmd_tools(args):
action = getattr(args, "tools_action", None)
if action in ("list", "disable", "enable"):
if action in {"list", "disable", "enable"}:
from hermes_cli.tools_config import tools_disable_enable_command
tools_disable_enable_command(args)
@ -11035,7 +11035,7 @@ Examples:
def _confirm_prompt(prompt: str) -> bool:
"""Prompt for y/N confirmation, safe against non-TTY environments."""
try:
return input(prompt).strip().lower() in ("y", "yes")
return input(prompt).strip().lower() in {"y", "yes"}
except (EOFError, KeyboardInterrupt):
return False

View file

@ -63,7 +63,7 @@ def _confirm(question: str, default: bool = True) -> bool:
return default
if not val:
return default
return val in ("y", "yes")
return val in {"y", "yes"}
def _prompt(question: str, *, password: bool = False, default: str = "") -> str:
@ -375,11 +375,11 @@ def cmd_mcp_add(args):
_info("Cancelled.")
return
if choice in ("n", "no"):
if choice in {"n", "no"}:
_info("Cancelled — server not saved.")
return
if choice in ("s", "select"):
if choice in {"s", "select"}:
# Interactive tool selection
from hermes_cli.curses_ui import curses_checklist
@ -509,7 +509,7 @@ def cmd_mcp_list(args=None):
# Enabled status
enabled = cfg.get("enabled", True)
if isinstance(enabled, str):
enabled = enabled.lower() in ("true", "1", "yes")
enabled = enabled.lower() in {"true", "1", "yes"}
status = color("✓ enabled", Colors.GREEN) if enabled else color("✗ disabled", Colors.DIM)
print(f" {name:<16} {transport:<30} {tools_str:<12} {status}")

View file

@ -825,7 +825,7 @@ def switch_model(
# --- Step e: detect_provider_for_model() as last resort ---
_base = current_base_url or ""
is_custom = current_provider in ("custom", "local") or (
is_custom = current_provider in {"custom", "local"} or (
"localhost" in _base or "127.0.0.1" in _base
)
@ -1525,7 +1525,7 @@ def list_authenticated_providers(
api_key = os.environ.get(key_env, "").strip() if key_env else ""
discover = ep_cfg.get("discover_models", True)
if isinstance(discover, str):
discover = discover.lower() not in ("false", "no", "0")
discover = discover.lower() not in {"false", "no", "0"}
if api_url and api_key and discover:
try:
from hermes_cli.models import fetch_api_models

View file

@ -818,7 +818,7 @@ try:
for _pp in _list_providers_for_canonical():
if _pp.name in _canonical_slugs:
continue
if _pp.auth_type in ("oauth_device_code", "oauth_external", "external_process", "aws_sdk", "copilot"):
if _pp.auth_type in {"oauth_device_code", "oauth_external", "external_process", "aws_sdk", "copilot"}:
continue # non-api-key flows need bespoke picker UX; skip auto-inject
_label = _pp.display_name or _pp.name
_desc = _pp.description or f"{_label} (direct API)"
@ -2335,7 +2335,7 @@ def _lmstudio_fetch_raw_models(
with urllib.request.urlopen(request, timeout=timeout) as resp:
payload = json.loads(resp.read().decode())
except urllib.error.HTTPError as exc:
if exc.code in (401, 403):
if exc.code in {401, 403}:
from hermes_cli.auth import AuthError
raise AuthError(
f"LM Studio rejected the request with HTTP {exc.code}.",
@ -3270,7 +3270,7 @@ def validate_requested_model(
# MiniMax providers don't expose a /models endpoint — validate against
# the static catalog instead, similar to openai-codex.
if normalized in ("minimax", "minimax-cn"):
if normalized in {"minimax", "minimax-cn"}:
try:
catalog_models = provider_model_ids(normalized)
except Exception:

View file

@ -86,9 +86,9 @@ logger = logging.getLogger(__name__)
# The env var is read once at import time; tests that need to flip it
# mid-process can call ``_install_plugin_debug_handler(force=True)``.
_PLUGINS_DEBUG = os.getenv("HERMES_PLUGINS_DEBUG", "").strip().lower() in (
_PLUGINS_DEBUG = os.getenv("HERMES_PLUGINS_DEBUG", "").strip().lower() in {
"1", "true", "yes", "on",
)
}
_DEBUG_HANDLER_INSTALLED = False
@ -100,9 +100,9 @@ def _install_plugin_debug_handler(force: bool = False) -> None:
"""
global _DEBUG_HANDLER_INSTALLED, _PLUGINS_DEBUG
if force:
_PLUGINS_DEBUG = os.getenv("HERMES_PLUGINS_DEBUG", "").strip().lower() in (
_PLUGINS_DEBUG = os.getenv("HERMES_PLUGINS_DEBUG", "").strip().lower() in {
"1", "true", "yes", "on",
)
}
if not _PLUGINS_DEBUG or _DEBUG_HANDLER_INSTALLED:
return
handler = logging.StreamHandler(sys.stderr)
@ -824,7 +824,7 @@ class PluginManager:
# Bundled platform plugins (gateway adapters like IRC) auto-load
# for the same reason: every platform Hermes ships must be
# available out of the box without the user having to opt in.
if manifest.source == "bundled" and manifest.kind in ("backend", "platform"):
if manifest.source == "bundled" and manifest.kind in {"backend", "platform"}:
self._load_plugin(manifest)
continue
@ -1075,7 +1075,7 @@ class PluginManager:
)
try:
if manifest.source in ("user", "project", "bundled"):
if manifest.source in {"user", "project", "bundled"}:
module = self._load_directory_module(manifest)
else:
module = self._load_entrypoint_module(manifest)

View file

@ -85,7 +85,7 @@ def _sanitize_plugin_name(name: str, plugins_dir: Path) -> Path:
if not name:
raise ValueError("Plugin name must not be empty.")
if name in (".", ".."):
if name in {".", ".."}:
raise ValueError(
f"Invalid plugin name '{name}': must not reference the plugins directory itself."
)
@ -491,7 +491,7 @@ def cmd_install(
answer = input(
f" Enable '{installed_name}' now? [y/N]: ",
).strip().lower()
should_enable = answer in ("y", "yes")
should_enable = answer in {"y", "yes"}
except (EOFError, KeyboardInterrupt):
should_enable = False
else:
@ -731,7 +731,7 @@ def _discover_all_plugins() -> list:
for d in sorted(base.iterdir()):
if not d.is_dir():
continue
if source == "bundled" and d.name in ("memory", "context_engine"):
if source == "bundled" and d.name in {"memory", "context_engine"}:
continue
manifest_file = d / "plugin.yaml"
if not manifest_file.exists():
@ -1129,10 +1129,10 @@ def _run_composite_ui(curses, plugin_names, plugin_labels, plugin_selected,
stdscr.refresh()
key = stdscr.getch()
if key in (curses.KEY_UP, ord("k")):
if key in {curses.KEY_UP, ord("k")}:
if total_items > 0:
cursor = (cursor - 1) % total_items
elif key in (curses.KEY_DOWN, ord("j")):
elif key in {curses.KEY_DOWN, ord("j")}:
if total_items > 0:
cursor = (cursor + 1) % total_items
elif key == ord(" "):
@ -1168,7 +1168,7 @@ def _run_composite_ui(curses, plugin_names, plugin_labels, plugin_selected,
curses.init_pair(3, curses.COLOR_CYAN, -1)
curses.init_pair(4, 8, -1)
curses.curs_set(0)
elif key in (curses.KEY_ENTER, 10, 13):
elif key in {curses.KEY_ENTER, 10, 13}:
if cursor < n_plugins:
# ENTER on a plugin checkbox — confirm and exit
result_holder["plugins_changed"] = True
@ -1200,7 +1200,7 @@ def _run_composite_ui(curses, plugin_names, plugin_labels, plugin_selected,
curses.init_pair(3, curses.COLOR_CYAN, -1)
curses.init_pair(4, 8, -1)
curses.curs_set(0)
elif key in (27, ord("q")):
elif key in {27, ord("q")}:
# Save plugin changes on exit
result_holder["plugins_changed"] = True
return
@ -1569,13 +1569,13 @@ def plugins_command(args) -> None:
)
elif action == "update":
cmd_update(args.name)
elif action in ("remove", "rm", "uninstall"):
elif action in {"remove", "rm", "uninstall"}:
cmd_remove(args.name)
elif action == "enable":
cmd_enable(args.name)
elif action == "disable":
cmd_disable(args.name)
elif action in ("list", "ls"):
elif action in {"list", "ls"}:
cmd_list()
elif action is None:
cmd_toggle()

View file

@ -989,7 +989,7 @@ def _default_export_ignore(root_dir: Path):
if entry == "__pycache__" or entry.endswith((".sock", ".tmp")):
ignored.add(entry)
# npm lockfiles can appear at root
elif entry in ("package.json", "package-lock.json"):
elif entry in {"package.json", "package-lock.json"}:
ignored.add(entry)
# Root-level exclusions
if Path(directory) == root_dir:
@ -1057,7 +1057,7 @@ def _normalize_profile_archive_parts(member_name: str) -> List[str]:
):
raise ValueError(f"Unsafe archive member path: {member_name}")
parts = [part for part in posix_path.parts if part not in ("", ".")]
parts = [part for part in posix_path.parts if part not in {"", "."}]
if not parts or any(part == ".." for part in parts):
raise ValueError(f"Unsafe archive member path: {member_name}")
return parts

View file

@ -164,7 +164,7 @@ class PtyBridge:
data = os.read(self._fd, 65536)
except OSError as exc:
# EIO on Linux = slave side closed. EBADF = already closed.
if exc.errno in (errno.EIO, errno.EBADF):
if exc.errno in {errno.EIO, errno.EBADF}:
return None
raise
if not data:
@ -181,7 +181,7 @@ class PtyBridge:
try:
n = os.write(self._fd, view)
except OSError as exc:
if exc.errno in (errno.EIO, errno.EBADF, errno.EPIPE):
if exc.errno in {errno.EIO, errno.EBADF, errno.EPIPE}:
return
raise
if n <= 0:

View file

@ -260,7 +260,7 @@ def _resolve_runtime_from_pool_entry(
if cfg_base_url:
base_url = cfg_base_url
configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
if provider in ("opencode-zen", "opencode-go"):
if provider in {"opencode-zen", "opencode-go"}:
# Re-derive api_mode from the effective model rather than the
# persisted api_mode: the opencode providers serve both
# anthropic_messages and chat_completions models, so the previous
@ -282,7 +282,7 @@ def _resolve_runtime_from_pool_entry(
# Anthropic SDK prepends its own /v1/messages to the base_url. Strip the
# trailing /v1 so the SDK constructs the correct path (e.g.
# https://opencode.ai/zen/go/v1/messages instead of .../v1/v1/messages).
if api_mode == "anthropic_messages" and provider in ("opencode-zen", "opencode-go"):
if api_mode == "anthropic_messages" and provider in {"opencode-zen", "opencode-go"}:
base_url = re.sub(r"/v1/?$", "", base_url)
return {
@ -859,7 +859,7 @@ def _resolve_explicit_runtime(
base_url = explicit_base_url
if not base_url:
if provider in ("kimi-coding", "kimi-coding-cn"):
if provider in {"kimi-coding", "kimi-coding-cn"}:
creds = resolve_api_key_provider_credentials(provider)
base_url = creds.get("base_url", "").rstrip("/")
else:
@ -1223,7 +1223,7 @@ def resolve_runtime_provider(
# trust boto3's credential chain — it handles IMDS, ECS task roles,
# Lambda execution roles, SSO, and other implicit sources that our
# env-var check can't detect.
is_explicit = requested_provider in ("bedrock", "aws", "aws-bedrock", "amazon-bedrock", "amazon")
is_explicit = requested_provider in {"bedrock", "aws", "aws-bedrock", "amazon-bedrock", "amazon"}
if not is_explicit and not has_aws_credentials():
raise AuthError(
"No AWS credentials found for Bedrock. Configure one of:\n"
@ -1303,7 +1303,7 @@ def resolve_runtime_provider(
configured_provider = str(model_cfg.get("provider") or "").strip().lower()
# Only honor persisted api_mode when it belongs to the same provider family.
configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
if provider in ("opencode-zen", "opencode-go"):
if provider in {"opencode-zen", "opencode-go"}:
# opencode-zen/go must always re-derive api_mode from the
# target model (not the stale persisted api_mode), because
# the same provider serves both anthropic_messages
@ -1325,7 +1325,7 @@ def resolve_runtime_provider(
if detected:
api_mode = detected
# Strip trailing /v1 for OpenCode Anthropic models (see comment above).
if api_mode == "anthropic_messages" and provider in ("opencode-zen", "opencode-go"):
if api_mode == "anthropic_messages" and provider in {"opencode-zen", "opencode-go"}:
base_url = re.sub(r"/v1/?$", "", base_url)
return {
"provider": provider,

View file

@ -292,9 +292,9 @@ def prompt_yes_no(question: str, default: bool = True) -> bool:
if not value:
return default
if value in ("y", "yes"):
if value in {"y", "yes"}:
return True
if value in ("n", "no"):
if value in {"n", "no"}:
return False
print_error("Please enter 'y' or 'n'")
@ -641,7 +641,7 @@ def _prompt_container_resources(config: dict):
persist_str = prompt(
" Persist filesystem across sessions? (yes/no)", persist_label
)
terminal["container_persistent"] = persist_str.lower() in ("yes", "true", "y", "1")
terminal["container_persistent"] = persist_str.lower() in {"yes", "true", "y", "1"}
# CPU
current_cpu = terminal.get("container_cpu", 1)
@ -692,7 +692,7 @@ def _prompt_vercel_sandbox_settings(config: dict):
persist_label = "yes" if current_persist else "no"
terminal["container_persistent"] = prompt(
" Persist filesystem with snapshots? (yes/no)", persist_label
).lower() in ("yes", "true", "y", "1")
).lower() in {"yes", "true", "y", "1"}
current_cpu = terminal.get("container_cpu", 1)
cpu_str = prompt(" CPU cores", str(current_cpu))
@ -708,7 +708,7 @@ def _prompt_vercel_sandbox_settings(config: dict):
except ValueError:
pass
if terminal.get("container_disk", 51200) not in (0, 51200):
if terminal.get("container_disk", 51200) not in {0, 51200}:
print_warning("Vercel Sandbox does not support custom disk sizing; resetting container_disk to 51200.")
terminal["container_disk"] = 51200
@ -1729,7 +1729,7 @@ def setup_agent_settings(config: dict):
current_mode = cfg_get(config, "display", "tool_progress", default="all")
mode = prompt("Tool progress mode", current_mode)
if mode.lower() in ("off", "new", "all", "verbose"):
if mode.lower() in {"off", "new", "all", "verbose"}:
if "display" not in config:
config["display"] = {}
config["display"]["tool_progress"] = mode.lower()

View file

@ -593,7 +593,7 @@ def do_install(identifier: str, category: str = "", force: bool = False,
answer = input("Confirm [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
answer = "n"
if answer not in ("y", "yes"):
if answer not in {"y", "yes"}:
c.print("[dim]Installation cancelled.[/]\n")
shutil.rmtree(q_path, ignore_errors=True)
return
@ -948,7 +948,7 @@ def do_uninstall(name: str, console: Optional[Console] = None,
answer = input("Confirm [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
answer = "n"
if answer not in ("y", "yes"):
if answer not in {"y", "yes"}:
c.print("[dim]Cancelled.[/]\n")
return
@ -984,7 +984,7 @@ def do_reset(name: str, restore: bool = False,
answer = input("Confirm [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
answer = "n"
if answer not in ("y", "yes"):
if answer not in {"y", "yes"}:
c.print("[dim]Cancelled.[/]\n")
return
@ -1138,7 +1138,7 @@ def _github_publish(skill_path: Path, skill_name: str, target_repo: str,
f"https://api.github.com/repos/{target_repo}/forks",
headers=headers, timeout=30,
)
if resp.status_code in (200, 202):
if resp.status_code in {200, 202}:
fork = resp.json()
fork_repo = fork["full_name"]
elif resp.status_code == 403:
@ -1564,7 +1564,7 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None:
repo = args[1] if len(args) > 1 else ""
do_tap(tap_action, repo=repo, console=c)
elif action in ("help", "--help", "-h"):
elif action in {"help", "--help", "-h"}:
_print_skills_help(c)
else:

View file

@ -367,7 +367,7 @@ def show_status(args):
if persist is None:
persist_enabled = bool(terminal_cfg.get("container_persistent", True))
else:
persist_enabled = persist.lower() in ("1", "true", "yes", "on")
persist_enabled = persist.lower() in {"1", "true", "yes", "on"}
auth_status = describe_vercel_auth()
sdk_ok = importlib.util.find_spec("vercel") is not None
sdk_label = "installed" if sdk_ok else "missing (install: pip install 'hermes-agent[vercel]')"

View file

@ -105,7 +105,7 @@ def configure_windows_stdio() -> bool:
_CONFIGURED = True
return False
if os.environ.get("HERMES_DISABLE_WINDOWS_UTF8") in ("1", "true", "True", "yes"):
if os.environ.get("HERMES_DISABLE_WINDOWS_UTF8") in {"1", "true", "True", "yes"}:
_CONFIGURED = True
return False

View file

@ -594,7 +594,7 @@ def _pip_install(
def _run_post_setup(post_setup_key: str):
"""Run post-setup hooks for tools that need extra installation steps."""
import shutil
if post_setup_key in ("agent_browser", "browserbase"):
if post_setup_key in {"agent_browser", "browserbase"}:
node_modules = PROJECT_ROOT / "node_modules" / "agent-browser"
npm_bin = shutil.which("npm")
npx_bin = shutil.which("npx")
@ -1631,7 +1631,7 @@ def _is_provider_active(provider: dict, config: dict) -> bool:
image_cfg = config.get("image_gen", {})
if isinstance(image_cfg, dict):
configured_provider = image_cfg.get("provider")
if configured_provider not in (None, "", "fal"):
if configured_provider not in {None, "", "fal"}:
return False
if image_cfg.get("use_gateway") is not None and not is_truthy_value(image_cfg.get("use_gateway"), default=False):
return False
@ -1664,7 +1664,7 @@ def _is_provider_active(provider: dict, config: dict) -> bool:
configured_provider = image_cfg.get("provider")
return (
provider["imagegen_backend"] == "fal"
and configured_provider in (None, "", "fal")
and configured_provider in {None, "", "fal"}
and not is_truthy_value(image_cfg.get("use_gateway"), default=False)
)
return False
@ -1914,7 +1914,7 @@ def _configure_provider(provider: dict, config: dict):
# For tools without a specific config key (e.g. image_gen), still
# track use_gateway so the runtime knows the user's intent.
if managed_feature and managed_feature not in ("web", "tts", "browser"):
if managed_feature and managed_feature not in {"web", "tts", "browser"}:
config.setdefault(managed_feature, {})["use_gateway"] = True
elif not managed_feature:
# User picked a non-gateway provider — find which category this
@ -1946,7 +1946,7 @@ def _configure_provider(provider: dict, config: dict):
# image_gen.provider clear so the dispatch shim falls through
# to the legacy FAL path.
img_cfg = config.setdefault("image_gen", {})
if isinstance(img_cfg, dict) and img_cfg.get("provider") not in (None, "", "fal"):
if isinstance(img_cfg, dict) and img_cfg.get("provider") not in {None, "", "fal"}:
img_cfg["provider"] = "fal"
return
@ -1991,7 +1991,7 @@ def _configure_provider(provider: dict, config: dict):
if backend:
_configure_imagegen_model(backend, config)
img_cfg = config.setdefault("image_gen", {})
if isinstance(img_cfg, dict) and img_cfg.get("provider") not in (None, "", "fal"):
if isinstance(img_cfg, dict) and img_cfg.get("provider") not in {None, "", "fal"}:
img_cfg["provider"] = "fal"
@ -2186,7 +2186,7 @@ def _reconfigure_provider(provider: dict, config: dict):
web_cfg["use_gateway"] = bool(managed_feature)
_print_success(f" Web backend set to: {provider['web_backend']}")
if managed_feature and managed_feature not in ("web", "tts", "browser"):
if managed_feature and managed_feature not in {"web", "tts", "browser"}:
section = config.setdefault(managed_feature, {})
if not isinstance(section, dict):
section = {}
@ -2535,7 +2535,7 @@ def _configure_mcp_tools_interactive(config: dict):
# Count enabled servers
enabled_names = [
k for k, v in mcp_servers.items()
if v.get("enabled", True) not in (False, "false", "0", "no", "off")
if v.get("enabled", True) not in {False, "false", "0", "no", "off"}
]
if not enabled_names:
_print_info("All MCP servers are disabled.")

View file

@ -490,7 +490,7 @@ def run_uninstall(args):
print("Cancelled.")
return
if choice == "3" or choice.lower() in ("c", "cancel", "q", "quit", "n", "no"):
if choice == "3" or choice.lower() in {"c", "cancel", "q", "quit", "n", "no"}:
print()
print("Uninstall cancelled.")
return
@ -517,7 +517,7 @@ def run_uninstall(args):
print()
print("Cancelled.")
return
remove_profiles = resp in ("y", "yes")
remove_profiles = resp in {"y", "yes"}
# Final confirmation
print()

View file

@ -179,7 +179,7 @@ def _is_accepted_host(host_header: str, bound_host: str) -> bool:
# 0.0.0.0 bind means operator explicitly opted into all-interfaces
# (requires --insecure per web_server.start_server). No Host-layer
# defence can protect that mode; rely on operator network controls.
if bound_host in ("0.0.0.0", "::"):
if bound_host in {"0.0.0.0", "::"}:
return True
# Loopback bind: accept the loopback names
@ -385,7 +385,7 @@ def _build_schema_from_config(
full_key = f"{prefix}.{key}" if prefix else key
# Skip internal / version keys
if full_key in ("_config_version",):
if full_key in {"_config_version",}:
continue
# Category is the first path component for nested keys, or "general"
@ -576,13 +576,13 @@ async def get_status():
gateway_exit_reason = runtime.get("exit_reason")
gateway_updated_at = runtime.get("updated_at")
if not gateway_running:
gateway_state = gateway_state if gateway_state in ("stopped", "startup_failed") else "stopped"
gateway_state = gateway_state if gateway_state in {"stopped", "startup_failed"} else "stopped"
gateway_platforms = {}
elif gateway_running and remote_health_body is not None:
# The health probe confirmed the gateway is alive, but the local
# runtime status file may be stale (cross-container). Override
# stopped/None state so the dashboard shows the correct badge.
if gateway_state in (None, "stopped"):
if gateway_state in {None, "stopped"}:
gateway_state = "running"
# If there was no runtime info at all but the health probe confirmed alive,
@ -1075,7 +1075,7 @@ async def set_model_assignment(body: ModelAssignment):
model = (body.model or "").strip()
task = (body.task or "").strip().lower()
if scope not in ("main", "auxiliary"):
if scope not in {"main", "auxiliary"}:
raise HTTPException(status_code=400, detail="scope must be 'main' or 'auxiliary'")
try:
@ -1568,7 +1568,7 @@ async def disconnect_oauth_provider(provider_id: str, request: Request):
# AND forget the Claude Code import. We don't touch ~/.claude/* directly
# — that's owned by the Claude Code CLI; users can re-auth there if they
# want to undo a disconnect.
if provider_id in ("anthropic", "claude-code"):
if provider_id in {"anthropic", "claude-code"}:
try:
from agent.anthropic_adapter import _HERMES_OAUTH_FILE
if _HERMES_OAUTH_FILE.exists():
@ -2024,7 +2024,7 @@ def _codex_full_login_worker(session_id: str) -> None:
if poll.status_code == 200:
code_resp = poll.json()
break
if poll.status_code in (403, 404):
if poll.status_code in {403, 404}:
continue # user hasn't authorized yet
raise RuntimeError(f"deviceauth/token poll returned {poll.status_code}")
@ -3003,7 +3003,7 @@ _LOOPBACK_HOSTS = frozenset({"127.0.0.1", "::1", "localhost", "testclient"})
def _is_public_bind() -> bool:
"""True when bound to all-interfaces (operator used --insecure)."""
return getattr(app.state, "bound_host", "") in ("0.0.0.0", "::")
return getattr(app.state, "bound_host", "") in {"0.0.0.0", "::"}
def _ws_client_is_allowed(ws: "WebSocket") -> bool:
@ -3585,7 +3585,7 @@ def _normalise_theme_definition(data: Dict[str, Any]) -> Optional[Dict[str, Any]
if isinstance(radius, str) and radius.strip():
layout["radius"] = radius
density = layout_src.get("density")
if isinstance(density, str) and density in ("compact", "comfortable", "spacious"):
if isinstance(density, str) and density in {"compact", "comfortable", "spacious"}:
layout["density"] = density
# Color overrides — keep only valid keys with string values.
@ -3918,7 +3918,7 @@ def _merged_plugins_hub() -> Dict[str, Any]:
pass
can_remove_update = (
source in ("user", "git") and under_user_tree and Path(dir_str).is_dir()
source in {"user", "git"} and under_user_tree and Path(dir_str).is_dir()
)
# Check if this plugin provides tools that require auth

View file

@ -124,11 +124,11 @@ def webhook_command(args):
if not _require_webhook_enabled():
return
if sub in ("subscribe", "add"):
if sub in {"subscribe", "add"}:
_cmd_subscribe(args)
elif sub in ("list", "ls"):
elif sub in {"list", "ls"}:
_cmd_list(args)
elif sub in ("remove", "rm"):
elif sub in {"remove", "rm"}:
_cmd_remove(args)
elif sub == "test":
_cmd_test(args)