chore: ruff auto-fix PLR6201 resweep — tuple → set in membership tests (#27355)

Six days after #23937 (608 fixes) the codebase had accumulated 241 new
PLR6201 violations. Same mechanical `x in (...)` → `x in {...}` fix,
same zero-risk profile: set lookup is O(1) vs O(n) for tuple and the
two are semantically equivalent for hashable scalar membership tests.

All 241 instances fixed via `ruff check --select PLR6201 --fix
--unsafe-fixes`, zero remaining. Every changed value is a hashable
scalar (str/int/None/enum/signal); no risk of unhashable runtime
errors. No behavior change.

Test plan:
- 119 files changed, +244/-244 (net zero) — exactly one-line edits
- `ruff check` clean afterward
- Compile checks pass on the largest touched files (cli.py, run_agent.py,
  gateway/run.py, gateway/platforms/discord.py, model_tools.py)
- Subset broad test run on tests/gateway/ tests/hermes_cli/ tests/agent/
  tests/tools/: 18187 passed, 59 pre-existing failures (verified against
  origin/main with the same shape — identical failure count, identical
  category — all xdist test-order flakes unrelated to this change)

Follows the same template as PR #23937 ([tracker: #23972](https://github.com/NousResearch/hermes-agent/issues/23972)).
This commit is contained in:
kshitij 2026-05-17 02:29:41 -07:00 committed by GitHub
parent ad00777f04
commit 5fba236644
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
119 changed files with 244 additions and 244 deletions

View file

@ -232,7 +232,7 @@ class LSPClient:
the process is killed and the client is left in state
``"error"`` re-call ``start()`` to retry.
"""
if self._state in ("running", "starting"):
if self._state in {"running", "starting"}:
return
self._state = "starting"
try:

View file

@ -151,7 +151,7 @@ def try_install(pkg: str, strategy: str = "auto") -> Optional[str]:
same path (or ``None``) without reinstalling. Concurrent calls
are serialized.
"""
if strategy not in ("auto",):
if strategy not in {"auto",}:
# Only ``auto`` triggers an actual install. In manual/off,
# we still check whether the binary already exists.
recipe = INSTALL_RECIPES.get(pkg, {})

View file

@ -162,7 +162,7 @@ class LSPService:
idle_timeout: float = DEFAULT_IDLE_TIMEOUT,
) -> None:
self._enabled = enabled
self._wait_mode = wait_mode if wait_mode in ("document", "full") else "document"
self._wait_mode = wait_mode if wait_mode in {"document", "full"} else "document"
self._wait_timeout = wait_timeout
self._install_strategy = install_strategy
self._binary_overrides = binary_overrides or {}

View file

@ -28,7 +28,7 @@ def format_diagnostic(d: Dict[str, Any]) -> str:
col = int(start.get("character", 0)) + 1
msg = str(d.get("message") or "").rstrip()
code = d.get("code")
code_part = f" [{code}]" if code not in (None, "") else ""
code_part = f" [{code}]" if code not in {None, ""} else ""
source = d.get("source")
source_part = f" ({source})" if source else ""
return f"{sev} [{line}:{col}] {msg}{code_part}{source_part}"

View file

@ -237,7 +237,7 @@ def _spawn_pyright(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
return None
# If we got the cli ``pyright``, the langserver is its sibling.
base = os.path.basename(bin_path)
if base in ("pyright", "pyright.exe"):
if base in {"pyright", "pyright.exe"}:
sibling = os.path.join(os.path.dirname(bin_path), "pyright-langserver")
if os.path.exists(sibling):
bin_path = sibling

View file

@ -541,7 +541,7 @@ class CodexAppServerSession:
turn_status = (
(note.get("params") or {}).get("turn") or {}
).get("status")
if turn_status and turn_status not in ("completed", "interrupted"):
if turn_status and turn_status not in {"completed", "interrupted"}:
err_obj = (
(note.get("params") or {}).get("turn") or {}
).get("error")
@ -775,9 +775,9 @@ def _approval_choice_to_codex_decision(choice: str) -> str:
(verified against codex-rs/app-server-protocol/src/protocol/v2/item.rs
on codex 0.130.0).
"""
if choice in ("once",):
if choice in {"once",}:
return "accept"
if choice in ("session", "always"):
if choice in {"session", "always"}:
return "acceptForSession"
return "decline"

6
cli.py
View file

@ -1396,7 +1396,7 @@ def _detect_light_mode() -> bool:
last = cfgbg.split(";")[-1] if ";" in cfgbg else cfgbg
if last.isdigit():
bg = int(last)
if bg in (7, 15):
if bg in {7, 15}:
result = True
_LIGHT_MODE_CACHE = result
return result
@ -7706,7 +7706,7 @@ class HermesCLI:
# google-gemini/gemini-cli#19332.
_rest = cmd_original.split(None, 1)
_args = (_rest[1] if len(_rest) > 1 else "").strip().lower()
if _args in ("--delete", "-d"):
if _args in {"--delete", "-d"}:
self._delete_session_on_exit = True
elif _args:
_cprint(f" {_DIM}✗ Unknown argument: {_escape(_args)}. Use /exit --delete to also remove session history.{_RST}")
@ -13835,7 +13835,7 @@ class HermesCLI:
if _errno == errno.EIO:
pass # suppress broken-stdout I/O errors on interrupt (#13710)
elif (
_errno in (errno.EINVAL, errno.EBADF)
_errno in {errno.EINVAL, errno.EBADF}
or "is not registered" in _msg
or "Bad file descriptor" in _msg
or "Invalid argument" in _msg

View file

@ -3639,18 +3639,18 @@ class DiscordAdapter(BasePlatformAdapter):
configured = self.config.extra.get("thread_require_mention")
if configured is not None:
if isinstance(configured, str):
return configured.lower() not in ("false", "0", "no", "off")
return configured.lower() not in {"false", "0", "no", "off"}
return bool(configured)
return os.getenv("DISCORD_THREAD_REQUIRE_MENTION", "false").lower() in ("true", "1", "yes", "on")
return os.getenv("DISCORD_THREAD_REQUIRE_MENTION", "false").lower() in {"true", "1", "yes", "on"}
def _discord_history_backfill(self) -> bool:
"""Return whether history backfill is enabled for shared sessions."""
configured = self.config.extra.get("history_backfill")
if configured is not None:
if isinstance(configured, str):
return configured.lower() not in ("false", "0", "no", "off")
return configured.lower() not in {"false", "0", "no", "off"}
return bool(configured)
return os.getenv("DISCORD_HISTORY_BACKFILL", "true").lower() in ("true", "1", "yes")
return os.getenv("DISCORD_HISTORY_BACKFILL", "true").lower() in {"true", "1", "yes"}
def _discord_history_backfill_limit(self) -> int:
"""Return the max number of messages to scan backwards for context.
@ -3737,7 +3737,7 @@ class DiscordAdapter(BasePlatformAdapter):
break
# Skip system messages (pins, joins, thread renames, etc.)
if msg.type not in (discord.MessageType.default, discord.MessageType.reply):
if msg.type not in {discord.MessageType.default, discord.MessageType.reply}:
continue
# Respect DISCORD_ALLOW_BOTS for other bots.

View file

@ -8863,7 +8863,7 @@ class GatewayRunner:
lines.append("Failed/paused: (none)")
return "\n".join(lines)
if action in ("pause", "resume"):
if action in {"pause", "resume"}:
if not target:
return f"Usage: /platform {action} <name>"
platform = _resolve_platform(target)

View file

@ -2610,7 +2610,7 @@ def _print_loopback_ssh_hint(redirect_uri: str, *, docs_url: str | None = None)
return
host = parsed.hostname or ""
port = parsed.port
if host not in ("127.0.0.1", "::1", "localhost") or not port:
if host not in {"127.0.0.1", "::1", "localhost"} or not port:
return
print()
print("Remote session detected. Your browser will redirect to")
@ -5246,7 +5246,7 @@ def _login_xai_oauth(
reuse = input("Use existing credentials? [Y/n]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
reuse = "y"
if reuse in ("", "y", "yes"):
if reuse in {"", "y", "yes"}:
config_path = _update_config_for_provider(
"xai-oauth",
existing.get("base_url", DEFAULT_XAI_OAUTH_BASE_URL),

View file

@ -48,9 +48,9 @@ def parse_args(arg_string: str) -> tuple[Optional[str], list[str]]:
if not raw:
return None, []
# Accept human-friendly synonyms
if raw in ("on", "codex", "enable"):
if raw in {"on", "codex", "enable"}:
return "codex_app_server", []
if raw in ("off", "default", "disable", "hermes"):
if raw in {"off", "default", "disable", "hermes"}:
return "auto", []
if raw in VALID_RUNTIMES:
return raw, []

View file

@ -91,7 +91,7 @@ def ensure_dependency(dep: str, interactive: bool = True) -> bool:
reply = input(f"{desc} is not installed. Install now? [Y/n] ").strip().lower()
except (EOFError, KeyboardInterrupt):
return False
if reply not in ("", "y", "yes"):
if reply not in {"", "y", "yes"}:
return False
result = subprocess.run(

View file

@ -114,7 +114,7 @@ def cmd_proxy(args: Any) -> int:
return cmd_proxy_start(args)
if sub == "status":
return cmd_proxy_status(args)
if sub in ("providers", "list"):
if sub in {"providers", "list"}:
return cmd_proxy_list_providers(args)
# No subcommand → print short help.
print(

View file

@ -76,7 +76,7 @@ def _filter_response_headers(headers) -> dict:
if key.lower() in _HOP_BY_HOP_HEADERS:
continue
# aiohttp recomputes Content-Encoding/Content-Length on stream — let it.
if key.lower() in ("content-encoding", "content-length"):
if key.lower() in {"content-encoding", "content-length"}:
continue
out[key] = value
return out

View file

@ -209,7 +209,7 @@ def _maybe_apply_codex_app_server_runtime(
Returns the (possibly-rewritten) api_mode."""
if not model_cfg:
return api_mode
if provider not in ("openai", "openai-codex"):
if provider not in {"openai", "openai-codex"}:
return api_mode
runtime = str(model_cfg.get("openai_runtime") or "").strip().lower()
if runtime == "codex_app_server":

View file

@ -171,7 +171,7 @@ def _recent_window(
cut = 0
for i in range(len(messages) - 1, -1, -1):
msg = messages[i]
if isinstance(msg, Mapping) and msg.get("role") in ("user", "assistant"):
if isinstance(msg, Mapping) and msg.get("role") in {"user", "assistant"}:
count += 1
if count >= window:
cut = i

View file

@ -358,7 +358,7 @@ def generate_meme(template_id: str, texts: list[str], output_path: str) -> str:
img = _overlay_on_image(img, texts, fields)
output = Path(output_path)
if output.suffix.lower() in (".jpg", ".jpeg"):
if output.suffix.lower() in {".jpg", ".jpeg"}:
img = img.convert("RGB")
img.save(str(output), quality=95)
return str(output)
@ -378,7 +378,7 @@ def generate_from_image(
result = _overlay_on_image(img, texts, fields)
output = Path(output_path)
if output.suffix.lower() in (".jpg", ".jpeg"):
if output.suffix.lower() in {".jpg", ".jpeg"}:
result = result.convert("RGB")
result.save(str(output), quality=95)
return str(output)

View file

@ -43,7 +43,7 @@ def _parse_feed(xml_bytes: bytes):
entries = []
for item in root.iter():
tag = _strip_ns(item.tag)
if tag not in ("item", "entry"):
if tag not in {"item", "entry"}:
continue
# ElementTree Elements without children are *falsy* — use `is not None`.
children = {_strip_ns(c.tag): c for c in item}

View file

@ -125,7 +125,7 @@ def fetch_url(url: str, headers: dict | None = None, retries: int = MAX_RETRIES)
return json.loads(raw.decode("utf-8", errors="replace"))
except urllib.error.HTTPError as e:
last_err = e
if e.code in (404, 400):
if e.code in {404, 400}:
break # no point retrying
wait = BACKOFF_BASE ** attempt
time.sleep(wait)

View file

@ -95,11 +95,11 @@ def one_rep_max(weight, reps):
def macros(tdee_kcal, goal):
goal = goal.lower()
if goal in ("cut", "lose", "deficit"):
if goal in {"cut", "lose", "deficit"}:
cals = tdee_kcal - 500
p, f, c = 0.40, 0.30, 0.30
label = "Fat Loss (-500 kcal)"
elif goal in ("bulk", "gain", "surplus"):
elif goal in {"bulk", "gain", "surplus"}:
cals = tdee_kcal + 400
p, f, c = 0.30, 0.25, 0.45
label = "Lean Bulk (+400 kcal)"
@ -184,7 +184,7 @@ def main():
int(sys.argv[4]), sys.argv[5], int(sys.argv[6]),
)
elif cmd in ("1rm", "orm"):
elif cmd in {"1rm", "orm"}:
one_rep_max(float(sys.argv[2]), int(sys.argv[3]))
elif cmd == "macros":

View file

@ -610,7 +610,7 @@ def _is_secret_key(key: str) -> bool:
normalized = _normalize_secret_key(key)
if normalized == "token" or normalized.endswith("token"):
return True
if normalized in ("auth", "authorization"):
if normalized in {"auth", "authorization"}:
return True
return any(marker in normalized for marker in _SECRET_KEY_MARKERS)
@ -831,7 +831,7 @@ class Migrator:
# Flip the config-block flag when a conflict/error occurs on a
# config.yaml write. Later config-mutating options will skip rather
# than attempting a partial write.
if status in (STATUS_CONFLICT, STATUS_ERROR) and destination is not None:
if status in {STATUS_CONFLICT, STATUS_ERROR} and destination is not None:
dest_str = str(destination)
if dest_str.endswith("config.yaml") or dest_str.endswith("config.yml"):
self._config_apply_blocked = True
@ -1526,7 +1526,7 @@ class Migrator:
api_key = resolve_secret_input(raw_key, openclaw_env)
if not api_key:
# Warn if a SecretRef with file/exec source was silently unresolvable
if isinstance(raw_key, dict) and raw_key.get("source") in ("file", "exec"):
if isinstance(raw_key, dict) and raw_key.get("source") in {"file", "exec"}:
self.record(
"provider-keys",
self.source_root / "openclaw.json",
@ -1736,7 +1736,7 @@ class Migrator:
tts_data: Dict[str, Any] = {}
provider = tts.get("provider")
if isinstance(provider, str) and provider in ("elevenlabs", "openai", "edge", "microsoft"):
if isinstance(provider, str) and provider in {"elevenlabs", "openai", "edge", "microsoft"}:
# OpenClaw renamed "edge" to "microsoft"; Hermes still uses "edge"
tts_data["provider"] = "edge" if provider == "microsoft" else provider
@ -2304,11 +2304,11 @@ class Migrator:
if defaults.get("thinkingDefault"):
# Map OpenClaw thinking -> Hermes reasoning_effort
thinking = defaults["thinkingDefault"]
if thinking in ("always", "high", "xhigh"):
if thinking in {"always", "high", "xhigh"}:
agent_cfg["reasoning_effort"] = "high"
elif thinking in ("auto", "medium", "adaptive"):
elif thinking in {"auto", "medium", "adaptive"}:
agent_cfg["reasoning_effort"] = "medium"
elif thinking in ("off", "low", "none", "minimal"):
elif thinking in {"off", "low", "none", "minimal"}:
agent_cfg["reasoning_effort"] = "low"
changes = True
@ -2626,8 +2626,8 @@ class Migrator:
if not isinstance(ch_cfg, dict):
continue
complex_keys = {k: v for k, v in ch_cfg.items()
if k not in ("botToken", "appToken", "allowFrom", "enabled")
and v and k not in ("requireMention", "autoThread")}
if k not in {"botToken", "appToken", "allowFrom", "enabled"}
and v and k not in {"requireMention", "autoThread"}}
if complex_keys:
complex_archive[ch_name] = complex_keys
@ -2671,7 +2671,7 @@ class Migrator:
# Archive remaining browser settings
advanced = {k: v for k, v in browser.items()
if k not in ("cdpUrl", "headless") and v}
if k not in {"cdpUrl", "headless"} and v}
if advanced and self.archive_dir:
if self.execute:
self.archive_dir.mkdir(parents=True, exist_ok=True)

View file

@ -109,7 +109,7 @@ def _config_lookup(*paths: tuple[str, ...], default: str = "") -> str:
node = None
break
node = node.get(key)
if node not in (None, "") and not isinstance(node, dict):
if node not in {None, ""} and not isinstance(node, dict):
return str(node)
return default

View file

@ -51,7 +51,7 @@ def main() -> int:
field = args.field
if field is None:
for k, v in vars(org).items():
if isinstance(v, str) and not k.startswith("_") and k not in ("id",):
if isinstance(v, str) and not k.startswith("_") and k not in {"id",}:
field = k
break
val = getattr(org, field, None) if field else None

View file

@ -185,7 +185,7 @@ def whois_lookup(domain):
for key, pat in patterns.items():
matches = re.findall(pat, raw, re.IGNORECASE)
if matches:
if key in ("name_servers", "status"):
if key in {"name_servers", "status"}:
result[key] = list(dict.fromkeys(m.strip().lower() for m in matches))
else:
result[key] = matches[0].strip()

View file

@ -60,7 +60,7 @@ def get(
f"HTTP 429 rate-limited by {urllib.parse.urlsplit(url).netloc}. "
f"Slow down or supply a real API key. Body: {body[:300]}"
) from e
if e.code in (500, 502, 503, 504) and attempt < max_retries:
if e.code in {500, 502, 503, 504} and attempt < max_retries:
retry_after = e.headers.get("Retry-After") if e.headers else None
wait = float(retry_after) if (retry_after and retry_after.isdigit()) else backoff ** (attempt + 1)
time.sleep(wait)

View file

@ -122,7 +122,7 @@ def fetch(
with zipfile.ZipFile(zip_path) as zf:
for node_type, csv_substring in targets:
relevant_needles = [n for (k, n) in needles if k in (node_type, "Entity", "Officer")] or []
relevant_needles = [n for (k, n) in needles if k in {node_type, "Entity", "Officer"}] or []
# Only scan a CSV if we have a needle that could plausibly match it,
# or if we have ONLY a jurisdiction filter.
applicable_needles = [n for (k, n) in needles if k == node_type]

View file

@ -222,7 +222,7 @@ def _fmt_summary(summary: Dict[str, Any]) -> str:
def _handle_slash(raw_args: str) -> Optional[str]:
argv = raw_args.strip().split()
if not argv or argv[0] in ("help", "-h", "--help"):
if not argv or argv[0] in {"help", "-h", "--help"}:
return _HELP_TEXT
sub = argv[0]

View file

@ -72,7 +72,7 @@ def register(ctx) -> None:
# tested path there and guest-join Chromium is flakier. Refuse to register
# rather than half-working.
system = platform.system().lower()
if system not in ("linux", "darwin"):
if system not in {"linux", "darwin"}:
logger.info(
"google_meet plugin: platform=%s not supported (linux/macos only)",
system,

View file

@ -159,7 +159,7 @@ def _cmd_setup() -> int:
print("---------------------")
system = _p.system()
system_ok = system in ("Linux", "Darwin")
system_ok = system in {"Linux", "Darwin"}
print(f" platform : {system} [{'ok' if system_ok else 'unsupported'}]")
try:
@ -231,7 +231,7 @@ def _cmd_install(*, realtime: bool, assume_yes: bool) -> int:
import subprocess as _sp
system = _p.system()
if system not in ("Linux", "Darwin"):
if system not in {"Linux", "Darwin"}:
print(f"google_meet install: {system} is not supported (linux/macos only)")
return 1
@ -242,7 +242,7 @@ def _cmd_install(*, realtime: bool, assume_yes: bool) -> int:
ans = input(f"{prompt} [y/N] ").strip().lower()
except EOFError:
return False
return ans in ("y", "yes")
return ans in {"y", "yes"}
print("google_meet install")
print("-------------------")

View file

@ -447,7 +447,7 @@ def _mac_audio_device_index(device_name: str) -> str:
def run_bot() -> int: # noqa: C901 — orchestration, explicit branches
url = os.environ.get("HERMES_MEET_URL", "").strip()
out_dir_env = os.environ.get("HERMES_MEET_OUT_DIR", "").strip()
headed = os.environ.get("HERMES_MEET_HEADED", "").lower() in ("1", "true", "yes")
headed = os.environ.get("HERMES_MEET_HEADED", "").lower() in {"1", "true", "yes"}
auth_state = os.environ.get("HERMES_MEET_AUTH_STATE", "").strip()
guest_name = os.environ.get("HERMES_MEET_GUEST_NAME", "Hermes Agent")
duration_s = _parse_duration(os.environ.get("HERMES_MEET_DURATION", ""))
@ -808,7 +808,7 @@ def _looks_like_human_speaker(speaker: str, bot_guest_name: str) -> bool:
if not speaker or not speaker.strip():
return False
spk = speaker.strip().lower()
if spk in ("unknown", "you", bot_guest_name.strip().lower()):
if spk in {"unknown", "you", bot_guest_name.strip().lower()}:
return False
return True

View file

@ -103,7 +103,7 @@ def node_command(args: argparse.Namespace) -> int:
print(f"removed {args.name!r}" if ok else f"no such node: {args.name!r}")
return 0 if ok else 1
if cmd in ("status", "ping"):
if cmd in {"status", "ping"}:
entry = reg.get(args.name)
if entry is None:
print(f"no such node: {args.name!r}", file=sys.stderr)

View file

@ -183,7 +183,7 @@ class RealtimeSession:
rid = (frame.get("response") or {}).get("id")
if rid:
self._last_response_id = rid
elif ftype in ("response.done", "response.completed", "response.cancelled"):
elif ftype in {"response.done", "response.completed", "response.cancelled"}:
break
elif ftype == "error":
err = frame.get("error") or frame

View file

@ -36,7 +36,7 @@ def check_meet_requirements() -> bool:
handlers relax the requirement when a node is addressed.
"""
import platform as _p
if _p.system().lower() not in ("linux", "darwin"):
if _p.system().lower() not in {"linux", "darwin"}:
return False
try:
import playwright # noqa: F401
@ -238,7 +238,7 @@ def handle_meet_join(args: Dict[str, Any], **_kw) -> str:
if not url:
return _err("url is required")
mode = (args.get("mode") or "transcribe").strip().lower()
if mode not in ("transcribe", "realtime"):
if mode not in {"transcribe", "realtime"}:
return _err(f"mode must be 'transcribe' or 'realtime' (got {mode!r})")
node = args.get("node")

View file

@ -628,7 +628,7 @@ def update_task(task_id: str, payload: UpdateTaskBody, board: Optional[str] = Qu
status_code=400,
detail="Cannot set status to 'running' directly; use the dispatcher/claim path",
)
elif s in ("todo", "triage"):
elif s in {"todo", "triage"}:
ok = _set_status_direct(conn, task_id, s)
else:
raise HTTPException(status_code=400, detail=f"unknown status: {s}")
@ -742,7 +742,7 @@ def _set_status_direct(
(task_id, run_id, json.dumps({"status": new_status}), int(time.time())),
)
# If we re-opened something, children may have gone stale.
if new_status in ("done", "ready"):
if new_status in {"done", "ready"}:
kanban_db.recompute_ready(conn)
return True
@ -868,7 +868,7 @@ def bulk_update(payload: BulkTaskBody, board: Optional[str] = Query(None)):
ok = kanban_db.unblock_task(conn, tid)
else:
ok = _set_status_direct(conn, tid, "ready")
elif s in ("todo", "running", "triage"):
elif s in {"todo", "running", "triage"}:
ok = _set_status_direct(conn, tid, s)
else:
entry.update(ok=False, error=f"unknown status {s!r}")

View file

@ -263,7 +263,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes to ByteRover."""
if action not in ("add", "replace") or not content:
if action not in {"add", "replace"} or not content:
return
def _write():
@ -289,7 +289,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
for msg in messages[-10:]: # last 10 messages
role = msg.get("role", "")
content = msg.get("content", "")
if isinstance(content, str) and content.strip() and role in ("user", "assistant"):
if isinstance(content, str) and content.strip() and role in {"user", "assistant"}:
parts.append(f"{role}: {content[:500]}")
if not parts:

View file

@ -416,7 +416,7 @@ def _build_embedded_profile_env(config: dict[str, Any], *, llm_api_key: str | No
current_base_url = config.get("llm_base_url") or os.environ.get("HINDSIGHT_API_LLM_BASE_URL", "")
# The embedded daemon expects OpenAI wire format for these providers.
daemon_provider = "openai" if current_provider in ("openai_compatible", "openrouter") else current_provider
daemon_provider = "openai" if current_provider in {"openai_compatible", "openrouter"} else current_provider
env_values = {
"HINDSIGHT_API_LLM_PROVIDER": str(daemon_provider),
@ -596,7 +596,7 @@ class HindsightMemoryProvider(MemoryProvider):
try:
cfg = _load_config()
mode = cfg.get("mode", "cloud")
if mode in ("local", "local_embedded"):
if mode in {"local", "local_embedded"}:
available, _ = _check_local_runtime()
return available
if mode == "local_external":
@ -888,7 +888,7 @@ class HindsightMemoryProvider(MemoryProvider):
from hindsight import HindsightEmbedded
HindsightEmbedded.__del__ = lambda self: None
llm_provider = self._config.get("llm_provider", "")
if llm_provider in ("openai_compatible", "openrouter"):
if llm_provider in {"openai_compatible", "openrouter"}:
llm_provider = "openai"
logger.debug("Creating HindsightEmbedded client (profile=%s, provider=%s)",
self._config.get("profile", "hermes"), llm_provider)
@ -1132,7 +1132,7 @@ class HindsightMemoryProvider(MemoryProvider):
self._mode = "disabled"
return
self._api_key = self._config.get("apiKey") or self._config.get("api_key") or os.environ.get("HINDSIGHT_API_KEY", "")
default_url = _DEFAULT_LOCAL_URL if self._mode in ("local_embedded", "local_external") else _DEFAULT_API_URL
default_url = _DEFAULT_LOCAL_URL if self._mode in {"local_embedded", "local_external"} else _DEFAULT_API_URL
self._api_url = self._config.get("api_url") or os.environ.get("HINDSIGHT_API_URL", default_url)
self._llm_base_url = self._config.get("llm_base_url", "")
@ -1152,10 +1152,10 @@ class HindsightMemoryProvider(MemoryProvider):
self._budget = budget if budget in _VALID_BUDGETS else "mid"
memory_mode = self._config.get("memory_mode", "hybrid")
self._memory_mode = memory_mode if memory_mode in ("context", "tools", "hybrid") else "hybrid"
self._memory_mode = memory_mode if memory_mode in {"context", "tools", "hybrid"} else "hybrid"
prefetch_method = self._config.get("recall_prefetch_method") or self._config.get("prefetch_method", "recall")
self._prefetch_method = prefetch_method if prefetch_method in ("recall", "reflect") else "recall"
self._prefetch_method = prefetch_method if prefetch_method in {"recall", "reflect"} else "recall"
# Bank options
self._bank_mission = self._config.get("bank_mission", "")

View file

@ -283,7 +283,7 @@ class HonchoMemoryProvider(MemoryProvider):
# ----- Port #4053: cron guard -----
agent_context = kwargs.get("agent_context", "")
platform = kwargs.get("platform", "cli")
if agent_context in ("cron", "flush") or platform == "cron":
if agent_context in {"cron", "flush"} or platform == "cron":
logger.debug("Honcho skipped: cron/flush context (agent_context=%s, platform=%s)",
agent_context, platform)
self._cron_skipped = True
@ -404,7 +404,7 @@ class HonchoMemoryProvider(MemoryProvider):
# pop_context_result() in prefetch(). Dialectic prewarm runs the
# full configured depth and writes into _prefetch_result so turn 1
# consumes the result directly.
if self._recall_mode in ("context", "hybrid"):
if self._recall_mode in {"context", "hybrid"}:
try:
self._manager.prefetch_context(self._session_key)
except Exception as e:

View file

@ -233,7 +233,7 @@ _profile_override: str | None = None
def _host_key() -> str:
"""Return the active Honcho host key, derived from the current Hermes profile."""
if _profile_override:
if _profile_override in ("default", "custom"):
if _profile_override in {"default", "custom"}:
return HOST
return f"{HOST}.{_profile_override}"
return resolve_active_host()
@ -295,13 +295,13 @@ def _resolve_api_key(cfg: dict) -> str:
parsed = urlparse(base_url)
except (TypeError, ValueError):
parsed = None
if parsed and parsed.scheme in ("http", "https") and parsed.netloc:
if parsed and parsed.scheme in {"http", "https"} and parsed.netloc:
return "local"
# Schemeless but looks like a host (contains '.' or ':' and isn't
# a boolean literal): let it through so legacy configs don't
# regress into "no API key configured" when they previously worked.
lowered = base_url.lower()
if lowered not in ("true", "false", "none", "null") and any(
if lowered not in {"true", "false", "none", "null"} and any(
c in base_url for c in ".:"
) and not base_url.isdigit():
return "local"
@ -334,7 +334,7 @@ def _ensure_sdk_installed() -> bool:
print(" honcho-ai is not installed.")
answer = _prompt("Install it now? (honcho-ai>=2.0.1)", default="y")
if answer.lower() not in ("y", "yes"):
if answer.lower() not in {"y", "yes"}:
print(" Skipping install. Run: pip install 'honcho-ai>=2.0.1'\n")
return False
@ -382,7 +382,7 @@ def cmd_setup(args) -> None:
for h in ("localhost", "127.0.0.1", "::1")
) else "cloud"
deploy = _prompt("Cloud or local?", default=current_deploy)
is_local = deploy.lower() in ("local", "l")
is_local = deploy.lower() in {"local", "l"}
# Clean up legacy snake_case key
cfg.pop("base_url", None)
@ -441,7 +441,7 @@ def cmd_setup(args) -> None:
print(" directional -- all observations on, each AI peer builds its own view (default)")
print(" unified -- shared pool, user observes self, AI observes others only")
new_obs = _prompt("Observation mode", default=current_obs)
if new_obs in ("unified", "directional"):
if new_obs in {"unified", "directional"}:
hermes_host["observationMode"] = new_obs
else:
hermes_host["observationMode"] = "directional"
@ -457,17 +457,17 @@ def cmd_setup(args) -> None:
try:
hermes_host["writeFrequency"] = int(new_wf)
except (ValueError, TypeError):
hermes_host["writeFrequency"] = new_wf if new_wf in ("async", "turn", "session") else "async"
hermes_host["writeFrequency"] = new_wf if new_wf in {"async", "turn", "session"} else "async"
# --- 6. Recall mode ---
_raw_recall = hermes_host.get("recallMode") or cfg.get("recallMode", "hybrid")
current_recall = "hybrid" if _raw_recall not in ("hybrid", "context", "tools") else _raw_recall
current_recall = "hybrid" if _raw_recall not in {"hybrid", "context", "tools"} else _raw_recall
print("\n Recall mode:")
print(" hybrid -- auto-injected context + Honcho tools available (default)")
print(" context -- auto-injected context only, Honcho tools hidden")
print(" tools -- Honcho tools only, no auto-injected context")
new_recall = _prompt("Recall mode", default=current_recall)
if new_recall in ("hybrid", "context", "tools"):
if new_recall in {"hybrid", "context", "tools"}:
hermes_host["recallMode"] = new_recall
# --- 7. Context token budget ---
@ -477,7 +477,7 @@ def cmd_setup(args) -> None:
print(" uncapped -- no limit (default)")
print(" N -- token limit per turn (e.g. 1200)")
new_ctx_tokens = _prompt("Context tokens", default=current_display)
if new_ctx_tokens.strip().lower() in ("none", "uncapped", "no limit"):
if new_ctx_tokens.strip().lower() in {"none", "uncapped", "no limit"}:
hermes_host.pop("contextTokens", None)
elif new_ctx_tokens.strip() == "":
pass # keep current
@ -517,7 +517,7 @@ def cmd_setup(args) -> None:
print(" high -- complex behavioral patterns")
print(" max -- thorough audit-level analysis")
new_reasoning = _prompt("Reasoning level", default=current_reasoning)
if new_reasoning in ("minimal", "low", "medium", "high", "max"):
if new_reasoning in {"minimal", "low", "medium", "high", "max"}:
hermes_host["dialecticReasoningLevel"] = new_reasoning
else:
hermes_host["dialecticReasoningLevel"] = "low"
@ -530,7 +530,7 @@ def cmd_setup(args) -> None:
print(" per-repo -- one session per git repository")
print(" global -- single session across all directories")
new_strat = _prompt("Session strategy", default=current_strat)
if new_strat in ("per-session", "per-repo", "per-directory", "global"):
if new_strat in {"per-session", "per-repo", "per-directory", "global"}:
hermes_host["sessionStrategy"] = new_strat
hermes_host["enabled"] = True
@ -1130,7 +1130,7 @@ def cmd_migrate(args) -> None:
print(" Paste the key when prompted.")
print()
answer = _prompt(" Run 'hermes honcho setup' now?", default="y")
if answer.lower() in ("y", "yes"):
if answer.lower() in {"y", "yes"}:
cmd_setup(args)
cfg = _read_config()
has_key = bool(cfg.get("apiKey", ""))
@ -1176,7 +1176,7 @@ def cmd_migrate(args) -> None:
print(" hermes honcho migrate — this step handles it interactively")
if has_key:
answer = _prompt(" Upload user memory files to Honcho now?", default="y")
if answer.lower() in ("y", "yes"):
if answer.lower() in {"y", "yes"}:
try:
from plugins.memory.honcho.client import (
HonchoClientConfig,
@ -1226,7 +1226,7 @@ def cmd_migrate(args) -> None:
print()
if has_key:
answer = _prompt(" Seed AI identity from all detected files now?", default="y")
if answer.lower() in ("y", "yes"):
if answer.lower() in {"y", "yes"}:
try:
from plugins.memory.honcho.client import (
HonchoClientConfig,

View file

@ -47,7 +47,7 @@ def resolve_active_host() -> str:
try:
from hermes_cli.profiles import get_active_profile_name
profile = get_active_profile_name()
if profile and profile not in ("default", "custom"):
if profile and profile not in {"default", "custom"}:
return f"{HOST}.{profile}"
except Exception:
pass
@ -653,7 +653,7 @@ class HonchoClientConfig:
return base
# per-directory: one Honcho session per working directory (default)
if self.session_strategy in ("per-directory", "per-session"):
if self.session_strategy in {"per-directory", "per-session"}:
base = Path(cwd).name
if self.session_peer_prefix and self.peer_name:
return f"{self.peer_name}-{base}"

View file

@ -357,7 +357,7 @@ def _is_windows_absolute_path(value: str) -> bool:
len(value) >= 3
and value[0].isalpha()
and value[1] == ":"
and value[2] in ("/", "\\")
and value[2] in {"/", "\\"}
)
@ -381,7 +381,7 @@ def _is_local_path_reference(value: str) -> bool:
def _path_from_file_uri(uri: str) -> Path | str:
parsed = urlparse(uri)
if parsed.netloc not in ("", "localhost"):
if parsed.netloc not in {"", "localhost"}:
return f"Unsupported non-local file URI: {uri}"
return Path(url2pathname(parsed.path)).expanduser()
@ -755,7 +755,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
level = args.get("level", "overview")
summary_level = level in ("abstract", "overview")
summary_level = level in {"abstract", "overview"}
# OpenViking expects directory URIs for pseudo summary files
# (e.g. viking://user/hermes/.overview.md).
resolved_uri = self._normalize_summary_uri(uri) if summary_level else uri
@ -832,7 +832,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
result = self._unwrap_result(resp)
# Format list/tree results for readability
if action in ("list", "tree"):
if action in {"list", "tree"}:
raw_entries = result
if isinstance(result, dict):
raw_entries = result.get("entries") or result.get("items") or result.get("children") or []
@ -887,7 +887,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
payload: Dict[str, Any] = {}
for key in ("reason", "to", "parent", "instruction", "wait", "timeout"):
if key in args and args[key] not in (None, ""):
if key in args and args[key] not in {None, ""}:
payload[key] = args[key]
parsed_url = urlparse(url)

View file

@ -88,9 +88,9 @@ def _as_bool(value: Any, default: bool) -> bool:
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in ("true", "1", "yes", "y", "on"):
if lowered in {"true", "1", "yes", "y", "on"}:
return True
if lowered in ("false", "0", "no", "n", "off"):
if lowered in {"false", "0", "no", "n", "off"}:
return False
return default
@ -508,7 +508,7 @@ class SupermemoryMemoryProvider(MemoryProvider):
self._allowed_containers = [self._container_tag] + list(self._custom_containers)
agent_context = kwargs.get("agent_context", "")
self._write_enabled = agent_context not in ("cron", "flush", "subagent")
self._write_enabled = agent_context not in {"cron", "flush", "subagent"}
self._active = bool(self._api_key)
self._client = None
if self._active:
@ -598,7 +598,7 @@ class SupermemoryMemoryProvider(MemoryProvider):
cleaned = []
for message in messages or []:
role = message.get("role")
if role not in ("user", "assistant"):
if role not in {"user", "assistant"}:
continue
content = _clean_text_for_capture(str(message.get("content", "")))
if content:

View file

@ -74,9 +74,9 @@ class DeepSeekProfile(ProviderProfile):
# its server default (currently high).
if isinstance(reasoning_config, dict):
effort = (reasoning_config.get("effort") or "").strip().lower()
if effort in ("xhigh", "max"):
if effort in {"xhigh", "max"}:
top_level["reasoning_effort"] = "max"
elif effort in ("low", "medium", "high"):
elif effort in {"low", "medium", "high"}:
top_level["reasoning_effort"] = effort
return extra_body, top_level

View file

@ -37,7 +37,7 @@ class KimiProfile(ProviderProfile):
# Enabled
extra_body["thinking"] = {"type": "enabled"}
effort = (reasoning_config.get("effort") or "").strip().lower()
if effort in ("low", "medium", "high"):
if effort in {"low", "medium", "high"}:
top_level["reasoning_effort"] = effort
else:
top_level["reasoning_effort"] = "medium"

View file

@ -1539,7 +1539,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
if sender_email and space_name:
self._last_sender_by_chat[space_name] = sender_email.strip().lower()
chat_type = "dm" if space_type in ("DIRECT_MESSAGE", "DM") else "group"
chat_type = "dm" if space_type in {"DIRECT_MESSAGE", "DM"} else "group"
text = msg.get("argumentText") or msg.get("text") or ""
text = text.strip()
@ -1935,7 +1935,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
return True
except HttpError as exc:
status = getattr(getattr(exc, "resp", None), "status", None)
if status in (403, 404):
if status in {403, 404}:
return False
logger.debug(
"[GoogleChat] delete_message failed: %s",
@ -1958,7 +1958,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
update_mask = ",".join(update_mask_fields) or "text"
# Patch body cannot carry thread (immutable).
patch_body = {k: v for k, v in body.items() if k not in ("thread",)}
patch_body = {k: v for k, v in body.items() if k not in {"thread",}}
def _do_patch() -> Dict[str, Any]:
return (
@ -2791,7 +2791,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
upload_resp = await asyncio.to_thread(_upload)
except HttpError as exc:
status = getattr(getattr(exc, "resp", None), "status", None)
if status in (401, 403):
if status in {401, 403}:
logger.warning(
"[GoogleChat] media.upload auth failure for identity=%s "
"(token revoked or scope missing) — falling back to "
@ -2927,7 +2927,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
display = info.get("displayName") or chat_id
return {
"name": display,
"type": "dm" if space_type in ("DIRECT_MESSAGE", "DM") else "group",
"type": "dm" if space_type in {"DIRECT_MESSAGE", "DM"} else "group",
"chat_id": chat_id,
}

View file

@ -112,7 +112,7 @@ class IRCAdapter(BasePlatformAdapter):
self.nickname = os.getenv("IRC_NICKNAME") or extra.get("nickname", "hermes-bot")
self.channel = os.getenv("IRC_CHANNEL") or extra.get("channel", "")
self.use_tls = (
os.getenv("IRC_USE_TLS", "").lower() in ("1", "true", "yes")
os.getenv("IRC_USE_TLS", "").lower() in {"1", "true", "yes"}
if os.getenv("IRC_USE_TLS")
else extra.get("use_tls", True)
)
@ -680,7 +680,7 @@ def _env_enablement() -> dict | None:
seed["nickname"] = nickname
use_tls = os.getenv("IRC_USE_TLS", "").strip().lower()
if use_tls:
seed["use_tls"] = use_tls in ("1", "true", "yes")
seed["use_tls"] = use_tls in {"1", "true", "yes"}
# Passwords live in PlatformConfig.extra as well for back-compat with
# existing config.yaml users; env-reads at construct time still win.
if os.getenv("IRC_SERVER_PASSWORD"):
@ -756,7 +756,7 @@ async def _standalone_send(
nickname = os.getenv("IRC_NICKNAME") or extra.get("nickname", "hermes-bot")
use_tls_env = os.getenv("IRC_USE_TLS")
if use_tls_env is not None:
use_tls = use_tls_env.lower() in ("1", "true", "yes")
use_tls = use_tls_env.lower() in {"1", "true", "yes"}
else:
use_tls = bool(extra.get("use_tls", True))
@ -821,7 +821,7 @@ async def _standalone_send(
await _raw(f"PONG :{payload}")
elif cmd == "001":
registered = True
elif cmd in ("432", "433"):
elif cmd in {"432", "433"}:
nick_attempts += 1
if nick_attempts > max_nick_attempts:
return {"error": "IRC standalone send: too many nick collisions"}
@ -829,7 +829,7 @@ async def _standalone_send(
# mutated value, so the suffix stays bounded.
standalone_nick = f"{nick_base}-cron-{nick_attempts}"[:30]
await _raw(f"NICK {standalone_nick}")
elif cmd in ("464", "465"):
elif cmd in {"464", "465"}:
return {"error": f"IRC standalone send: server rejected client ({cmd})"}
if nickserv_password:
@ -860,9 +860,9 @@ async def _standalone_send(
if jcmd == "PING":
payload = jmsg["params"][0] if jmsg["params"] else ""
await _raw(f"PONG :{payload}")
elif jcmd in ("366", "JOIN"):
elif jcmd in {"366", "JOIN"}:
joined = True
elif jcmd in ("403", "405", "471", "473", "474", "475"):
elif jcmd in {"403", "405", "471", "473", "474", "475"}:
return {"error": f"IRC standalone send: JOIN {target} rejected ({jcmd})"}
# Bytes-aware per-line splitting so multi-line plain text never

View file

@ -325,7 +325,7 @@ class RequestCache:
def mark_delivered(self, request_id: str) -> None:
entry = self._entries.get(request_id)
if entry is None or entry.state not in (State.READY, State.ERROR):
if entry is None or entry.state not in {State.READY, State.ERROR}:
return
entry.state = State.DELIVERED
entry.updated_at = time.time()
@ -614,7 +614,7 @@ def _truthy_env(name: str, default: bool = False) -> bool:
v = os.getenv(name)
if v is None:
return default
return v.strip().lower() in ("1", "true", "yes", "on")
return v.strip().lower() in {"1", "true", "yes", "on"}
# ---------------------------------------------------------------------------
@ -910,7 +910,7 @@ class LineAdapter(BasePlatformAdapter):
await self._handle_message_event(event)
elif event_type == "postback":
await self._handle_postback_event(event)
elif event_type in ("follow", "unfollow", "join", "leave"):
elif event_type in {"follow", "unfollow", "join", "leave"}:
logger.info("LINE: lifecycle event %s from %s", event_type, source)
else:
logger.debug("LINE: ignoring event type %r", event_type)
@ -939,7 +939,7 @@ class LineAdapter(BasePlatformAdapter):
if msg_type == "text":
text = msg.get("text", "") or ""
elif msg_type in ("image", "audio", "video", "file"):
elif msg_type in {"image", "audio", "video", "file"}:
local_path = await self._download_media(message_id, msg_type)
if local_path:
media_urls.append(local_path)

View file

@ -101,11 +101,11 @@ def _guess_extension(data: bytes) -> str:
def _is_image_ext(ext: str) -> bool:
return ext.lower() in (".jpg", ".jpeg", ".png", ".gif", ".webp")
return ext.lower() in {".jpg", ".jpeg", ".png", ".gif", ".webp"}
def _is_audio_ext(ext: str) -> bool:
return ext.lower() in (".mp3", ".wav", ".ogg", ".m4a", ".aac")
return ext.lower() in {".mp3", ".wav", ".ogg", ".m4a", ".aac"}
# ---------------------------------------------------------------------------
@ -326,12 +326,12 @@ class SimplexAdapter(BasePlatformAdapter):
# Filter out messages sent by us (direction == "snd")
meta = chat_item.get("meta") or {}
direction = (meta.get("itemStatus") or {}).get("type", "")
if direction in ("sndSent", "sndSentDirect", "sndSentViaProxy", "sndNew"):
if direction in {"sndSent", "sndSentDirect", "sndSentViaProxy", "sndNew"}:
return
# Determine chat type and IDs
chat_type_raw = chat_info.get("type", "")
is_group = chat_type_raw in ("group", "groupInfo")
is_group = chat_type_raw in {"group", "groupInfo"}
if is_group:
group_info = chat_info.get("groupInfo") or chat_info.get("group") or {}
@ -374,7 +374,7 @@ class SimplexAdapter(BasePlatformAdapter):
media_urls: List[str] = []
media_types: List[str] = []
file_info = chat_item.get("file") or {}
if file_info and file_info.get("fileStatus") not in ("cancelled", "error"):
if file_info and file_info.get("fileStatus") not in {"cancelled", "error"}:
file_id = file_info.get("fileId")
file_name = file_info.get("fileName", "file")
if file_id:

View file

@ -841,7 +841,7 @@ class TeamsAdapter(BasePlatformAdapter):
# bot silently treated every clicker as authorized — meaning any
# Teams user who could message the bot could approve dangerous commands.
allowed_csv = os.getenv("TEAMS_ALLOWED_USERS", "").strip()
allow_all = os.getenv("TEAMS_ALLOW_ALL_USERS", "").strip().lower() in ("1", "true", "yes")
allow_all = os.getenv("TEAMS_ALLOW_ALL_USERS", "").strip().lower() in {"1", "true", "yes"}
if not allow_all:
if not allowed_csv:

View file

@ -99,15 +99,15 @@ def teams_pipeline_command(args: argparse.Namespace) -> int:
return 2
try:
if action in ("list", "ls"):
if action in {"list", "ls"}:
_cmd_list(args)
elif action == "show":
_cmd_show(args)
elif action in ("run", "replay"):
elif action in {"run", "replay"}:
_cmd_run(args)
elif action in ("fetch", "test"):
elif action in {"fetch", "test"}:
_cmd_fetch(args)
elif action in ("subscriptions", "subs"):
elif action in {"subscriptions", "subs"}:
_cmd_subscriptions(args)
elif action == "subscribe":
_cmd_subscribe(args)
@ -117,7 +117,7 @@ def teams_pipeline_command(args: argparse.Namespace) -> int:
_cmd_delete_subscription(args)
elif action == "maintain-subscriptions":
_cmd_maintain_subscriptions(args)
elif action in ("token-health", "token"):
elif action in {"token-health", "token"}:
_cmd_token_health(args)
elif action == "validate":
_cmd_validate(args)

View file

@ -33,7 +33,7 @@ def _meeting_path(meeting_ref: TeamsMeetingRef | str) -> str:
def _wrap_graph_error(exc: MicrosoftGraphAPIError, *, missing_message: str) -> TeamsMeetingError:
if exc.status_code in (401, 403):
if exc.status_code in {401, 403}:
return TeamsMeetingPermissionError(str(exc))
if exc.status_code == 404:
return TeamsMeetingNotFoundError(missing_message)
@ -286,7 +286,7 @@ async def fetch_call_record_artifact(
try:
payload = await client.get_json(f"/communications/callRecords/{quote(call_record_id, safe='')}")
except MicrosoftGraphAPIError as exc:
if exc.status_code in (401, 403) and allow_permission_errors:
if exc.status_code in {401, 403} and allow_permission_errors:
return None
if exc.status_code == 404:
return None

View file

@ -145,7 +145,7 @@ class MeetingArtifact:
metadata: dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if self.artifact_type not in ("transcript", "recording", "call_record"):
if self.artifact_type not in {"transcript", "recording", "call_record"}:
raise ValueError(
"MeetingArtifact.artifact_type must be transcript, recording, or call_record."
)

View file

@ -62,7 +62,7 @@ def build_pipeline_runtime_config(gateway_config: Any) -> dict[str, Any]:
"chat_id",
):
value = teams_extra.get(key)
if value not in (None, ""):
if value not in {None, ""}:
teams_delivery[key] = value
if teams_delivery:

View file

@ -1365,7 +1365,7 @@ class AIAgent:
the existing 1M-context-beta branch handles them; revisit if other
subscription tiers start producing the same loop signature).
"""
if status_code not in (401, 403, None):
if status_code not in {401, 403, None}:
return False
if not isinstance(error_context, dict):
return False
@ -1774,7 +1774,7 @@ class AIAgent:
import os as _os
env = _os.environ.get("HERMES_FILE_MUTATION_VERIFIER")
if env is not None:
return env.strip().lower() not in ("0", "false", "no", "off")
return env.strip().lower() not in {"0", "false", "no", "off"}
# Read from the persisted config.yaml so gateway and CLI share
# the same setting. Import lazily to avoid a startup-time cycle.
try:

View file

@ -592,7 +592,7 @@ def _http_once(
# Build a new request with cleaned headers
clean_headers = {
k: v for k, v in req2.header_items()
if k.lower() not in ("x-api-key", "authorization", "cookie")
if k.lower() not in {"x-api-key", "authorization", "cookie"}
}
new_req = urllib.request.Request(newurl, headers=clean_headers, method="GET")
return new_req
@ -743,13 +743,13 @@ def safe_path_join(base: Path, *parts: str) -> Path:
def media_type_from_filename(filename: str) -> str:
ext = Path(filename).suffix.lower()
if ext in (".mp4", ".webm", ".avi", ".mov", ".mkv", ".gif", ".webp"):
if ext in {".mp4", ".webm", ".avi", ".mov", ".mkv", ".gif", ".webp"}:
return "video"
if ext in (".wav", ".mp3", ".flac", ".ogg", ".m4a"):
if ext in {".wav", ".mp3", ".flac", ".ogg", ".m4a"}:
return "audio"
if ext in (".glb", ".obj", ".ply", ".gltf"):
if ext in {".glb", ".obj", ".ply", ".gltf"}:
return "3d"
if ext in (".json", ".txt", ".md"):
if ext in {".json", ".txt", ".md"}:
return "text"
return "image"

View file

@ -81,7 +81,7 @@ def trace_to_node(workflow: dict, link: list, *, max_hops: int = 8) -> str | Non
return None
cls = node.get("class_type", "")
# Reroute / Primitive / passthrough wrappers
if cls in ("Reroute", "PrimitiveNode", "Note", "easy showAnything"):
if cls in {"Reroute", "PrimitiveNode", "Note", "easy showAnything"}:
inputs = node.get("inputs", {}) or {}
# Find first link-shaped input and follow it
next_link = next((v for v in inputs.values() if is_link(v)), None)
@ -105,7 +105,7 @@ def find_negative_prompt_node(workflow: dict) -> str | None:
src = trace_to_node(workflow, neg)
if src and isinstance(workflow.get(src), dict):
cls = workflow[src].get("class_type", "")
if cls.startswith("CLIPTextEncode") or cls in ("smZ CLIPTextEncode", "BNK_CLIPTextEncodeAdvanced"):
if cls.startswith("CLIPTextEncode") or cls in {"smZ CLIPTextEncode", "BNK_CLIPTextEncodeAdvanced"}:
return src
return None
@ -121,7 +121,7 @@ def find_positive_prompt_node(workflow: dict) -> str | None:
src = trace_to_node(workflow, pos)
if src and isinstance(workflow.get(src), dict):
cls = workflow[src].get("class_type", "")
if cls.startswith("CLIPTextEncode") or cls in ("smZ CLIPTextEncode", "BNK_CLIPTextEncodeAdvanced"):
if cls.startswith("CLIPTextEncode") or cls in {"smZ CLIPTextEncode", "BNK_CLIPTextEncodeAdvanced"}:
return src
return None

View file

@ -151,7 +151,7 @@ def main(argv: list[str] | None = None) -> int:
diag["source"] = res.get("source")
diag["prompt_id"] = args.prompt_id
emit_json(diag)
return 0 if diag.get("status_str") not in ("error",) else 1
return 0 if diag.get("status_str") not in {"error",} else 1
if __name__ == "__main__":

View file

@ -203,7 +203,7 @@ def detect_apple_silicon() -> dict | None:
def detect_intel_arc() -> dict | None:
if platform.system() not in ("Linux", "Windows"):
if platform.system() not in {"Linux", "Windows"}:
return None
if shutil.which("clinfo"):
out = _run(["clinfo", "--list"])

View file

@ -204,7 +204,7 @@ class ComfyRunner:
s = data.get("status")
if s == "completed":
return {"status": "success", "data": data}
if s in ("failed",):
if s in {"failed",}:
return {"status": "error", "data": data}
if s == "cancelled":
return {"status": "cancelled", "data": data}
@ -386,7 +386,7 @@ class ComfyRunner:
# local path; otherwise put the file in output_dir flat.
target_parts: list[str] = []
if preserve_subfolder and subfolder:
target_parts.extend(p for p in subfolder.split("/") if p and p not in (".", ".."))
target_parts.extend(p for p in subfolder.split("/") if p and p not in {".", ".."})
target_parts.append(filename)
out_path = safe_path_join(output_dir, *target_parts)
@ -467,7 +467,7 @@ def inject_params(
# Auto-randomize seed when it's -1 in args, or when randomize_seed_if_unset
# and user didn't pass a seed.
if "seed" in params:
if "seed" in args and args["seed"] in (None, -1, "-1"):
if "seed" in args and args["seed"] in {None, -1, "-1"}:
args = dict(args)
args["seed"] = coerce_seed(args["seed"])
warnings.append(f"seed=-1 expanded to {args['seed']}")

View file

@ -170,7 +170,7 @@ def main(argv: list[str] | None = None) -> int:
parsed = parse_binary_frame(msg)
if parsed is None:
continue
if parsed["kind"] in ("preview", "preview_with_metadata") and preview_dir:
if parsed["kind"] in {"preview", "preview_with_metadata"} and preview_dir:
img_bytes = parsed.get("image_bytes", b"")
if img_bytes:
ext = parsed.get("ext", "png")

View file

@ -53,7 +53,7 @@ class TestCloudEndpointsLive:
url = resolve_url("https://cloud.comfy.org", "/object_info")
r = http_get(url, headers={"X-API-Key": cloud_key})
# Should be either 200 (paid) or 403 (free) — not 404 / 500
assert r.status in (200, 403)
assert r.status in {200, 403}
if r.status == 403:
# Body should mention the limitation
assert "free tier" in r.text().lower() or "subscription" in r.text().lower()

View file

@ -40,7 +40,7 @@ class TestConnectionTracing:
}
# Should hit max_hops without infinite loop
result = trace_to_node(wf, ["1", 0], max_hops=5)
assert result in ("1", "2") # any node, just don't hang
assert result in {"1", "2"} # any node, just don't hang
class TestPositiveNegativeDetection:

View file

@ -721,7 +721,7 @@ def drive_share(args):
"type": args.type,
"role": args.role,
}
if args.type in ("user", "group"):
if args.type in {"user", "group"}:
if not args.email:
print("ERROR: --email is required for type=user or type=group", file=sys.stderr)
sys.exit(1)

View file

@ -181,7 +181,7 @@ def http_get(url, params=None, retries=MAX_RETRIES, silent=False):
return json.loads(raw)
except urllib.error.HTTPError as exc:
last_error = f"HTTP {exc.code}: {exc.reason} for {url}"
if exc.code in (429, 503, 502, 504):
if exc.code in {429, 503, 502, 504}:
time.sleep(RETRY_DELAY * attempt)
else:
if silent:
@ -217,7 +217,7 @@ def http_get_text(url, params=None, retries=MAX_RETRIES, silent=False):
return resp.read().decode("utf-8")
except urllib.error.HTTPError as exc:
last_error = f"HTTP {exc.code}: {exc.reason} for {url}"
if exc.code in (429, 503, 502, 504):
if exc.code in {429, 503, 502, 504}:
time.sleep(RETRY_DELAY * attempt)
else:
if silent:
@ -256,7 +256,7 @@ def http_post(url, data_str, retries=MAX_RETRIES):
return json.loads(raw)
except urllib.error.HTTPError as exc:
last_error = f"HTTP {exc.code}: {exc.reason}"
if exc.code in (429, 503, 502, 504):
if exc.code in {429, 503, 502, 504}:
time.sleep(RETRY_DELAY * attempt)
else:
error_exit(last_error)
@ -459,8 +459,8 @@ def parse_overpass_elements(elements, ref_lat=None, ref_lon=None):
"maps_url": f"https://www.google.com/maps/search/?api=1&query={el_lat},{el_lon}",
"tags": {
k: v for k, v in tags.items()
if k not in ("name", "name:en",
"addr:housenumber", "addr:street", "addr:city")
if k not in {"name", "name:en",
"addr:housenumber", "addr:street", "addr:city"}
},
}

View file

@ -63,7 +63,7 @@ def check_requirements():
if __name__ == "__main__":
args = sys.argv[1:]
if not args or args[0] in ("-h", "--help"):
if not args or args[0] in {"-h", "--help"}:
print(__doc__)
sys.exit(0)

View file

@ -68,7 +68,7 @@ def show_metadata(path):
if __name__ == "__main__":
args = sys.argv[1:]
if not args or args[0] in ("-h", "--help"):
if not args or args[0] in {"-h", "--help"}:
print(__doc__)
sys.exit(0)

View file

@ -81,7 +81,7 @@ def search(query=None, author=None, category=None, ids=None, max_results=5, sort
if __name__ == "__main__":
args = sys.argv[1:]
if not args or args[0] in ("-h", "--help"):
if not args or args[0] in {"-h", "--help"}:
print(__doc__)
sys.exit(0)

View file

@ -233,7 +233,7 @@ def cmd_trades(limit: int = 10, market: str = None):
def main():
args = sys.argv[1:]
if not args or args[0] in ("-h", "--help", "help"):
if not args or args[0] in {"-h", "--help", "help"}:
print(__doc__)
return

View file

@ -91,7 +91,7 @@ def main():
if msg.get("method") == "workspace/didChangeWatchedFiles":
continue
if msg.get("method") in ("textDocument/didOpen", "textDocument/didChange"):
if msg.get("method") in {"textDocument/didOpen", "textDocument/didChange"}:
params = msg.get("params") or {}
td = params.get("textDocument") or {}
uri = td.get("uri", "")

View file

@ -87,10 +87,10 @@ def test_install_npm_works_without_extras(tmp_path, monkeypatch):
cmd = captured["cmd"]
assert "pyright" in cmd
# Should not blow up when extra_pkgs is omitted/None
install_targets = [c for c in cmd if not c.startswith("-") and c not in (
install_targets = [c for c in cmd if not c.startswith("-") and c not in {
"install", "--prefix", str(install_mod.hermes_lsp_bin_dir().parent),
"/usr/bin/npm",
)]
}]
assert install_targets == ["pyright"]

View file

@ -1658,7 +1658,7 @@ class TestThinkingBlockSignatureManagement:
_, result = convert_messages_to_anthropic(messages)
assistant = next(m for m in result if m["role"] == "assistant")
for block in assistant["content"]:
if block.get("type") in ("thinking", "redacted_thinking"):
if block.get("type") in {"thinking", "redacted_thinking"}:
assert "cache_control" not in block
def test_thinking_stripped_from_merged_consecutive_assistants(self):
@ -1748,7 +1748,7 @@ class TestThinkingBlockSignatureManagement:
# First two: no thinking blocks
for a in assistants[:2]:
assert not any(
b.get("type") in ("thinking", "redacted_thinking")
b.get("type") in {"thinking", "redacted_thinking"}
for b in a["content"]
if isinstance(b, dict)
)

View file

@ -371,7 +371,7 @@ class TestResolveVisionMainFirst:
provider, client, model = resolve_vision_provider_client()
assert client is fallback_client
assert provider in ("openrouter", "nous")
assert provider in {"openrouter", "nous"}
def test_explicit_provider_override_still_wins(self):
"""Explicit config override bypasses main-first policy."""

View file

@ -1046,7 +1046,7 @@ class TestCompressWithClient:
for i in range(1, len(result)):
r1 = result[i - 1].get("role")
r2 = result[i].get("role")
if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
if r1 in {"user", "assistant"} and r2 in {"user", "assistant"}:
assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"
def test_double_collision_merges_summary_into_tail(self):
@ -1087,7 +1087,7 @@ class TestCompressWithClient:
for i in range(1, len(result)):
r1 = result[i - 1].get("role")
r2 = result[i].get("role")
if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
if r1 in {"user", "assistant"} and r2 in {"user", "assistant"}:
assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"
# The summary text should be merged into the first tail message
@ -1164,7 +1164,7 @@ class TestCompressWithClient:
for i in range(1, len(result)):
r1 = result[i - 1].get("role")
r2 = result[i].get("role")
if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
if r1 in {"user", "assistant"} and r2 in {"user", "assistant"}:
assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"
# The summary should be merged into the first tail message (assistant at index 5)

View file

@ -191,7 +191,7 @@ class TestDeepSeekAnthropicPreservesThinking:
if not isinstance(m.get("content"), list):
continue
for b in m["content"]:
if isinstance(b, dict) and b.get("type") in ("thinking", "redacted_thinking"):
if isinstance(b, dict) and b.get("type") in {"thinking", "redacted_thinking"}:
assert "cache_control" not in b
def test_openai_compat_deepseek_base_is_not_matched(self) -> None:

View file

@ -99,7 +99,7 @@ class TestVerboseAndToolProgress:
def test_tool_progress_mode_is_string(self):
cli = _make_cli()
assert isinstance(cli.tool_progress_mode, str)
assert cli.tool_progress_mode in ("off", "new", "all", "verbose")
assert cli.tool_progress_mode in {"off", "new", "all", "verbose"}
class TestBusyInputMode:

View file

@ -70,7 +70,7 @@ class TestHandleReasoningCommand(unittest.TestCase):
stub = self._make_cli(show_reasoning=False)
# Simulate /reasoning show
arg = "show"
if arg in ("show", "on"):
if arg in {"show", "on"}:
stub.show_reasoning = True
stub.agent.reasoning_callback = lambda x: None
self.assertTrue(stub.show_reasoning)
@ -79,7 +79,7 @@ class TestHandleReasoningCommand(unittest.TestCase):
stub = self._make_cli(show_reasoning=True)
# Simulate /reasoning hide
arg = "hide"
if arg in ("hide", "off"):
if arg in {"hide", "off"}:
stub.show_reasoning = False
stub.agent.reasoning_callback = None
self.assertFalse(stub.show_reasoning)
@ -88,14 +88,14 @@ class TestHandleReasoningCommand(unittest.TestCase):
def test_on_enables_display(self):
stub = self._make_cli(show_reasoning=False)
arg = "on"
if arg in ("show", "on"):
if arg in {"show", "on"}:
stub.show_reasoning = True
self.assertTrue(stub.show_reasoning)
def test_off_disables_display(self):
stub = self._make_cli(show_reasoning=True)
arg = "off"
if arg in ("hide", "off"):
if arg in {"hide", "off"}:
stub.show_reasoning = False
self.assertFalse(stub.show_reasoning)

View file

@ -68,7 +68,7 @@ def test_create_job_no_agent_stores_field(hermes_env):
assert job["no_agent"] is True
assert job["script"] == "watchdog.sh"
# Prompt can be empty/None for no_agent jobs.
assert job["prompt"] in (None, "")
assert job["prompt"] in {None, ""}
def test_create_job_default_is_not_no_agent(hermes_env):
@ -148,7 +148,7 @@ def test_cronjob_tool_update_toggles_no_agent(hermes_env):
off = json.loads(cronjob(action="update", job_id=job_id, no_agent=False, prompt="run"))
assert off["success"] is True
assert off["job"].get("no_agent") in (False, None)
assert off["job"].get("no_agent") in {False, None}
on = json.loads(cronjob(action="update", job_id=job_id, no_agent=True))
assert on["success"] is True

View file

@ -269,7 +269,7 @@ def _scan_for_plugin_adapter_antipattern(source: str) -> list[str]:
and isinstance(func.value.value, ast.Name)
and func.value.value.id == "sys"
and func.value.attr == "path"
and func.attr in ("insert", "append", "extend")
and func.attr in {"insert", "append", "extend"}
):
target_name = f"sys.path.{func.attr}"

View file

@ -16,8 +16,8 @@ def _would_warn():
"MATRIX_ALLOWED_USERS", "DINGTALK_ALLOWED_USERS", "FEISHU_ALLOWED_USERS", "WECOM_ALLOWED_USERS",
"GATEWAY_ALLOWED_USERS")
)
_allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes") or any(
os.getenv(v, "").lower() in ("true", "1", "yes")
_allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in {"true", "1", "yes"} or any(
os.getenv(v, "").lower() in {"true", "1", "yes"}
for v in ("TELEGRAM_ALLOW_ALL_USERS", "DISCORD_ALLOW_ALL_USERS",
"WHATSAPP_ALLOW_ALL_USERS", "SLACK_ALLOW_ALL_USERS",
"SIGNAL_ALLOW_ALL_USERS", "EMAIL_ALLOW_ALL_USERS",

View file

@ -44,7 +44,7 @@ def _simulate_config_bridge(cfg: dict, initial_env: dict | None = None):
val = terminal_cfg[cfg_key]
# Skip cwd placeholder values — don't overwrite already-resolved
# TERMINAL_CWD. Mirrors the fix in gateway/run.py.
if cfg_key == "cwd" and str(val) in (".", "auto", "cwd"):
if cfg_key == "cwd" and str(val) in {".", "auto", "cwd"}:
continue
# Expand shell tilde so subprocess.Popen never receives a literal
# "~/" which the kernel rejects.
@ -70,7 +70,7 @@ def _simulate_config_bridge(cfg: dict, initial_env: dict | None = None):
# --- Replicate lines 144-147: MESSAGING_CWD fallback ---
configured_cwd = env.get("TERMINAL_CWD", "")
if not configured_cwd or configured_cwd in (".", "auto", "cwd"):
if not configured_cwd or configured_cwd in {".", "auto", "cwd"}:
messaging_cwd = env.get("MESSAGING_CWD") or "/root" # Path.home() for root
env["TERMINAL_CWD"] = messaging_cwd

View file

@ -48,7 +48,7 @@ class TestDiscordSystemMessageFilter(unittest.TestCase):
return False
# System message filter (the fix being tested)
if message.type not in (discord.MessageType.default, discord.MessageType.reply):
if message.type not in {discord.MessageType.default, discord.MessageType.reply}:
return False
return True # message accepted

View file

@ -76,12 +76,12 @@ def test_checker_returns_true_when_configured(platform, checker, monkeypatch):
elif platform == Platform.SMS:
monkeypatch.setenv("TWILIO_ACCOUNT_SID", "ACtest")
mock_config.extra = {}
elif platform in (
elif platform in {
Platform.API_SERVER,
Platform.WEBHOOK,
Platform.MSGRAPH_WEBHOOK,
Platform.WHATSAPP,
):
}:
mock_config.extra = {}
elif platform == Platform.FEISHU:
mock_config.extra = {"app_id": "app"}

View file

@ -1076,7 +1076,7 @@ class TestBuildApprovalKeyboard:
parsed = parse_approval_button_data(btn.action.data)
assert parsed is not None
assert parsed[0] == session_key
assert parsed[1] in ("allow-once", "allow-always", "deny")
assert parsed[1] in {"allow-once", "allow-always", "deny"}
class TestBuildUpdatePromptKeyboard:

View file

@ -89,7 +89,7 @@ def _build_agent_history(history: list) -> list:
agent_history: list = []
for msg in history:
role = msg.get("role")
if not role or role in ("session_meta", "system"):
if not role or role in {"session_meta", "system"}:
continue
has_tool_calls = "tool_calls" in msg
has_tool_call_id = "tool_call_id" in msg

View file

@ -108,7 +108,7 @@ async def test_finalize_before_reset(mock_invoke_hook):
await runner._handle_reset_command(_make_event("/new"))
calls = [c for c in mock_invoke_hook.call_args_list
if c[0][0] in ("on_session_finalize", "on_session_reset")]
if c[0][0] in {"on_session_finalize", "on_session_reset"}]
hook_names = [c[0][0] for c in calls]
assert hook_names == ["on_session_finalize", "on_session_reset"]

View file

@ -187,7 +187,7 @@ fallback_providers:
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
def fake_resolve_runtime_provider(*, requested=None, explicit_base_url=None, explicit_api_key=None):
if requested in (None, "", "openai-codex"):
if requested in {None, "", "openai-codex"}:
from hermes_cli.auth import AuthError
raise AuthError("No Codex credentials stored. Run `hermes auth` to authenticate.")
assert requested == "openrouter"

View file

@ -31,7 +31,7 @@ def _filter_history(history: list) -> list:
role = msg.get("role")
if not role:
continue
if role in ("session_meta",):
if role in {"session_meta",}:
continue
if role == "system":
continue

View file

@ -555,7 +555,7 @@ class TestLoginNousSkipKeepsCurrent:
auth_path = hermes_home / "auth.json"
auth_after = json.loads(auth_path.read_text())
# active_provider should NOT be set to "nous" after Skip
assert auth_after.get("active_provider") in (None, "")
assert auth_after.get("active_provider") in {None, ""}
# But Nous creds are still saved
assert "nous" in auth_after.get("providers", {})

View file

@ -162,7 +162,7 @@ class TestCmdUpdateBranchFallback:
if call.args
and call.args[0][0] == "/usr/bin/npm"
and call.args[0][1] == "ci"
and call.kwargs.get("cwd") in (PROJECT_ROOT, PROJECT_ROOT / "ui-tui")
and call.kwargs.get("cwd") in {PROJECT_ROOT, PROJECT_ROOT / "ui-tui"}
]
assert len(repo_and_tui_calls) == 2
for call in repo_and_tui_calls:

View file

@ -105,7 +105,7 @@ class TestApply:
assert "Cannot enable" in r.message
assert "npm i -g @openai/codex" in r.message
# Config NOT mutated on failure
assert cfg.get("model", {}).get("openai_runtime") in (None, "")
assert cfg.get("model", {}).get("openai_runtime") in {None, ""}
def test_enable_succeeds_when_codex_present(self):
cfg = {}

View file

@ -48,7 +48,7 @@ class TestInstallCuaDriverUpgrade:
with patch("platform.system", return_value="Darwin"), \
patch.object(tools_config.shutil, "which",
side_effect=lambda n: "/usr/local/bin/" + n
if n in ("cua-driver", "curl") else None), \
if n in {"cua-driver", "curl"} else None), \
patch.object(tools_config, "_run_cua_driver_installer",
return_value=True) as runner, \
patch("subprocess.run"):
@ -82,7 +82,7 @@ class TestInstallCuaDriverUpgrade:
with patch("platform.system", return_value="Darwin"), \
patch.object(tools_config.shutil, "which",
side_effect=lambda n: "/usr/local/bin/" + n
if n in ("cua-driver", "curl") else None), \
if n in {"cua-driver", "curl"} else None), \
patch.object(tools_config, "_run_cua_driver_installer") as runner, \
patch("subprocess.run"):
assert tools_config.install_cua_driver(upgrade=False) is True

View file

@ -1046,7 +1046,7 @@ def test_enforce_max_runtime_integrates_with_dispatch(kanban_home, monkeypatch):
task = kb.get_task(conn, tid)
# After timeout, task is back in 'ready' and will be re-spawned
# by the same pass. That's the intended behaviour.
assert task.status in ("ready", "running")
assert task.status in {"ready", "running"}
finally:
conn.close()

View file

@ -43,9 +43,9 @@ def _run_memory_reset(target="all", yes=False, monkeypatch=None, confirm_input="
mem_dir = get_hermes_home() / "memories"
files_to_reset = []
if target in ("all", "memory"):
if target in {"all", "memory"}:
files_to_reset.append(("MEMORY.md", "agent notes"))
if target in ("all", "user"):
if target in {"all", "user"}:
files_to_reset.append(("USER.md", "user profile"))
existing = [(f, desc) for f, desc in files_to_reset if (mem_dir / f).exists()]

View file

@ -252,7 +252,7 @@ class TestDetectProviderForModel:
result = detect_provider_for_model("deepseek-chat", "openai-codex")
assert result is not None
# Provider is deepseek (direct) or openrouter (fallback) depending on creds
assert result[0] in ("deepseek", "openrouter")
assert result[0] in {"deepseek", "openrouter"}
def test_current_provider_model_returns_none(self):
"""Models belonging to the current provider should not trigger a switch."""
@ -302,7 +302,7 @@ class TestDetectProviderForModel:
with patch("hermes_cli.models.fetch_openrouter_models", return_value=LIVE_OPENROUTER_MODELS):
result = detect_provider_for_model("claude-opus-4-6", "openai-codex")
assert result is not None
assert result[0] not in ("nous",) # nous has claude models but shouldn't be suggested
assert result[0] not in {"nous",} # nous has claude models but shouldn't be suggested
class TestIsNousFreeTier:

View file

@ -44,7 +44,7 @@ def test_opencode_go_appears_when_api_key_set():
# opencode-go can appear as "built-in" (from PROVIDER_TO_MODELS_DEV when
# models.dev is reachable) or "hermes" (from HERMES_OVERLAYS fallback when
# the API is unavailable, e.g. in CI).
assert opencode_go["source"] in ("built-in", "hermes")
assert opencode_go["source"] in {"built-in", "hermes"}
def test_opencode_go_not_appears_when_no_creds():

View file

@ -237,7 +237,7 @@ class TestKillStaleDashboardPosix:
sent.append((pid, sig))
# Simulate stubborn process: probe (sig 0) always succeeds,
# SIGTERM does nothing, SIGKILL is where it "dies".
if sig in (_signal.SIGTERM, 0, _signal.SIGKILL):
if sig in {_signal.SIGTERM, 0, _signal.SIGKILL}:
return
# Any other signal — also fine.

View file

@ -306,7 +306,7 @@ class TestWebServerEndpoints:
resp = self.client.get("/api/auth/session-token")
# The endpoint is gone — the catch-all SPA route serves index.html
# or the middleware returns 401 for unauthenticated /api/ paths.
assert resp.status_code in (200, 404)
assert resp.status_code in {200, 404}
# Either way, it must NOT return the token as JSON
try:
data = resp.json()
@ -333,7 +333,7 @@ class TestWebServerEndpoints:
# %2e%2e = ..
resp = self.client.get("/%2e%2e/%2e%2e/etc/passwd")
# Should return 200 with index.html (SPA fallback), not the actual file
assert resp.status_code in (200, 404)
assert resp.status_code in {200, 404}
if resp.status_code == 200:
# Should be the SPA fallback, not the system file
assert "root:" not in resp.text
@ -341,7 +341,7 @@ class TestWebServerEndpoints:
def test_path_traversal_dotdot_blocked(self):
"""Direct .. path traversal via encoded sequences."""
resp = self.client.get("/%2e%2e/hermes_cli/web_server.py")
assert resp.status_code in (200, 404)
assert resp.status_code in {200, 404}
if resp.status_code == 200:
assert "FastAPI" not in resp.text # Should not serve the actual source
@ -535,7 +535,7 @@ class TestConfigRoundTrip:
if val is None:
continue # not set in user config — fine
expected = entry["type"]
if expected in ("string", "select") and not isinstance(val, str):
if expected in {"string", "select"} and not isinstance(val, str):
mismatches.append(f"{key}: expected str, got {type(val).__name__}")
elif expected == "number" and not isinstance(val, (int, float)):
mismatches.append(f"{key}: expected number, got {type(val).__name__}")
@ -1032,7 +1032,7 @@ class TestNewEndpoints:
"""GET /api/auth/session-token no longer exists."""
resp = self.client.get("/api/auth/session-token")
# Should not return a JSON token object
assert resp.status_code in (200, 404)
assert resp.status_code in {200, 404}
try:
data = resp.json()
assert "token" not in data

View file

@ -1570,7 +1570,7 @@ class TestDialecticLifecycleSmoke:
self._await_thread(provider)
assert mgr.dialectic_query.call_count == 2, "turn 4 cadence fire"
_, kwargs = mgr.dialectic_query.call_args
assert kwargs.get("reasoning_level") in ("medium", "high"), \
assert kwargs.get("reasoning_level") in {"medium", "high"}, \
f"long query must bump reasoning level above 'low'; got {kwargs.get('reasoning_level')}"
assert provider._last_dialectic_turn == 4, "cadence tracker advances on success"

View file

@ -271,7 +271,7 @@ def test_evaluate_all_force_runs_synchronously(plugin_api):
# Synchronous — snapshot is fresh on return.
assert result["scan_meta"].get("sessions_total") == 25
assert result["scan_meta"]["mode"] in ("full", "incremental")
assert result["scan_meta"]["mode"] in {"full", "incremental"}
def test_start_background_scan_is_idempotent_while_running(plugin_api):

View file

@ -110,4 +110,4 @@ def test_xai_no_operation_kwarg():
result = XAIVideoGenProvider().generate("x", operation="generate")
assert result["success"] is False
# auth_required, NOT some signature error
assert result["error_type"] in ("auth_required", "api_error")
assert result["error_type"] in {"auth_required", "api_error"}

View file

@ -106,9 +106,9 @@ class TestContinuationLogicBranching:
def test_all_three_api_modes_hit_continuation_branch(self, api_mode):
# The guard in run_agent.py is:
# if self.api_mode in ("chat_completions", "bedrock_converse", "anthropic_messages"):
assert api_mode in ("chat_completions", "bedrock_converse", "anthropic_messages")
assert api_mode in {"chat_completions", "bedrock_converse", "anthropic_messages"}
def test_codex_responses_still_excluded(self):
# codex_responses has its own truncation path (not continuation-based)
# and should NOT be routed through the shared block.
assert "codex_responses" not in ("chat_completions", "bedrock_converse", "anthropic_messages")
assert "codex_responses" not in {"chat_completions", "bedrock_converse", "anthropic_messages"}

Some files were not shown because too many files have changed in this diff Show more