Merge branch 'main' of github.com:NousResearch/hermes-agent into bb/gui

# Conflicts:
#	cli.py
#	hermes_cli/main.py
#	run_agent.py
#	tests/hermes_cli/test_cmd_update.py
#	tools/mcp_tool.py
#	web/src/lib/gatewayClient.ts
This commit is contained in:
Brooklyn Nicholson 2026-05-18 01:26:56 -05:00
commit 02aaac8f73
260 changed files with 24547 additions and 13573 deletions

View file

@ -0,0 +1,14 @@
"""Browser Use cloud browser plugin — bundled, auto-loaded.
Mirrors the ``plugins/web/<vendor>/`` layout: ``provider.py`` holds the
provider class; ``__init__.py::register`` instantiates and registers it.
"""
from __future__ import annotations
from plugins.browser.browser_use.provider import BrowserUseBrowserProvider
def register(ctx) -> None:
"""Register the Browser Use provider with the plugin context."""
ctx.register_browser_provider(BrowserUseBrowserProvider())

View file

@ -0,0 +1,7 @@
name: browser-browser-use
version: 1.0.0
description: "Browser Use (https://browser-use.com) cloud browser backend. Supports both direct BROWSER_USE_API_KEY and the managed Nous tool gateway. Also powers the 'Nous Subscription' UX flow that bills usage to a Nous subscription."
author: NousResearch
kind: backend
provides_browser_providers:
- browser-use

View file

@ -0,0 +1,310 @@
"""Browser Use cloud browser provider — plugin form.
Subclasses :class:`agent.browser_provider.BrowserProvider` (the plugin-facing
ABC introduced in PR #25214). The legacy in-tree module
``tools.browser_providers.browser_use`` was removed in the same PR; this file
is now the canonical implementation.
Browser Use is the only browser backend with dual auth: a direct
``BROWSER_USE_API_KEY`` for self-billed users, or the managed Nous tool
gateway (which Hermes uses to bill Browser Use sessions to a Nous
subscription). The dispatch order direct API key first, managed gateway
second preserves the pre-migration behaviour in
``tools.browser_providers.browser_use.BrowserUseProvider._get_config_or_none``.
Config keys this provider responds to::
browser:
cloud_provider: "browser-use" # explicit selection
tool_gateway:
browser: "gateway" # optional: prefer managed gateway
# even when BROWSER_USE_API_KEY is set
Auth env vars (one of)::
BROWSER_USE_API_KEY=... # https://browser-use.com
# OR a managed Nous gateway entry (configured via 'hermes setup')
"""
from __future__ import annotations
import logging
import os
import threading
import uuid
from typing import Any, Dict, Optional
import requests
from agent.browser_provider import BrowserProvider
logger = logging.getLogger(__name__)
# Idempotency tracking for managed-mode session creation. The managed Nous
# gateway returns 409 "already in progress" on retried POSTs; we forward the
# original idempotency key so the gateway can deduplicate. Cleared on
# success or terminal failure.
_pending_create_keys: Dict[str, str] = {}
_pending_create_keys_lock = threading.Lock()
_BASE_URL = "https://api.browser-use.com/api/v3"
_DEFAULT_MANAGED_TIMEOUT_MINUTES = 5
_DEFAULT_MANAGED_PROXY_COUNTRY_CODE = "us"
def _get_or_create_pending_create_key(task_id: str) -> str:
with _pending_create_keys_lock:
existing = _pending_create_keys.get(task_id)
if existing:
return existing
created = f"browser-use-session-create:{uuid.uuid4().hex}"
_pending_create_keys[task_id] = created
return created
def _clear_pending_create_key(task_id: str) -> None:
with _pending_create_keys_lock:
_pending_create_keys.pop(task_id, None)
def _should_preserve_pending_create_key(response: requests.Response) -> bool:
"""Decide whether to keep the idempotency key after a failed create.
Preserve the key when the failure looks retryable (5xx) OR when the
gateway reports the original request is still in flight (409 "already
in progress") — in either case, retrying with the same key lets the
gateway deduplicate.
Drop the key on any other 4xx (auth failure, bad request, etc.) those
won't succeed by being retried.
"""
if response.status_code >= 500:
return True
if response.status_code != 409:
return False
try:
payload = response.json()
except Exception:
return False
if not isinstance(payload, dict):
return False
error = payload.get("error")
if not isinstance(error, dict):
return False
message = str(error.get("message") or "").lower()
return "already in progress" in message
class BrowserUseBrowserProvider(BrowserProvider):
"""Browser Use (https://browser-use.com) cloud browser backend.
Dual auth: prefers a direct BROWSER_USE_API_KEY when set, falling back
to the managed Nous tool gateway when ``tool_gateway.browser`` config
routes through it. Setting ``tool_gateway.browser: gateway`` flips the
order so managed billing wins even when BROWSER_USE_API_KEY is present.
"""
@property
def name(self) -> str:
return "browser-use"
@property
def display_name(self) -> str:
return "Browser Use"
def is_available(self) -> bool:
return self._get_config_or_none() is not None
# ------------------------------------------------------------------
# Config resolution (direct API key OR managed Nous gateway)
# ------------------------------------------------------------------
def _get_config_or_none(self) -> Optional[Dict[str, Any]]:
# Import here to avoid a hard dependency at module-import time —
# managed_tool_gateway pulls in the Nous auth stack which can be
# heavy and is not needed for direct-API-key users.
from tools.managed_tool_gateway import resolve_managed_tool_gateway
from tools.tool_backend_helpers import prefers_gateway
# Direct API key wins unless the user has explicitly opted into the
# managed Nous gateway via ``tool_gateway.browser: gateway``.
api_key = os.environ.get("BROWSER_USE_API_KEY")
if api_key and not prefers_gateway("browser"):
return {
"api_key": api_key,
"base_url": _BASE_URL,
"managed_mode": False,
}
managed = resolve_managed_tool_gateway("browser-use")
if managed is None:
return None
return {
"api_key": managed.nous_user_token,
"base_url": managed.gateway_origin.rstrip("/"),
"managed_mode": True,
}
def _get_config(self) -> Dict[str, Any]:
from tools.tool_backend_helpers import managed_nous_tools_enabled
config = self._get_config_or_none()
if config is None:
message = (
"Browser Use requires a direct BROWSER_USE_API_KEY credential."
)
if managed_nous_tools_enabled():
message = (
"Browser Use requires either a direct BROWSER_USE_API_KEY "
"credential or a managed Browser Use gateway configuration."
)
raise ValueError(message)
return config
# ------------------------------------------------------------------
# Session lifecycle
# ------------------------------------------------------------------
def _headers(self, config: Dict[str, Any]) -> Dict[str, str]:
return {
"Content-Type": "application/json",
"X-Browser-Use-API-Key": config["api_key"],
}
def create_session(self, task_id: str) -> Dict[str, object]:
config = self._get_config()
managed_mode = bool(config.get("managed_mode"))
headers = self._headers(config)
if managed_mode:
headers["X-Idempotency-Key"] = _get_or_create_pending_create_key(task_id)
# Keep gateway-backed sessions short so billing authorization does not
# default to a long Browser-Use timeout when Hermes only needs a task-
# scoped ephemeral browser.
payload = (
{
"timeout": _DEFAULT_MANAGED_TIMEOUT_MINUTES,
"proxyCountryCode": _DEFAULT_MANAGED_PROXY_COUNTRY_CODE,
}
if managed_mode
else {}
)
try:
response = requests.post(
f"{config['base_url']}/browsers",
headers=headers,
json=payload,
timeout=30,
)
except requests.RequestException as exc:
# Managed mode: propagate raw so callers can retry with the
# preserved idempotency key. Direct mode: wrap network failures
# into a clean RuntimeError for end users.
if managed_mode:
raise
raise RuntimeError(
f"Browser Use API connection failed: {exc}"
) from exc
if not response.ok:
if managed_mode and not _should_preserve_pending_create_key(response):
_clear_pending_create_key(task_id)
raise RuntimeError(
f"Failed to create Browser Use session: "
f"{response.status_code} {response.text}"
)
session_data = response.json()
if managed_mode:
_clear_pending_create_key(task_id)
session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}"
external_call_id = (
response.headers.get("x-external-call-id") if managed_mode else None
)
logger.info("Created Browser Use session %s", session_name)
cdp_url = session_data.get("cdpUrl") or session_data.get("connectUrl") or ""
return {
"session_name": session_name,
"bb_session_id": session_data["id"],
"cdp_url": cdp_url,
"features": {"browser_use": True},
"external_call_id": external_call_id,
}
def close_session(self, session_id: str) -> bool:
try:
config = self._get_config()
except ValueError:
logger.warning(
"Cannot close Browser Use session %s — missing credentials", session_id
)
return False
try:
response = requests.patch(
f"{config['base_url']}/browsers/{session_id}",
headers=self._headers(config),
json={"action": "stop"},
timeout=10,
)
if response.status_code in {200, 201, 204}:
logger.debug("Successfully closed Browser Use session %s", session_id)
return True
else:
logger.warning(
"Failed to close Browser Use session %s: HTTP %s - %s",
session_id,
response.status_code,
response.text[:200],
)
return False
except Exception as e:
logger.error("Exception closing Browser Use session %s: %s", session_id, e)
return False
def emergency_cleanup(self, session_id: str) -> None:
config = self._get_config_or_none()
if config is None:
logger.warning(
"Cannot emergency-cleanup Browser Use session %s — missing credentials",
session_id,
)
return
try:
requests.patch(
f"{config['base_url']}/browsers/{session_id}",
headers=self._headers(config),
json={"action": "stop"},
timeout=5,
)
except Exception as e:
logger.debug(
"Emergency cleanup failed for Browser Use session %s: %s", session_id, e
)
def get_setup_schema(self) -> Dict[str, Any]:
return {
"name": "Browser Use",
"badge": "paid",
"tag": "Cloud browser with remote execution",
"env_vars": [
{
"key": "BROWSER_USE_API_KEY",
"prompt": "Browser Use API key",
"url": "https://browser-use.com",
},
],
"post_setup": "agent_browser",
}

View file

@ -0,0 +1,15 @@
"""Browserbase cloud browser plugin — bundled, auto-loaded.
Mirrors the ``plugins/web/<vendor>/`` and ``plugins/image_gen/openai/``
layout: ``provider.py`` holds the provider class; ``__init__.py::register``
instantiates and registers it via the plugin context.
"""
from __future__ import annotations
from plugins.browser.browserbase.provider import BrowserbaseBrowserProvider
def register(ctx) -> None:
"""Register the Browserbase provider with the plugin context."""
ctx.register_browser_provider(BrowserbaseBrowserProvider())

View file

@ -0,0 +1,7 @@
name: browser-browserbase
version: 1.0.0
description: "Browserbase (https://browserbase.com) cloud browser backend. Requires BROWSERBASE_API_KEY + BROWSERBASE_PROJECT_ID. Supports stealth, proxies, and keep-alive sessions; auto-falls-back when paid features are unavailable."
author: NousResearch
kind: backend
provides_browser_providers:
- browserbase

View file

@ -0,0 +1,297 @@
"""Browserbase cloud browser provider — plugin form.
Subclasses :class:`agent.browser_provider.BrowserProvider` (the plugin-facing
ABC introduced in PR #25214). The legacy in-tree module
``tools.browser_providers.browserbase`` was removed in the same PR; this file
is now the canonical implementation.
Browserbase requires direct ``BROWSERBASE_API_KEY`` and ``BROWSERBASE_PROJECT_ID``
credentials. Managed Nous gateway support has been removed the Nous
subscription now routes through Browser Use instead (see
``plugins/browser/browser_use/``).
Config keys this provider responds to::
browser:
cloud_provider: "browserbase"
Auth env vars::
BROWSERBASE_API_KEY=... # https://browserbase.com
BROWSERBASE_PROJECT_ID=...
Optional feature knobs::
BROWSERBASE_BASE_URL=... # default https://api.browserbase.com
BROWSERBASE_PROXIES=true # default true
BROWSERBASE_ADVANCED_STEALTH=false
BROWSERBASE_KEEP_ALIVE=true # default true
BROWSERBASE_SESSION_TIMEOUT=... (ms, integer)
"""
from __future__ import annotations
import logging
import os
import uuid
from typing import Any, Dict, Optional
import requests
from agent.browser_provider import BrowserProvider
logger = logging.getLogger(__name__)
class BrowserbaseBrowserProvider(BrowserProvider):
"""Browserbase (https://browserbase.com) cloud browser backend.
Direct credentials only managed-Nous-gateway support lives on the
Browser Use provider now.
"""
@property
def name(self) -> str:
return "browserbase"
@property
def display_name(self) -> str:
return "Browserbase"
def is_available(self) -> bool:
return self._get_config_or_none() is not None
# ------------------------------------------------------------------
# Config resolution
# ------------------------------------------------------------------
def _get_config_or_none(self) -> Optional[Dict[str, Any]]:
api_key = os.environ.get("BROWSERBASE_API_KEY")
project_id = os.environ.get("BROWSERBASE_PROJECT_ID")
if api_key and project_id:
return {
"api_key": api_key,
"project_id": project_id,
"base_url": os.environ.get(
"BROWSERBASE_BASE_URL", "https://api.browserbase.com"
).rstrip("/"),
}
return None
def _get_config(self) -> Dict[str, Any]:
config = self._get_config_or_none()
if config is None:
raise ValueError(
"Browserbase requires BROWSERBASE_API_KEY and BROWSERBASE_PROJECT_ID "
"environment variables."
)
return config
# ------------------------------------------------------------------
# Session lifecycle
# ------------------------------------------------------------------
def create_session(self, task_id: str) -> Dict[str, object]:
config = self._get_config()
# Optional env-var knobs
enable_proxies = os.environ.get("BROWSERBASE_PROXIES", "true").lower() != "false"
enable_advanced_stealth = (
os.environ.get("BROWSERBASE_ADVANCED_STEALTH", "false").lower() == "true"
)
enable_keep_alive = (
os.environ.get("BROWSERBASE_KEEP_ALIVE", "true").lower() != "false"
)
custom_timeout_ms = os.environ.get("BROWSERBASE_SESSION_TIMEOUT")
features_enabled = {
"basic_stealth": True,
"proxies": False,
"advanced_stealth": False,
"keep_alive": False,
"custom_timeout": False,
}
session_config: Dict[str, object] = {"projectId": config["project_id"]}
if enable_keep_alive:
session_config["keepAlive"] = True
if custom_timeout_ms:
try:
timeout_val = int(custom_timeout_ms)
if timeout_val > 0:
session_config["timeout"] = timeout_val
except ValueError:
logger.warning(
"Invalid BROWSERBASE_SESSION_TIMEOUT value: %s", custom_timeout_ms
)
if enable_proxies:
session_config["proxies"] = True
if enable_advanced_stealth:
session_config["browserSettings"] = {"advancedStealth": True}
# --- Create session via API ---
headers = {
"Content-Type": "application/json",
"X-BB-API-Key": config["api_key"],
}
try:
response = requests.post(
f"{config['base_url']}/v1/sessions",
headers=headers,
json=session_config,
timeout=30,
)
proxies_fallback = False
keepalive_fallback = False
# Handle 402 — paid features unavailable
if response.status_code == 402:
if enable_keep_alive:
keepalive_fallback = True
logger.warning(
"keepAlive may require paid plan (402), retrying without it. "
"Sessions may timeout during long operations."
)
session_config.pop("keepAlive", None)
response = requests.post(
f"{config['base_url']}/v1/sessions",
headers=headers,
json=session_config,
timeout=30,
)
if response.status_code == 402 and enable_proxies:
proxies_fallback = True
logger.warning(
"Proxies unavailable (402), retrying without proxies. "
"Bot detection may be less effective."
)
session_config.pop("proxies", None)
response = requests.post(
f"{config['base_url']}/v1/sessions",
headers=headers,
json=session_config,
timeout=30,
)
except requests.RequestException as exc:
raise RuntimeError(
f"Browserbase API connection failed: {exc}"
) from exc
if not response.ok:
raise RuntimeError(
f"Failed to create Browserbase session: "
f"{response.status_code} {response.text}"
)
session_data = response.json()
session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}"
if enable_proxies and not proxies_fallback:
features_enabled["proxies"] = True
if enable_advanced_stealth:
features_enabled["advanced_stealth"] = True
if enable_keep_alive and not keepalive_fallback:
features_enabled["keep_alive"] = True
if custom_timeout_ms and "timeout" in session_config:
features_enabled["custom_timeout"] = True
feature_str = ", ".join(k for k, v in features_enabled.items() if v)
logger.info(
"Created Browserbase session %s with features: %s", session_name, feature_str
)
return {
"session_name": session_name,
"bb_session_id": session_data["id"],
"cdp_url": session_data["connectUrl"],
"features": features_enabled,
}
def close_session(self, session_id: str) -> bool:
try:
config = self._get_config()
except ValueError:
logger.warning(
"Cannot close Browserbase session %s — missing credentials", session_id
)
return False
try:
response = requests.post(
f"{config['base_url']}/v1/sessions/{session_id}",
headers={
"X-BB-API-Key": config["api_key"],
"Content-Type": "application/json",
},
json={
"projectId": config["project_id"],
"status": "REQUEST_RELEASE",
},
timeout=10,
)
if response.status_code in {200, 201, 204}:
logger.debug("Successfully closed Browserbase session %s", session_id)
return True
else:
logger.warning(
"Failed to close session %s: HTTP %s - %s",
session_id,
response.status_code,
response.text[:200],
)
return False
except Exception as e:
logger.error("Exception closing Browserbase session %s: %s", session_id, e)
return False
def emergency_cleanup(self, session_id: str) -> None:
config = self._get_config_or_none()
if config is None:
logger.warning(
"Cannot emergency-cleanup Browserbase session %s — missing credentials",
session_id,
)
return
try:
requests.post(
f"{config['base_url']}/v1/sessions/{session_id}",
headers={
"X-BB-API-Key": config["api_key"],
"Content-Type": "application/json",
},
json={
"projectId": config["project_id"],
"status": "REQUEST_RELEASE",
},
timeout=5,
)
except Exception as e:
logger.debug(
"Emergency cleanup failed for Browserbase session %s: %s", session_id, e
)
def get_setup_schema(self) -> Dict[str, Any]:
return {
"name": "Browserbase",
"badge": "paid",
"tag": "Cloud browser with stealth and proxies",
"env_vars": [
{
"key": "BROWSERBASE_API_KEY",
"prompt": "Browserbase API key",
"url": "https://browserbase.com",
},
{
"key": "BROWSERBASE_PROJECT_ID",
"prompt": "Browserbase project ID",
},
],
"post_setup": "agent_browser",
}

View file

@ -0,0 +1,16 @@
"""Firecrawl cloud browser plugin — bundled, auto-loaded.
Distinct from ``plugins/web/firecrawl/`` (the web search/extract/crawl
plugin); both share the FIRECRAWL_API_KEY but speak to different endpoints
(``/v2/browser`` here vs ``/v2/search`` / ``/v2/scrape`` / ``/v2/crawl``
over there).
"""
from __future__ import annotations
from plugins.browser.firecrawl.provider import FirecrawlBrowserProvider
def register(ctx) -> None:
"""Register the Firecrawl cloud-browser provider with the plugin context."""
ctx.register_browser_provider(FirecrawlBrowserProvider())

View file

@ -0,0 +1,7 @@
name: browser-firecrawl
version: 1.0.0
description: "Firecrawl (https://firecrawl.dev) cloud browser backend. Requires FIRECRAWL_API_KEY. Distinct from the firecrawl WEB search/extract plugin — the two share an API key but operate on different endpoints."
author: NousResearch
kind: backend
provides_browser_providers:
- firecrawl

View file

@ -0,0 +1,168 @@
"""Firecrawl cloud browser provider — plugin form.
Subclasses :class:`agent.browser_provider.BrowserProvider` (the plugin-facing
ABC introduced in PR #25214). The legacy in-tree module
``tools.browser_providers.firecrawl`` was removed in the same PR; this file
is now the canonical implementation.
This is the cloud-browser path distinct from the firecrawl WEB plugin at
``plugins/web/firecrawl/`` which handles search/extract/crawl on
``/v2/search`` / ``/v2/scrape`` / ``/v2/crawl``. The two plugins share the
``FIRECRAWL_API_KEY`` env var but talk to different endpoints (this one
hits ``/v2/browser``).
Config keys this provider responds to::
browser:
cloud_provider: "firecrawl" # explicit selection only — not in the
# legacy auto-detect walk
Auth env vars::
FIRECRAWL_API_KEY=... # https://firecrawl.dev
FIRECRAWL_API_URL=... # optional override (default https://api.firecrawl.dev)
FIRECRAWL_BROWSER_TTL=... # optional, default 300 seconds
"""
from __future__ import annotations
import logging
import os
import uuid
from typing import Any, Dict
import requests
from agent.browser_provider import BrowserProvider
logger = logging.getLogger(__name__)
_BASE_URL = "https://api.firecrawl.dev"
class FirecrawlBrowserProvider(BrowserProvider):
"""Firecrawl (https://firecrawl.dev) cloud browser backend.
Cloud-browser path only search/extract/crawl live in the separate
``plugins/web/firecrawl/`` plugin.
"""
@property
def name(self) -> str:
return "firecrawl"
@property
def display_name(self) -> str:
return "Firecrawl"
def is_available(self) -> bool:
return bool(os.environ.get("FIRECRAWL_API_KEY"))
# ------------------------------------------------------------------
# Session lifecycle
# ------------------------------------------------------------------
def _api_url(self) -> str:
return os.environ.get("FIRECRAWL_API_URL", _BASE_URL)
def _headers(self) -> Dict[str, str]:
api_key = os.environ.get("FIRECRAWL_API_KEY")
if not api_key:
raise ValueError(
"FIRECRAWL_API_KEY environment variable is required. "
"Get your key at https://firecrawl.dev"
)
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
def create_session(self, task_id: str) -> Dict[str, object]:
ttl = int(os.environ.get("FIRECRAWL_BROWSER_TTL", "300"))
body: Dict[str, object] = {"ttl": ttl}
try:
response = requests.post(
f"{self._api_url()}/v2/browser",
headers=self._headers(),
json=body,
timeout=30,
)
except requests.RequestException as exc:
raise RuntimeError(
f"Firecrawl API connection failed: {exc}"
) from exc
if not response.ok:
raise RuntimeError(
f"Failed to create Firecrawl browser session: "
f"{response.status_code} {response.text}"
)
data = response.json()
session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}"
logger.info("Created Firecrawl browser session %s", session_name)
return {
"session_name": session_name,
"bb_session_id": data["id"],
"cdp_url": data["cdpUrl"],
"features": {"firecrawl": True},
}
def close_session(self, session_id: str) -> bool:
try:
response = requests.delete(
f"{self._api_url()}/v2/browser/{session_id}",
headers=self._headers(),
timeout=10,
)
if response.status_code in {200, 201, 204}:
logger.debug("Successfully closed Firecrawl session %s", session_id)
return True
else:
logger.warning(
"Failed to close Firecrawl session %s: HTTP %s - %s",
session_id,
response.status_code,
response.text[:200],
)
return False
except Exception as e:
logger.error("Exception closing Firecrawl session %s: %s", session_id, e)
return False
def emergency_cleanup(self, session_id: str) -> None:
if not self.is_available():
logger.warning(
"Cannot emergency-cleanup Firecrawl session %s — missing credentials",
session_id,
)
return
try:
requests.delete(
f"{self._api_url()}/v2/browser/{session_id}",
headers=self._headers(),
timeout=5,
)
except Exception as e:
logger.debug(
"Emergency cleanup failed for Firecrawl session %s: %s", session_id, e
)
def get_setup_schema(self) -> Dict[str, Any]:
return {
"name": "Firecrawl",
"badge": "paid",
"tag": "Cloud browser with remote execution",
"env_vars": [
{
"key": "FIRECRAWL_API_KEY",
"prompt": "Firecrawl API key",
"url": "https://firecrawl.dev",
},
],
"post_setup": "agent_browser",
}

View file

@ -222,7 +222,7 @@ def _fmt_summary(summary: Dict[str, Any]) -> str:
def _handle_slash(raw_args: str) -> Optional[str]:
argv = raw_args.strip().split()
if not argv or argv[0] in ("help", "-h", "--help"):
if not argv or argv[0] in {"help", "-h", "--help"}:
return _HELP_TEXT
sub = argv[0]

View file

@ -72,7 +72,7 @@ def register(ctx) -> None:
# tested path there and guest-join Chromium is flakier. Refuse to register
# rather than half-working.
system = platform.system().lower()
if system not in ("linux", "darwin"):
if system not in {"linux", "darwin"}:
logger.info(
"google_meet plugin: platform=%s not supported (linux/macos only)",
system,

View file

@ -159,7 +159,7 @@ def _cmd_setup() -> int:
print("---------------------")
system = _p.system()
system_ok = system in ("Linux", "Darwin")
system_ok = system in {"Linux", "Darwin"}
print(f" platform : {system} [{'ok' if system_ok else 'unsupported'}]")
try:
@ -231,7 +231,7 @@ def _cmd_install(*, realtime: bool, assume_yes: bool) -> int:
import subprocess as _sp
system = _p.system()
if system not in ("Linux", "Darwin"):
if system not in {"Linux", "Darwin"}:
print(f"google_meet install: {system} is not supported (linux/macos only)")
return 1
@ -242,7 +242,7 @@ def _cmd_install(*, realtime: bool, assume_yes: bool) -> int:
ans = input(f"{prompt} [y/N] ").strip().lower()
except EOFError:
return False
return ans in ("y", "yes")
return ans in {"y", "yes"}
print("google_meet install")
print("-------------------")

View file

@ -447,7 +447,7 @@ def _mac_audio_device_index(device_name: str) -> str:
def run_bot() -> int: # noqa: C901 — orchestration, explicit branches
url = os.environ.get("HERMES_MEET_URL", "").strip()
out_dir_env = os.environ.get("HERMES_MEET_OUT_DIR", "").strip()
headed = os.environ.get("HERMES_MEET_HEADED", "").lower() in ("1", "true", "yes")
headed = os.environ.get("HERMES_MEET_HEADED", "").lower() in {"1", "true", "yes"}
auth_state = os.environ.get("HERMES_MEET_AUTH_STATE", "").strip()
guest_name = os.environ.get("HERMES_MEET_GUEST_NAME", "Hermes Agent")
duration_s = _parse_duration(os.environ.get("HERMES_MEET_DURATION", ""))
@ -808,7 +808,7 @@ def _looks_like_human_speaker(speaker: str, bot_guest_name: str) -> bool:
if not speaker or not speaker.strip():
return False
spk = speaker.strip().lower()
if spk in ("unknown", "you", bot_guest_name.strip().lower()):
if spk in {"unknown", "you", bot_guest_name.strip().lower()}:
return False
return True

View file

@ -103,7 +103,7 @@ def node_command(args: argparse.Namespace) -> int:
print(f"removed {args.name!r}" if ok else f"no such node: {args.name!r}")
return 0 if ok else 1
if cmd in ("status", "ping"):
if cmd in {"status", "ping"}:
entry = reg.get(args.name)
if entry is None:
print(f"no such node: {args.name!r}", file=sys.stderr)

View file

@ -183,7 +183,7 @@ class RealtimeSession:
rid = (frame.get("response") or {}).get("id")
if rid:
self._last_response_id = rid
elif ftype in ("response.done", "response.completed", "response.cancelled"):
elif ftype in {"response.done", "response.completed", "response.cancelled"}:
break
elif ftype == "error":
err = frame.get("error") or frame

View file

@ -36,7 +36,7 @@ def check_meet_requirements() -> bool:
handlers relax the requirement when a node is addressed.
"""
import platform as _p
if _p.system().lower() not in ("linux", "darwin"):
if _p.system().lower() not in {"linux", "darwin"}:
return False
try:
import playwright # noqa: F401
@ -238,7 +238,7 @@ def handle_meet_join(args: Dict[str, Any], **_kw) -> str:
if not url:
return _err("url is required")
mode = (args.get("mode") or "transcribe").strip().lower()
if mode not in ("transcribe", "realtime"):
if mode not in {"transcribe", "realtime"}:
return _err(f"mode must be 'transcribe' or 'realtime' (got {mode!r})")
node = args.get("node")

View file

@ -908,6 +908,7 @@
return createNewBoard(payload).then(function () { setShowNewBoard(false); });
},
}) : null,
h(OrchestrationPanel, null),
h(AttentionStrip, {
boardData,
onOpen: setSelectedTaskId,
@ -1386,6 +1387,288 @@
}, "?");
}
// ---------------------------------------------------------------------
// OrchestrationPanel — collapsible settings panel for the kanban
// orchestrator (orchestrator profile picker, default assignee picker,
// auto-decompose toggle, plus per-profile description editing with
// auto-generate). Backed by /orchestration + /profiles endpoints.
// ---------------------------------------------------------------------
function OrchestrationPanel() {
const [expanded, setExpanded] = useState(false);
const [settings, setSettings] = useState(null);
const [profiles, setProfiles] = useState([]);
const [busy, setBusy] = useState({});
const [msg, setMsg] = useState(null);
const loadAll = useCallback(function () {
Promise.all([
SDK.fetchJSON(`${API}/orchestration`),
SDK.fetchJSON(`${API}/profiles`),
]).then(function (results) {
setSettings(results[0] || null);
setProfiles((results[1] && results[1].profiles) || []);
setMsg(null);
}).catch(function (err) {
setMsg({ ok: false, text: "Failed to load: " + (err.message || String(err)) });
});
}, []);
useEffect(function () {
// Load on mount so the collapsed pill shows the real mode without
// requiring the user to expand the panel first.
if (settings === null) loadAll();
}, [settings, loadAll]);
const saveSettings = function (patch) {
setMsg(null);
return SDK.fetchJSON(`${API}/orchestration`, {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(patch),
}).then(function (res) {
setSettings(res);
setMsg({ ok: true, text: "Settings saved." });
return res;
}).catch(function (err) {
setMsg({ ok: false, text: "Save failed: " + (err.message || String(err)) });
});
};
const saveProfileDescription = function (name, description) {
setBusy(function (b) { return Object.assign({}, b, { [name]: "save" }); });
return SDK.fetchJSON(`${API}/profiles/${encodeURIComponent(name)}`, {
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ description: description }),
}).then(function () {
loadAll();
setMsg({ ok: true, text: `Description saved for ${name}.` });
}).catch(function (err) {
setMsg({ ok: false, text: "Save failed: " + (err.message || String(err)) });
}).then(function () {
setBusy(function (b) {
const next = Object.assign({}, b); delete next[name]; return next;
});
});
};
const autoGenerateDescription = function (name, overwrite) {
setBusy(function (b) { return Object.assign({}, b, { [name]: "auto" }); });
return SDK.fetchJSON(`${API}/profiles/${encodeURIComponent(name)}/describe-auto`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ overwrite: !!overwrite }),
}).then(function (res) {
if (res && res.ok) {
loadAll();
setMsg({ ok: true, text: `Auto-generated description for ${name}.` });
} else {
setMsg({
ok: false,
text: "Auto-generate failed: " + ((res && res.reason) || "unknown error"),
});
}
}).catch(function (err) {
setMsg({ ok: false, text: "Auto-generate failed: " + (err.message || String(err)) });
}).then(function () {
setBusy(function (b) {
const next = Object.assign({}, b); delete next[name]; return next;
});
});
};
const headerLabel = expanded
? "▾ Orchestration settings"
: "▸ Orchestration settings";
// Mode pill — always visible (collapsed or expanded). One click flips
// between Auto and Manual. Auto = dispatcher decomposes new triage tasks
// every tick. Manual = pre-PR behavior, the user clicks ⚗ Decompose on
// each triage card (or runs `hermes kanban decompose <id>`) and tasks
// stay in triage until then.
const autoOn = !!(settings && settings.auto_decompose);
const modePillTitle = settings === null
? "Loading mode…"
: (autoOn
? "Orchestration: Auto — the dispatcher decomposes new triage tasks automatically every tick. Click to switch to Manual (pre-PR behavior)."
: "Orchestration: Manual — triage tasks stay in triage until you click ⚗ Decompose on each card. Click to switch to Auto.");
const modePill = h("button", {
type: "button",
onClick: function () {
if (settings === null) return; // not loaded yet
saveSettings({ auto_decompose: !autoOn });
},
disabled: settings === null,
title: modePillTitle,
className: "inline-flex items-center gap-1 rounded-full border px-2 py-0.5 "
+ "text-xs font-medium "
+ (autoOn
? "border-emerald-500/40 bg-emerald-500/10 text-emerald-700 dark:text-emerald-300"
: "border-muted-foreground/30 bg-muted/30 text-muted-foreground"),
},
"Orchestration: ",
h("span", { className: "ml-1 font-semibold" },
settings === null ? "…" : (autoOn ? "Auto" : "Manual"))
);
if (!expanded) {
return h("div", { className: "flex items-center gap-3 text-xs" },
modePill,
h("button", {
type: "button",
onClick: function () { setExpanded(true); },
className: "underline text-muted-foreground hover:text-foreground",
title: "Configure the kanban orchestrator (profile picker, default assignee, auto-decompose, profile descriptions)",
}, headerLabel),
);
}
const profileOptions = profiles.map(function (p) {
const tag = p.is_default ? " (default)" : "";
return h(SelectOption, { key: p.name, value: p.name }, p.name + tag);
});
return h(Card, { className: "p-3" },
h(CardContent, { className: "p-2 flex flex-col gap-3" },
h("div", { className: "flex items-center justify-between" },
h("button", {
type: "button",
onClick: function () { setExpanded(false); },
className: "text-sm font-medium underline-offset-2 hover:underline",
}, headerLabel),
modePill,
h(Button, { onClick: loadAll, size: "sm" }, "Reload"),
),
msg ? h("div", {
className: msg.ok ? "hermes-kanban-msg-ok" : "hermes-kanban-msg-err",
}, msg.text) : null,
settings ? h("div", { className: "grid gap-3 sm:grid-cols-3" },
h("div", { className: "flex flex-col gap-1" },
h(Label, { className: "text-xs text-muted-foreground" },
"Orchestrator profile"),
h(Select, {
value: settings.orchestrator_profile || "",
className: "h-8",
onChange: function (e) {
const v = (e && e.target ? e.target.value : e) || "";
saveSettings({ orchestrator_profile: v });
},
},
h(SelectOption, { value: "" },
"(default: " + (settings.active_profile || "default") + ")"),
profileOptions,
),
h("div", { className: "text-[10px] text-muted-foreground" },
"Resolved: " + (settings.resolved_orchestrator_profile || "default")),
),
h("div", { className: "flex flex-col gap-1" },
h(Label, { className: "text-xs text-muted-foreground" },
"Default assignee"),
h(Select, {
value: settings.default_assignee || "",
className: "h-8",
onChange: function (e) {
const v = (e && e.target ? e.target.value : e) || "";
saveSettings({ default_assignee: v });
},
},
h(SelectOption, { value: "" },
"(default: " + (settings.active_profile || "default") + ")"),
profileOptions,
),
h("div", { className: "text-[10px] text-muted-foreground" },
"Resolved: " + (settings.resolved_default_assignee || "default")),
),
h("div", { className: "flex flex-col gap-1" },
h(Label, { className: "text-xs text-muted-foreground" },
"Orchestration mode"),
h("label", { className: "flex items-center gap-2 text-xs h-8" },
h("input", {
type: "checkbox",
checked: !!settings.auto_decompose,
onChange: function (e) {
saveSettings({ auto_decompose: !!e.target.checked });
},
}),
settings.auto_decompose ? "Auto (default)" : "Manual",
),
h("div", { className: "text-[10px] text-muted-foreground" },
"When on, the dispatcher decomposes new triage tasks automatically."),
),
) : h("div", { className: "text-xs text-muted-foreground" },
"Loading…"),
h("div", { className: "border-t pt-3" },
h(Label, { className: "text-xs text-muted-foreground" },
"Profile descriptions"),
h("div", { className: "text-[10px] text-muted-foreground pb-2" },
"Descriptions guide the orchestrator's routing. Click ⚗ to auto-generate, or edit and save."),
profiles.length === 0
? h("div", { className: "text-xs text-muted-foreground" }, "No profiles installed.")
: h("div", { className: "flex flex-col gap-2" },
profiles.map(function (p) {
return h(ProfileDescriptionRow, {
key: p.name,
profile: p,
busy: busy[p.name] || null,
onSave: saveProfileDescription,
onAuto: autoGenerateDescription,
});
}),
),
),
),
);
}
function ProfileDescriptionRow(props) {
const p = props.profile;
const [draft, setDraft] = useState(p.description || "");
const busy = props.busy;
// Re-sync the local draft if the server-side description changes (e.g.
// after auto-generate). Cheap because re-runs only happen on prop change.
useEffect(function () {
setDraft(p.description || "");
}, [p.description]);
const tag = p.description_auto && p.description ? " [auto, review]" : "";
return h("div", { className: "flex flex-col gap-1 border-l-2 pl-2",
style: { borderColor: p.description ? "#888" : "#cc6" } },
h("div", { className: "flex items-center gap-2 text-xs" },
h("span", { className: "font-medium" }, p.name),
p.is_default ? h("span", { className: "text-[10px] text-muted-foreground" }, "(default)") : null,
p.description_auto && p.description
? h("span", { className: "text-[10px] text-yellow-600" }, "auto — review")
: null,
!p.description
? h("span", { className: "text-[10px] text-yellow-600" }, "⚠ no description")
: null,
),
h("div", { className: "flex items-center gap-2" },
h(Input, {
value: draft,
onChange: function (e) { setDraft(e.target.value); },
placeholder: "What is this profile good at?",
className: "h-7 text-xs flex-1",
}),
h(Button, {
onClick: function () { props.onSave(p.name, draft); },
size: "sm",
disabled: !!busy || draft === (p.description || ""),
title: "Save the description above as user-authored",
}, busy === "save" ? "Saving…" : "Save"),
h(Button, {
onClick: function () { props.onAuto(p.name, true); },
size: "sm",
disabled: !!busy,
title: "Auto-generate a description from this profile's skills and model",
}, busy === "auto" ? "Generating…" : "⚗ Auto"),
),
);
}
function BoardSwitcher(props) {
const { t } = useI18n();
const list = props.boardList || [];
@ -2395,6 +2678,25 @@
});
};
// POST /tasks/:id/decompose — fan a triage task out into a graph
// of child tasks routed to specialist profiles by description.
// Refreshes both the drawer (so the user sees the root flip to
// todo) and the board (so the new children appear in the columns).
const doDecompose = function () {
return SDK.fetchJSON(
withBoard(`${API}/tasks/${encodeURIComponent(props.taskId)}/decompose`, boardSlug),
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({}),
}
).then(function (res) {
load();
props.onRefresh();
return res;
});
};
const addLink = function (parentId) {
return SDK.fetchJSON(withBoard(`${API}/links`, boardSlug), {
method: "POST",
@ -2486,6 +2788,7 @@
boardSlug: boardSlug,
onPatch: doPatch,
onSpecify: doSpecify,
onDecompose: doDecompose,
onAddParent: addLink,
onRemoveParent: removeLink,
onAddChild: addChild,
@ -2559,6 +2862,7 @@
task: t,
onPatch: props.onPatch,
onSpecify: props.onSpecify,
onDecompose: props.onDecompose,
}),
h(DiagnosticsSection, {
task: t,
@ -3023,6 +3327,8 @@
const task = props.task;
const [specifyBusy, setSpecifyBusy] = useState(false);
const [specifyMsg, setSpecifyMsg] = useState(null);
const [decomposeBusy, setDecomposeBusy] = useState(false);
const [decomposeMsg, setDecomposeMsg] = useState(null);
const b = function (label, patch, enabled, confirmMsg) {
return h(Button, {
onClick: function () { if (enabled !== false) props.onPatch(patch, { confirm: confirmMsg }); },
@ -3067,9 +3373,57 @@
}, specifyBusy ? "Specifying…" : "✨ Specify")
: null;
// "Decompose" is the orchestrator-driven fan-out. Like Specify, only
// makes sense on triage-column tasks — elsewhere the backend short-
// circuits with ok:false. When the orchestrator returns fanout:false
// we render the same single-task message as Specify; when it fans
// out we report the child count for quick at-a-glance verification.
const decomposeButton = (task.status === "triage" && props.onDecompose)
? h(Button, {
onClick: function () {
if (decomposeBusy) return;
setDecomposeBusy(true);
setDecomposeMsg(null);
props.onDecompose().then(function (res) {
if (res && res.ok) {
if (res.fanout && res.child_ids && res.child_ids.length) {
setDecomposeMsg({
ok: true,
text: `Decomposed into ${res.child_ids.length} children: ${res.child_ids.join(", ")}`,
});
} else {
const suffix = res.new_title
? ` — retitled: ${res.new_title}`
: "";
setDecomposeMsg({
ok: true,
text: `Single task (no fanout)${suffix}`,
});
}
} else {
setDecomposeMsg({
ok: false,
text: "Decompose failed: " + ((res && res.reason) || "unknown error"),
});
}
}).catch(function (err) {
setDecomposeMsg({
ok: false,
text: "Decompose failed: " + (err.message || String(err)),
});
}).then(function () {
setDecomposeBusy(false);
});
},
disabled: decomposeBusy,
size: "sm",
}, decomposeBusy ? "Decomposing…" : "⚗ Decompose")
: null;
return h("div", null,
h("div", { className: "hermes-kanban-actions" },
specifyButton,
decomposeButton,
b("→ triage", { status: "triage" }, task.status !== "triage"),
b("→ ready", { status: "ready" }, task.status !== "ready"),
// No direct → running button: /tasks/:id PATCH rejects status=running
@ -3091,6 +3445,11 @@
? "hermes-kanban-msg-ok"
: "hermes-kanban-msg-err",
}, specifyMsg.text) : null,
decomposeMsg ? h("div", {
className: decomposeMsg.ok
? "hermes-kanban-msg-ok"
: "hermes-kanban-msg-err",
}, decomposeMsg.text) : null,
);
}

View file

@ -628,7 +628,7 @@ def update_task(task_id: str, payload: UpdateTaskBody, board: Optional[str] = Qu
status_code=400,
detail="Cannot set status to 'running' directly; use the dispatcher/claim path",
)
elif s in ("todo", "triage"):
elif s in {"todo", "triage"}:
ok = _set_status_direct(conn, task_id, s)
else:
raise HTTPException(status_code=400, detail=f"unknown status: {s}")
@ -742,7 +742,7 @@ def _set_status_direct(
(task_id, run_id, json.dumps({"status": new_status}), int(time.time())),
)
# If we re-opened something, children may have gone stale.
if new_status in ("done", "ready"):
if new_status in {"done", "ready"}:
kanban_db.recompute_ready(conn)
return True
@ -868,7 +868,7 @@ def bulk_update(payload: BulkTaskBody, board: Optional[str] = Query(None)):
ok = kanban_db.unblock_task(conn, tid)
else:
ok = _set_status_direct(conn, tid, "ready")
elif s in ("todo", "running", "triage"):
elif s in {"todo", "running", "triage"}:
ok = _set_status_direct(conn, tid, s)
else:
entry.update(ok=False, error=f"unknown status {s!r}")
@ -1535,6 +1535,279 @@ def switch_board(slug: str):
_EVENT_POLL_SECONDS = 0.3
# ---------------------------------------------------------------------------
# Profile metadata & description editing (consumed by the kanban orchestrator)
# ---------------------------------------------------------------------------
class DescribeBody(BaseModel):
description: Optional[str] = None # explicit user-authored text
class DescribeAutoBody(BaseModel):
overwrite: bool = False
@router.get("/profiles")
def list_profile_roster():
"""Return every installed profile with its description.
Consumed by the dashboard's settings panel (orchestrator picker)
and the profile-description editing UI. Profiles without a
description still appear here they're routable on name alone,
just less precisely.
"""
try:
from hermes_cli import profiles as profiles_mod
profiles = profiles_mod.list_profiles()
except Exception as exc:
raise HTTPException(status_code=500, detail=f"failed to list profiles: {exc}")
return {
"profiles": [
{
"name": p.name,
"is_default": bool(p.is_default),
"model": p.model or "",
"provider": p.provider or "",
"description": p.description or "",
"description_auto": bool(p.description_auto),
"skill_count": int(p.skill_count or 0),
}
for p in profiles
],
}
@router.patch("/profiles/{profile_name}")
def update_profile_description(profile_name: str, payload: DescribeBody):
"""Set or clear the description of a profile.
Empty string clears the description; non-empty stores it as a
user-authored description (``description_auto: false``) so the
auto-describer won't overwrite it on a sweep without
``--overwrite``.
"""
try:
from hermes_cli import profiles as profiles_mod
canon = profiles_mod.normalize_profile_name(profile_name)
if canon == "default":
from hermes_constants import get_hermes_home # type: ignore
from pathlib import Path as _Path
profile_dir = _Path(get_hermes_home())
else:
profile_dir = profiles_mod.get_profile_dir(canon)
if not profile_dir.is_dir():
raise HTTPException(status_code=404, detail=f"profile '{profile_name}' not found")
text = (payload.description or "").strip()
profiles_mod.write_profile_meta(
profile_dir,
description=text,
description_auto=False,
)
except HTTPException:
raise
except Exception as exc:
raise HTTPException(status_code=500, detail=f"failed to update profile: {exc}")
return {"ok": True, "profile": canon, "description": text}
@router.post("/profiles/{profile_name}/describe-auto")
def auto_describe_profile(profile_name: str, payload: DescribeAutoBody):
"""Generate a description for the named profile via the auxiliary
LLM (``auxiliary.profile_describer``). Persists with
``description_auto: true`` so the dashboard can surface a "review"
badge.
Maps 1:1 to ``hermes profile describe <name> --auto``. Non-OK
outcomes are NOT HTTP errors the UI renders the reason inline
(e.g. "no auxiliary client configured") so the operator can fix
config and retry without a page reload.
"""
try:
from hermes_cli import profile_describer # noqa: WPS433 (intentional)
outcome = profile_describer.describe_profile(
profile_name,
overwrite=bool(payload.overwrite),
)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"describer crashed: {exc}")
return {
"ok": bool(outcome.ok),
"profile": outcome.profile_name,
"reason": outcome.reason,
"description": outcome.description,
}
# ---------------------------------------------------------------------------
# Decompose endpoint (orchestrator-driven fan-out)
# ---------------------------------------------------------------------------
class DecomposeBody(BaseModel):
author: Optional[str] = None
@router.post("/tasks/{task_id}/decompose")
def decompose_task_endpoint(
task_id: str,
payload: DecomposeBody,
board: Optional[str] = Query(None),
):
"""Fan a triage-column task out into a graph of child tasks via the
auxiliary LLM, routed to specialist profiles by description. Maps
1:1 to ``hermes kanban decompose <task_id>``.
Returns the outcome shape used by the CLI: ``{ok, task_id, reason,
fanout, child_ids, new_title}``. A non-OK outcome is NOT an HTTP
error the UI renders the reason inline.
Runs in FastAPI's threadpool (sync ``def``) because the LLM call
can take minutes on reasoning models.
"""
board = _resolve_board(board)
prev_env = os.environ.get("HERMES_KANBAN_BOARD")
try:
os.environ["HERMES_KANBAN_BOARD"] = board or kanban_db.DEFAULT_BOARD
from hermes_cli import kanban_decompose # noqa: WPS433 (intentional)
outcome = kanban_decompose.decompose_task(
task_id,
author=(payload.author or None),
)
finally:
if prev_env is None:
os.environ.pop("HERMES_KANBAN_BOARD", None)
else:
os.environ["HERMES_KANBAN_BOARD"] = prev_env
return {
"ok": bool(outcome.ok),
"task_id": outcome.task_id,
"reason": outcome.reason,
"fanout": bool(outcome.fanout),
"child_ids": outcome.child_ids or [],
"new_title": outcome.new_title,
}
# ---------------------------------------------------------------------------
# Orchestration settings (kanban.orchestrator_profile / default_assignee /
# auto_decompose) — surfaced to the dashboard's settings panel
# ---------------------------------------------------------------------------
class OrchestrationSettingsBody(BaseModel):
orchestrator_profile: Optional[str] = None
default_assignee: Optional[str] = None
auto_decompose: Optional[bool] = None
@router.get("/orchestration")
def get_orchestration_settings():
"""Return the current kanban orchestration knobs from config.yaml
plus the resolved effective values (filling in fallbacks)."""
try:
from hermes_cli.config import load_config
cfg = load_config() or {}
except Exception:
cfg = {}
kanban_cfg = (cfg.get("kanban") or {}) if isinstance(cfg, dict) else {}
explicit_orch = (kanban_cfg.get("orchestrator_profile") or "").strip()
explicit_default = (kanban_cfg.get("default_assignee") or "").strip()
auto_decompose = bool(kanban_cfg.get("auto_decompose", True))
# Resolve fallbacks the same way the decomposer does.
resolved_orch = explicit_orch
resolved_default = explicit_default
try:
from hermes_cli import profiles as profiles_mod
active_default = profiles_mod.get_active_profile_name() or "default"
if not resolved_orch or not profiles_mod.profile_exists(resolved_orch):
resolved_orch = active_default
if not resolved_default or not profiles_mod.profile_exists(resolved_default):
resolved_default = active_default
except Exception:
active_default = "default"
if not resolved_orch:
resolved_orch = active_default
if not resolved_default:
resolved_default = active_default
return {
"orchestrator_profile": explicit_orch,
"default_assignee": explicit_default,
"auto_decompose": auto_decompose,
"resolved_orchestrator_profile": resolved_orch,
"resolved_default_assignee": resolved_default,
"active_profile": active_default,
}
@router.put("/orchestration")
def set_orchestration_settings(payload: OrchestrationSettingsBody):
"""Update the kanban orchestration knobs in ~/.hermes/config.yaml.
Each field is optional only fields explicitly passed are
written. ``orchestrator_profile`` / ``default_assignee`` accept
empty strings to clear the override and fall back to the default
profile.
"""
try:
from hermes_cli.config import load_config, save_config
cfg = load_config() or {}
except Exception as exc:
raise HTTPException(status_code=500, detail=f"failed to load config: {exc}")
kanban_section = cfg.setdefault("kanban", {})
if not isinstance(kanban_section, dict):
kanban_section = {}
cfg["kanban"] = kanban_section
# Validate any non-empty profile names exist before saving.
try:
from hermes_cli import profiles as profiles_mod
except Exception:
profiles_mod = None # type: ignore
if payload.orchestrator_profile is not None:
name = (payload.orchestrator_profile or "").strip()
if name and profiles_mod is not None:
try:
if not profiles_mod.profile_exists(name):
raise HTTPException(
status_code=400,
detail=f"profile '{name}' does not exist",
)
except HTTPException:
raise
except Exception:
pass # fail open if the lookup itself errors
kanban_section["orchestrator_profile"] = name
if payload.default_assignee is not None:
name = (payload.default_assignee or "").strip()
if name and profiles_mod is not None:
try:
if not profiles_mod.profile_exists(name):
raise HTTPException(
status_code=400,
detail=f"profile '{name}' does not exist",
)
except HTTPException:
raise
except Exception:
pass
kanban_section["default_assignee"] = name
if payload.auto_decompose is not None:
kanban_section["auto_decompose"] = bool(payload.auto_decompose)
try:
save_config(cfg)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"failed to save config: {exc}")
# Echo back the resolved state (callers usually re-render from it).
return get_orchestration_settings()
@router.websocket("/events")
async def stream_events(ws: WebSocket):
# Enforce the dashboard session token as a query param — browsers can't

View file

@ -263,7 +263,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes to ByteRover."""
if action not in ("add", "replace") or not content:
if action not in {"add", "replace"} or not content:
return
def _write():
@ -289,7 +289,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
for msg in messages[-10:]: # last 10 messages
role = msg.get("role", "")
content = msg.get("content", "")
if isinstance(content, str) and content.strip() and role in ("user", "assistant"):
if isinstance(content, str) and content.strip() and role in {"user", "assistant"}:
parts.append(f"{role}: {content[:500]}")
if not parts:

View file

@ -416,7 +416,7 @@ def _build_embedded_profile_env(config: dict[str, Any], *, llm_api_key: str | No
current_base_url = config.get("llm_base_url") or os.environ.get("HINDSIGHT_API_LLM_BASE_URL", "")
# The embedded daemon expects OpenAI wire format for these providers.
daemon_provider = "openai" if current_provider in ("openai_compatible", "openrouter") else current_provider
daemon_provider = "openai" if current_provider in {"openai_compatible", "openrouter"} else current_provider
env_values = {
"HINDSIGHT_API_LLM_PROVIDER": str(daemon_provider),
@ -596,7 +596,7 @@ class HindsightMemoryProvider(MemoryProvider):
try:
cfg = _load_config()
mode = cfg.get("mode", "cloud")
if mode in ("local", "local_embedded"):
if mode in {"local", "local_embedded"}:
available, _ = _check_local_runtime()
return available
if mode == "local_external":
@ -888,7 +888,7 @@ class HindsightMemoryProvider(MemoryProvider):
from hindsight import HindsightEmbedded
HindsightEmbedded.__del__ = lambda self: None
llm_provider = self._config.get("llm_provider", "")
if llm_provider in ("openai_compatible", "openrouter"):
if llm_provider in {"openai_compatible", "openrouter"}:
llm_provider = "openai"
logger.debug("Creating HindsightEmbedded client (profile=%s, provider=%s)",
self._config.get("profile", "hermes"), llm_provider)
@ -1132,7 +1132,7 @@ class HindsightMemoryProvider(MemoryProvider):
self._mode = "disabled"
return
self._api_key = self._config.get("apiKey") or self._config.get("api_key") or os.environ.get("HINDSIGHT_API_KEY", "")
default_url = _DEFAULT_LOCAL_URL if self._mode in ("local_embedded", "local_external") else _DEFAULT_API_URL
default_url = _DEFAULT_LOCAL_URL if self._mode in {"local_embedded", "local_external"} else _DEFAULT_API_URL
self._api_url = self._config.get("api_url") or os.environ.get("HINDSIGHT_API_URL", default_url)
self._llm_base_url = self._config.get("llm_base_url", "")
@ -1152,10 +1152,10 @@ class HindsightMemoryProvider(MemoryProvider):
self._budget = budget if budget in _VALID_BUDGETS else "mid"
memory_mode = self._config.get("memory_mode", "hybrid")
self._memory_mode = memory_mode if memory_mode in ("context", "tools", "hybrid") else "hybrid"
self._memory_mode = memory_mode if memory_mode in {"context", "tools", "hybrid"} else "hybrid"
prefetch_method = self._config.get("recall_prefetch_method") or self._config.get("prefetch_method", "recall")
self._prefetch_method = prefetch_method if prefetch_method in ("recall", "reflect") else "recall"
self._prefetch_method = prefetch_method if prefetch_method in {"recall", "reflect"} else "recall"
# Bank options
self._bank_mission = self._config.get("bank_mission", "")

View file

@ -283,7 +283,7 @@ class HonchoMemoryProvider(MemoryProvider):
# ----- Port #4053: cron guard -----
agent_context = kwargs.get("agent_context", "")
platform = kwargs.get("platform", "cli")
if agent_context in ("cron", "flush") or platform == "cron":
if agent_context in {"cron", "flush"} or platform == "cron":
logger.debug("Honcho skipped: cron/flush context (agent_context=%s, platform=%s)",
agent_context, platform)
self._cron_skipped = True
@ -404,7 +404,7 @@ class HonchoMemoryProvider(MemoryProvider):
# pop_context_result() in prefetch(). Dialectic prewarm runs the
# full configured depth and writes into _prefetch_result so turn 1
# consumes the result directly.
if self._recall_mode in ("context", "hybrid"):
if self._recall_mode in {"context", "hybrid"}:
try:
self._manager.prefetch_context(self._session_key)
except Exception as e:

View file

@ -233,7 +233,7 @@ _profile_override: str | None = None
def _host_key() -> str:
"""Return the active Honcho host key, derived from the current Hermes profile."""
if _profile_override:
if _profile_override in ("default", "custom"):
if _profile_override in {"default", "custom"}:
return HOST
return f"{HOST}.{_profile_override}"
return resolve_active_host()
@ -295,13 +295,13 @@ def _resolve_api_key(cfg: dict) -> str:
parsed = urlparse(base_url)
except (TypeError, ValueError):
parsed = None
if parsed and parsed.scheme in ("http", "https") and parsed.netloc:
if parsed and parsed.scheme in {"http", "https"} and parsed.netloc:
return "local"
# Schemeless but looks like a host (contains '.' or ':' and isn't
# a boolean literal): let it through so legacy configs don't
# regress into "no API key configured" when they previously worked.
lowered = base_url.lower()
if lowered not in ("true", "false", "none", "null") and any(
if lowered not in {"true", "false", "none", "null"} and any(
c in base_url for c in ".:"
) and not base_url.isdigit():
return "local"
@ -334,7 +334,7 @@ def _ensure_sdk_installed() -> bool:
print(" honcho-ai is not installed.")
answer = _prompt("Install it now? (honcho-ai>=2.0.1)", default="y")
if answer.lower() not in ("y", "yes"):
if answer.lower() not in {"y", "yes"}:
print(" Skipping install. Run: pip install 'honcho-ai>=2.0.1'\n")
return False
@ -382,7 +382,7 @@ def cmd_setup(args) -> None:
for h in ("localhost", "127.0.0.1", "::1")
) else "cloud"
deploy = _prompt("Cloud or local?", default=current_deploy)
is_local = deploy.lower() in ("local", "l")
is_local = deploy.lower() in {"local", "l"}
# Clean up legacy snake_case key
cfg.pop("base_url", None)
@ -441,7 +441,7 @@ def cmd_setup(args) -> None:
print(" directional -- all observations on, each AI peer builds its own view (default)")
print(" unified -- shared pool, user observes self, AI observes others only")
new_obs = _prompt("Observation mode", default=current_obs)
if new_obs in ("unified", "directional"):
if new_obs in {"unified", "directional"}:
hermes_host["observationMode"] = new_obs
else:
hermes_host["observationMode"] = "directional"
@ -457,17 +457,17 @@ def cmd_setup(args) -> None:
try:
hermes_host["writeFrequency"] = int(new_wf)
except (ValueError, TypeError):
hermes_host["writeFrequency"] = new_wf if new_wf in ("async", "turn", "session") else "async"
hermes_host["writeFrequency"] = new_wf if new_wf in {"async", "turn", "session"} else "async"
# --- 6. Recall mode ---
_raw_recall = hermes_host.get("recallMode") or cfg.get("recallMode", "hybrid")
current_recall = "hybrid" if _raw_recall not in ("hybrid", "context", "tools") else _raw_recall
current_recall = "hybrid" if _raw_recall not in {"hybrid", "context", "tools"} else _raw_recall
print("\n Recall mode:")
print(" hybrid -- auto-injected context + Honcho tools available (default)")
print(" context -- auto-injected context only, Honcho tools hidden")
print(" tools -- Honcho tools only, no auto-injected context")
new_recall = _prompt("Recall mode", default=current_recall)
if new_recall in ("hybrid", "context", "tools"):
if new_recall in {"hybrid", "context", "tools"}:
hermes_host["recallMode"] = new_recall
# --- 7. Context token budget ---
@ -477,7 +477,7 @@ def cmd_setup(args) -> None:
print(" uncapped -- no limit (default)")
print(" N -- token limit per turn (e.g. 1200)")
new_ctx_tokens = _prompt("Context tokens", default=current_display)
if new_ctx_tokens.strip().lower() in ("none", "uncapped", "no limit"):
if new_ctx_tokens.strip().lower() in {"none", "uncapped", "no limit"}:
hermes_host.pop("contextTokens", None)
elif new_ctx_tokens.strip() == "":
pass # keep current
@ -517,7 +517,7 @@ def cmd_setup(args) -> None:
print(" high -- complex behavioral patterns")
print(" max -- thorough audit-level analysis")
new_reasoning = _prompt("Reasoning level", default=current_reasoning)
if new_reasoning in ("minimal", "low", "medium", "high", "max"):
if new_reasoning in {"minimal", "low", "medium", "high", "max"}:
hermes_host["dialecticReasoningLevel"] = new_reasoning
else:
hermes_host["dialecticReasoningLevel"] = "low"
@ -530,7 +530,7 @@ def cmd_setup(args) -> None:
print(" per-repo -- one session per git repository")
print(" global -- single session across all directories")
new_strat = _prompt("Session strategy", default=current_strat)
if new_strat in ("per-session", "per-repo", "per-directory", "global"):
if new_strat in {"per-session", "per-repo", "per-directory", "global"}:
hermes_host["sessionStrategy"] = new_strat
hermes_host["enabled"] = True
@ -1130,7 +1130,7 @@ def cmd_migrate(args) -> None:
print(" Paste the key when prompted.")
print()
answer = _prompt(" Run 'hermes honcho setup' now?", default="y")
if answer.lower() in ("y", "yes"):
if answer.lower() in {"y", "yes"}:
cmd_setup(args)
cfg = _read_config()
has_key = bool(cfg.get("apiKey", ""))
@ -1176,7 +1176,7 @@ def cmd_migrate(args) -> None:
print(" hermes honcho migrate — this step handles it interactively")
if has_key:
answer = _prompt(" Upload user memory files to Honcho now?", default="y")
if answer.lower() in ("y", "yes"):
if answer.lower() in {"y", "yes"}:
try:
from plugins.memory.honcho.client import (
HonchoClientConfig,
@ -1226,7 +1226,7 @@ def cmd_migrate(args) -> None:
print()
if has_key:
answer = _prompt(" Seed AI identity from all detected files now?", default="y")
if answer.lower() in ("y", "yes"):
if answer.lower() in {"y", "yes"}:
try:
from plugins.memory.honcho.client import (
HonchoClientConfig,

View file

@ -47,7 +47,7 @@ def resolve_active_host() -> str:
try:
from hermes_cli.profiles import get_active_profile_name
profile = get_active_profile_name()
if profile and profile not in ("default", "custom"):
if profile and profile not in {"default", "custom"}:
return f"{HOST}.{profile}"
except Exception:
pass
@ -653,7 +653,7 @@ class HonchoClientConfig:
return base
# per-directory: one Honcho session per working directory (default)
if self.session_strategy in ("per-directory", "per-session"):
if self.session_strategy in {"per-directory", "per-session"}:
base = Path(cwd).name
if self.session_peer_prefix and self.peer_name:
return f"{self.peer_name}-{base}"

View file

@ -357,7 +357,7 @@ def _is_windows_absolute_path(value: str) -> bool:
len(value) >= 3
and value[0].isalpha()
and value[1] == ":"
and value[2] in ("/", "\\")
and value[2] in {"/", "\\"}
)
@ -381,7 +381,7 @@ def _is_local_path_reference(value: str) -> bool:
def _path_from_file_uri(uri: str) -> Path | str:
parsed = urlparse(uri)
if parsed.netloc not in ("", "localhost"):
if parsed.netloc not in {"", "localhost"}:
return f"Unsupported non-local file URI: {uri}"
return Path(url2pathname(parsed.path)).expanduser()
@ -755,7 +755,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
level = args.get("level", "overview")
summary_level = level in ("abstract", "overview")
summary_level = level in {"abstract", "overview"}
# OpenViking expects directory URIs for pseudo summary files
# (e.g. viking://user/hermes/.overview.md).
resolved_uri = self._normalize_summary_uri(uri) if summary_level else uri
@ -832,7 +832,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
result = self._unwrap_result(resp)
# Format list/tree results for readability
if action in ("list", "tree"):
if action in {"list", "tree"}:
raw_entries = result
if isinstance(result, dict):
raw_entries = result.get("entries") or result.get("items") or result.get("children") or []
@ -887,7 +887,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
payload: Dict[str, Any] = {}
for key in ("reason", "to", "parent", "instruction", "wait", "timeout"):
if key in args and args[key] not in (None, ""):
if key in args and args[key] not in {None, ""}:
payload[key] = args[key]
parsed_url = urlparse(url)

View file

@ -88,9 +88,9 @@ def _as_bool(value: Any, default: bool) -> bool:
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in ("true", "1", "yes", "y", "on"):
if lowered in {"true", "1", "yes", "y", "on"}:
return True
if lowered in ("false", "0", "no", "n", "off"):
if lowered in {"false", "0", "no", "n", "off"}:
return False
return default
@ -508,7 +508,7 @@ class SupermemoryMemoryProvider(MemoryProvider):
self._allowed_containers = [self._container_tag] + list(self._custom_containers)
agent_context = kwargs.get("agent_context", "")
self._write_enabled = agent_context not in ("cron", "flush", "subagent")
self._write_enabled = agent_context not in {"cron", "flush", "subagent"}
self._active = bool(self._api_key)
self._client = None
if self._active:
@ -598,7 +598,7 @@ class SupermemoryMemoryProvider(MemoryProvider):
cleaned = []
for message in messages or []:
role = message.get("role")
if role not in ("user", "assistant"):
if role not in {"user", "assistant"}:
continue
content = _clean_text_for_capture(str(message.get("content", "")))
if content:

View file

@ -74,9 +74,9 @@ class DeepSeekProfile(ProviderProfile):
# its server default (currently high).
if isinstance(reasoning_config, dict):
effort = (reasoning_config.get("effort") or "").strip().lower()
if effort in ("xhigh", "max"):
if effort in {"xhigh", "max"}:
top_level["reasoning_effort"] = "max"
elif effort in ("low", "medium", "high"):
elif effort in {"low", "medium", "high"}:
top_level["reasoning_effort"] = effort
return extra_body, top_level

View file

@ -37,7 +37,7 @@ class KimiProfile(ProviderProfile):
# Enabled
extra_body["thinking"] = {"type": "enabled"}
effort = (reasoning_config.get("effort") or "").strip().lower()
if effort in ("low", "medium", "high"):
if effort in {"low", "medium", "high"}:
top_level["reasoning_effort"] = effort
else:
top_level["reasoning_effort"] = "medium"

View file

@ -1539,7 +1539,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
if sender_email and space_name:
self._last_sender_by_chat[space_name] = sender_email.strip().lower()
chat_type = "dm" if space_type in ("DIRECT_MESSAGE", "DM") else "group"
chat_type = "dm" if space_type in {"DIRECT_MESSAGE", "DM"} else "group"
text = msg.get("argumentText") or msg.get("text") or ""
text = text.strip()
@ -1935,7 +1935,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
return True
except HttpError as exc:
status = getattr(getattr(exc, "resp", None), "status", None)
if status in (403, 404):
if status in {403, 404}:
return False
logger.debug(
"[GoogleChat] delete_message failed: %s",
@ -1958,7 +1958,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
update_mask = ",".join(update_mask_fields) or "text"
# Patch body cannot carry thread (immutable).
patch_body = {k: v for k, v in body.items() if k not in ("thread",)}
patch_body = {k: v for k, v in body.items() if k not in {"thread",}}
def _do_patch() -> Dict[str, Any]:
return (
@ -2791,7 +2791,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
upload_resp = await asyncio.to_thread(_upload)
except HttpError as exc:
status = getattr(getattr(exc, "resp", None), "status", None)
if status in (401, 403):
if status in {401, 403}:
logger.warning(
"[GoogleChat] media.upload auth failure for identity=%s "
"(token revoked or scope missing) — falling back to "
@ -2927,7 +2927,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
display = info.get("displayName") or chat_id
return {
"name": display,
"type": "dm" if space_type in ("DIRECT_MESSAGE", "DM") else "group",
"type": "dm" if space_type in {"DIRECT_MESSAGE", "DM"} else "group",
"chat_id": chat_id,
}
@ -3246,7 +3246,7 @@ async def _standalone_send(
return {"error": "Google Chat standalone send: aiohttp not installed"}
try:
async with _aiohttp.ClientSession(timeout=_aiohttp.ClientTimeout(total=30.0)) as session:
async with _aiohttp.ClientSession(timeout=_aiohttp.ClientTimeout(total=30.0), trust_env=True) as session:
async with session.post(
url,
json=body,

View file

@ -112,7 +112,7 @@ class IRCAdapter(BasePlatformAdapter):
self.nickname = os.getenv("IRC_NICKNAME") or extra.get("nickname", "hermes-bot")
self.channel = os.getenv("IRC_CHANNEL") or extra.get("channel", "")
self.use_tls = (
os.getenv("IRC_USE_TLS", "").lower() in ("1", "true", "yes")
os.getenv("IRC_USE_TLS", "").lower() in {"1", "true", "yes"}
if os.getenv("IRC_USE_TLS")
else extra.get("use_tls", True)
)
@ -680,7 +680,7 @@ def _env_enablement() -> dict | None:
seed["nickname"] = nickname
use_tls = os.getenv("IRC_USE_TLS", "").strip().lower()
if use_tls:
seed["use_tls"] = use_tls in ("1", "true", "yes")
seed["use_tls"] = use_tls in {"1", "true", "yes"}
# Passwords live in PlatformConfig.extra as well for back-compat with
# existing config.yaml users; env-reads at construct time still win.
if os.getenv("IRC_SERVER_PASSWORD"):
@ -756,7 +756,7 @@ async def _standalone_send(
nickname = os.getenv("IRC_NICKNAME") or extra.get("nickname", "hermes-bot")
use_tls_env = os.getenv("IRC_USE_TLS")
if use_tls_env is not None:
use_tls = use_tls_env.lower() in ("1", "true", "yes")
use_tls = use_tls_env.lower() in {"1", "true", "yes"}
else:
use_tls = bool(extra.get("use_tls", True))
@ -821,7 +821,7 @@ async def _standalone_send(
await _raw(f"PONG :{payload}")
elif cmd == "001":
registered = True
elif cmd in ("432", "433"):
elif cmd in {"432", "433"}:
nick_attempts += 1
if nick_attempts > max_nick_attempts:
return {"error": "IRC standalone send: too many nick collisions"}
@ -829,7 +829,7 @@ async def _standalone_send(
# mutated value, so the suffix stays bounded.
standalone_nick = f"{nick_base}-cron-{nick_attempts}"[:30]
await _raw(f"NICK {standalone_nick}")
elif cmd in ("464", "465"):
elif cmd in {"464", "465"}:
return {"error": f"IRC standalone send: server rejected client ({cmd})"}
if nickserv_password:
@ -860,9 +860,9 @@ async def _standalone_send(
if jcmd == "PING":
payload = jmsg["params"][0] if jmsg["params"] else ""
await _raw(f"PONG :{payload}")
elif jcmd in ("366", "JOIN"):
elif jcmd in {"366", "JOIN"}:
joined = True
elif jcmd in ("403", "405", "471", "473", "474", "475"):
elif jcmd in {"403", "405", "471", "473", "474", "475"}:
return {"error": f"IRC standalone send: JOIN {target} rejected ({jcmd})"}
# Bytes-aware per-line splitting so multi-line plain text never

View file

@ -325,7 +325,7 @@ class RequestCache:
def mark_delivered(self, request_id: str) -> None:
entry = self._entries.get(request_id)
if entry is None or entry.state not in (State.READY, State.ERROR):
if entry is None or entry.state not in {State.READY, State.ERROR}:
return
entry.state = State.DELIVERED
entry.updated_at = time.time()
@ -447,7 +447,7 @@ class _LineClient:
async def reply(self, reply_token: str, messages: List[Dict[str, Any]]) -> None:
import aiohttp
timeout = aiohttp.ClientTimeout(total=self._timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
async with session.post(
LINE_REPLY_URL,
headers=self._headers,
@ -460,7 +460,7 @@ class _LineClient:
async def push(self, chat_id: str, messages: List[Dict[str, Any]]) -> None:
import aiohttp
timeout = aiohttp.ClientTimeout(total=self._timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
async with session.post(
LINE_PUSH_URL,
headers=self._headers,
@ -479,7 +479,7 @@ class _LineClient:
clamped = max(5, min(60, (seconds // 5) * 5 or 5))
try:
timeout = aiohttp.ClientTimeout(total=5.0)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
await session.post(
LINE_LOADING_URL,
headers=self._headers,
@ -493,7 +493,7 @@ class _LineClient:
import aiohttp
url = LINE_CONTENT_URL_FMT.format(message_id=message_id)
timeout = aiohttp.ClientTimeout(total=30.0)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
async with session.get(url, headers={"Authorization": f"Bearer {self._token}"}) as resp:
if resp.status >= 400:
raise RuntimeError(f"LINE content {resp.status}")
@ -504,7 +504,7 @@ class _LineClient:
import aiohttp
timeout = aiohttp.ClientTimeout(total=10.0)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
async with session.get(LINE_BOT_INFO_URL, headers=self._headers) as resp:
if resp.status >= 400:
return None
@ -614,7 +614,7 @@ def _truthy_env(name: str, default: bool = False) -> bool:
v = os.getenv(name)
if v is None:
return default
return v.strip().lower() in ("1", "true", "yes", "on")
return v.strip().lower() in {"1", "true", "yes", "on"}
# ---------------------------------------------------------------------------
@ -910,7 +910,7 @@ class LineAdapter(BasePlatformAdapter):
await self._handle_message_event(event)
elif event_type == "postback":
await self._handle_postback_event(event)
elif event_type in ("follow", "unfollow", "join", "leave"):
elif event_type in {"follow", "unfollow", "join", "leave"}:
logger.info("LINE: lifecycle event %s from %s", event_type, source)
else:
logger.debug("LINE: ignoring event type %r", event_type)
@ -939,7 +939,7 @@ class LineAdapter(BasePlatformAdapter):
if msg_type == "text":
text = msg.get("text", "") or ""
elif msg_type in ("image", "audio", "video", "file"):
elif msg_type in {"image", "audio", "video", "file"}:
local_path = await self._download_media(message_id, msg_type)
if local_path:
media_urls.append(local_path)

View file

@ -101,11 +101,11 @@ def _guess_extension(data: bytes) -> str:
def _is_image_ext(ext: str) -> bool:
return ext.lower() in (".jpg", ".jpeg", ".png", ".gif", ".webp")
return ext.lower() in {".jpg", ".jpeg", ".png", ".gif", ".webp"}
def _is_audio_ext(ext: str) -> bool:
return ext.lower() in (".mp3", ".wav", ".ogg", ".m4a", ".aac")
return ext.lower() in {".mp3", ".wav", ".ogg", ".m4a", ".aac"}
# ---------------------------------------------------------------------------
@ -326,12 +326,12 @@ class SimplexAdapter(BasePlatformAdapter):
# Filter out messages sent by us (direction == "snd")
meta = chat_item.get("meta") or {}
direction = (meta.get("itemStatus") or {}).get("type", "")
if direction in ("sndSent", "sndSentDirect", "sndSentViaProxy", "sndNew"):
if direction in {"sndSent", "sndSentDirect", "sndSentViaProxy", "sndNew"}:
return
# Determine chat type and IDs
chat_type_raw = chat_info.get("type", "")
is_group = chat_type_raw in ("group", "groupInfo")
is_group = chat_type_raw in {"group", "groupInfo"}
if is_group:
group_info = chat_info.get("groupInfo") or chat_info.get("group") or {}
@ -374,7 +374,7 @@ class SimplexAdapter(BasePlatformAdapter):
media_urls: List[str] = []
media_types: List[str] = []
file_info = chat_item.get("file") or {}
if file_info and file_info.get("fileStatus") not in ("cancelled", "error"):
if file_info and file_info.get("fileStatus") not in {"cancelled", "error"}:
file_id = file_info.get("fileId")
file_name = file_info.get("fileName", "file")
if file_id:

View file

@ -566,7 +566,7 @@ async def _standalone_send(
# Per-request timeouts so a slow STS endpoint cannot starve the
# subsequent activity POST of its budget.
per_request_timeout = _aiohttp.ClientTimeout(total=15.0)
async with _aiohttp.ClientSession() as session:
async with _aiohttp.ClientSession(trust_env=True) as session:
async with session.post(
token_url,
data={
@ -841,7 +841,7 @@ class TeamsAdapter(BasePlatformAdapter):
# bot silently treated every clicker as authorized — meaning any
# Teams user who could message the bot could approve dangerous commands.
allowed_csv = os.getenv("TEAMS_ALLOWED_USERS", "").strip()
allow_all = os.getenv("TEAMS_ALLOW_ALL_USERS", "").strip().lower() in ("1", "true", "yes")
allow_all = os.getenv("TEAMS_ALLOW_ALL_USERS", "").strip().lower() in {"1", "true", "yes"}
if not allow_all:
if not allowed_csv:

View file

@ -99,15 +99,15 @@ def teams_pipeline_command(args: argparse.Namespace) -> int:
return 2
try:
if action in ("list", "ls"):
if action in {"list", "ls"}:
_cmd_list(args)
elif action == "show":
_cmd_show(args)
elif action in ("run", "replay"):
elif action in {"run", "replay"}:
_cmd_run(args)
elif action in ("fetch", "test"):
elif action in {"fetch", "test"}:
_cmd_fetch(args)
elif action in ("subscriptions", "subs"):
elif action in {"subscriptions", "subs"}:
_cmd_subscriptions(args)
elif action == "subscribe":
_cmd_subscribe(args)
@ -117,7 +117,7 @@ def teams_pipeline_command(args: argparse.Namespace) -> int:
_cmd_delete_subscription(args)
elif action == "maintain-subscriptions":
_cmd_maintain_subscriptions(args)
elif action in ("token-health", "token"):
elif action in {"token-health", "token"}:
_cmd_token_health(args)
elif action == "validate":
_cmd_validate(args)

View file

@ -33,7 +33,7 @@ def _meeting_path(meeting_ref: TeamsMeetingRef | str) -> str:
def _wrap_graph_error(exc: MicrosoftGraphAPIError, *, missing_message: str) -> TeamsMeetingError:
if exc.status_code in (401, 403):
if exc.status_code in {401, 403}:
return TeamsMeetingPermissionError(str(exc))
if exc.status_code == 404:
return TeamsMeetingNotFoundError(missing_message)
@ -286,7 +286,7 @@ async def fetch_call_record_artifact(
try:
payload = await client.get_json(f"/communications/callRecords/{quote(call_record_id, safe='')}")
except MicrosoftGraphAPIError as exc:
if exc.status_code in (401, 403) and allow_permission_errors:
if exc.status_code in {401, 403} and allow_permission_errors:
return None
if exc.status_code == 404:
return None

View file

@ -145,7 +145,7 @@ class MeetingArtifact:
metadata: dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if self.artifact_type not in ("transcript", "recording", "call_record"):
if self.artifact_type not in {"transcript", "recording", "call_record"}:
raise ValueError(
"MeetingArtifact.artifact_type must be transcript, recording, or call_record."
)

View file

@ -62,7 +62,7 @@ def build_pipeline_runtime_config(gateway_config: Any) -> dict[str, Any]:
"chat_id",
):
value = teams_extra.get(key)
if value not in (None, ""):
if value not in {None, ""}:
teams_delivery[key] = value
if teams_delivery: