diff --git a/agent/anthropic_adapter.py b/agent/anthropic_adapter.py index 4a586d7f0fd..03e8b58e16c 100644 --- a/agent/anthropic_adapter.py +++ b/agent/anthropic_adapter.py @@ -2535,3 +2535,56 @@ def sanitize_anthropic_kwargs(api_kwargs: Any, *, log_prefix: str = "") -> Any: sorted(leaked), ) return api_kwargs + + +def _is_stream_unavailable_error(exc: Exception) -> bool: + """Return True when an Anthropic stream call should fall back to create().""" + err_lower = str(exc).lower() + if "stream" in err_lower and "not supported" in err_lower: + return True + if "invokemodelwithresponsestream" in err_lower: + from agent.bedrock_adapter import is_streaming_access_denied_error + + return is_streaming_access_denied_error(exc) + return False + + +def create_anthropic_message( + client: Any, + api_kwargs: dict, + *, + log_prefix: str = "", + prefer_stream: bool = True, +) -> Any: + """Create an Anthropic message, aggregating via stream when available. + + Some Anthropic-compatible gateways are SSE-only: they ignore non-streaming + requests and return ``text/event-stream`` even for ``messages.create()``. + The SDK can surface that as raw text, so callers that expect a Message then + crash on ``.content``. Prefer ``messages.stream().get_final_message()`` to + match the main turn path, falling back to ``create()`` only for providers + that explicitly do not support streaming, such as restricted Bedrock roles. + """ + sanitize_anthropic_kwargs(api_kwargs, log_prefix=log_prefix) + + messages_api = getattr(client, "messages", None) + stream_fn = getattr(messages_api, "stream", None) + if prefer_stream and callable(stream_fn): + stream_kwargs = dict(api_kwargs) + stream_kwargs.pop("stream", None) + try: + with stream_fn(**stream_kwargs) as stream: + return stream.get_final_message() + except Exception as exc: + if not _is_stream_unavailable_error(exc): + raise + logger.debug( + "%sAnthropic Messages stream unavailable; falling back to " + "messages.create(): %s", + log_prefix, + exc, + ) + + create_kwargs = dict(api_kwargs) + create_kwargs.pop("stream", None) + return messages_api.create(**create_kwargs) diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index 86a1c765a78..f28b5f60156 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -997,7 +997,7 @@ class _AnthropicCompletionsAdapter: self._is_oauth = is_oauth def create(self, **kwargs) -> Any: - from agent.anthropic_adapter import build_anthropic_kwargs + from agent.anthropic_adapter import build_anthropic_kwargs, create_anthropic_message from agent.transports import get_transport messages = kwargs.get("messages", []) @@ -1041,7 +1041,7 @@ class _AnthropicCompletionsAdapter: if not _forbids_sampling_params(model): anthropic_kwargs["temperature"] = temperature - response = self._client.messages.create(**anthropic_kwargs) + response = create_anthropic_message(self._client, anthropic_kwargs) _transport = get_transport("anthropic_messages") _nr = _transport.normalize_response( response, strip_tool_prefix=self._is_oauth diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py index 7f175fff97f..4ff67871934 100644 --- a/agent/codex_runtime.py +++ b/agent/codex_runtime.py @@ -290,6 +290,7 @@ def run_codex_app_server_turn( original_user_message=original_user_message, final_response=turn.final_text, interrupted=False, + messages=messages, ) except Exception: logger.debug("external memory sync raised", exc_info=True) diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index ef69ac68329..0ccc9649428 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -3197,15 +3197,22 @@ def run_conversation( # Terminal — flush buffered context so the user sees # what was tried before the abort. agent._flush_status_buffer() + # Summarize once: Cloudflare/proxy HTML challenge pages and + # other raw provider bodies must be collapsed to a short + # one-liner here, otherwise the full page leaks into the + # returned ``error`` field and downstream consumers deliver + # it verbatim (e.g. a cron failure notification dumped a + # ~60KB Cloudflare challenge page as 31 Discord messages). + _nonretryable_summary = agent._summarize_api_error(api_error) if classified.reason == FailoverReason.content_policy_blocked: agent._emit_status( f"❌ Provider safety filter blocked this request: " - f"{agent._summarize_api_error(api_error)}" + f"{_nonretryable_summary}" ) else: agent._emit_status( f"❌ Non-retryable error (HTTP {status_code}): " - f"{agent._summarize_api_error(api_error)}" + f"{_nonretryable_summary}" ) agent._vprint(f"{agent.log_prefix}❌ Non-retryable client error (HTTP {status_code}). Aborting.", force=True) agent._vprint(f"{agent.log_prefix} 🔌 Provider: {_provider} Model: {_model}", force=True) @@ -3290,18 +3297,17 @@ def run_conversation( else: agent._persist_session(messages, conversation_history) if classified.reason == FailoverReason.content_policy_blocked: - _summary = agent._summarize_api_error(api_error) _policy_response = ( "⚠️ The model provider's safety filter blocked this request " "(not a Hermes/gateway failure).\n\n" - f"Provider message: {_summary}\n\n" + f"Provider message: {_nonretryable_summary}\n\n" f"{_CONTENT_POLICY_RECOVERY_HINT}" ) return _content_policy_blocked_result( messages, api_call_count, final_response=_policy_response, - error_detail=_summary, + error_detail=_nonretryable_summary, ) return { "final_response": None, @@ -3309,7 +3315,7 @@ def run_conversation( "api_calls": api_call_count, "completed": False, "failed": True, - "error": str(api_error), + "error": _nonretryable_summary, } if retry_count >= max_retries: diff --git a/agent/credential_pool.py b/agent/credential_pool.py index 04b22c76a68..b791ac4f82c 100644 --- a/agent/credential_pool.py +++ b/agent/credential_pool.py @@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple from hermes_constants import OPENROUTER_BASE_URL from hermes_cli.config import load_env +from agent.secret_scope import get_secret as _get_secret from agent.credential_persistence import ( is_borrowed_credential_source, sanitize_borrowed_credential_payload, @@ -1666,7 +1667,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup _env_file = load_env() def _env_val(key: str) -> str: - return (_env_file.get(key) or os.environ.get(key) or "").strip() + return (_env_file.get(key) or _get_secret(key, "") or "").strip() anthropic_api_key = _env_val("ANTHROPIC_API_KEY") anthropic_oauth_env = ( @@ -1952,7 +1953,7 @@ def _seed_from_env(provider: str, entries: List[PooledCredential]) -> Tuple[bool # changes to the .env file. def _get_env_prefer_dotenv(key: str) -> str: env_file = load_env() - val = env_file.get(key) or os.environ.get(key) or "" + val = env_file.get(key) or _get_secret(key, "") or "" return val.strip() # Honour user suppression — `hermes auth remove ` for an diff --git a/agent/message_content.py b/agent/message_content.py new file mode 100644 index 00000000000..c42bf408550 --- /dev/null +++ b/agent/message_content.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + + +_NON_TEXT_PART_TYPES = {"image", "image_url", "input_image", "audio", "input_audio"} +_TEXT_KEYS = ("text", "content", "input_text", "output_text", "summary_text") + + +def _field(value: Any, key: str) -> Any: + if isinstance(value, Mapping): + return value.get(key) + return getattr(value, key, None) + + +def _text_from_part(part: Any) -> str: + if part is None: + return "" + if isinstance(part, str): + return part + + part_type = str(_field(part, "type") or "").strip().lower() + if part_type in _NON_TEXT_PART_TYPES: + return "" + + for key in _TEXT_KEYS: + text = _field(part, key) + if isinstance(text, str): + return text + return "" + + +def flatten_message_text(content: Any, *, sep: str = "\n") -> str: + """Return the visible text from common chat/Responses message content shapes.""" + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + chunks = [_text_from_part(part) for part in content] + return sep.join(chunk for chunk in chunks if chunk) + + text = _text_from_part(content) + if text: + return text + try: + return str(content) + except Exception: + return "" diff --git a/agent/secret_scope.py b/agent/secret_scope.py new file mode 100644 index 00000000000..26022ca9b0e --- /dev/null +++ b/agent/secret_scope.py @@ -0,0 +1,205 @@ +"""Profile-scoped credential resolution for multi-profile gateway multiplexing. + +The multiplexing gateway serves many profiles from one process. Each profile +has its own ``.env`` with its own provider keys and platform tokens, so we +**cannot** union them into the process-global ``os.environ`` (that would leak +profile A's keys to profile B's turns, and to every subprocess spawned with +``env=dict(os.environ)``). + +This module provides a fail-closed, context-local secret scope: + +- ``set_secret_scope(mapping)`` installs the active profile's secrets for the + current task (a contextvar, so it propagates into the agent's worker thread + via ``copy_context()`` exactly like the HERMES_HOME override). +- ``get_secret(name)`` reads from that scope. When multiplexing is **active** + and no scope is set, it RAISES rather than silently falling back to + ``os.environ`` — an un-migrated or newly-added call site fails loud at that + exact line instead of leaking another profile's value. When multiplexing is + **off** (the default), it transparently reads ``os.environ`` so the + single-profile gateway and every non-gateway caller behave exactly as before. + +Design rationale lives in ``docs/design/multiplexing-gateway.md`` (Workstream A). +""" +from __future__ import annotations + +import os +from contextvars import ContextVar, Token +from pathlib import Path +from typing import Dict, Mapping, Optional + + +# ── multiplex-active flag ──────────────────────────────────────────────── +# Process-global: set once at gateway startup when gateway.multiplex_profiles +# is true. Governs whether get_secret() fails closed on an unscoped read. +# A plain module global (not a contextvar): it describes the deployment mode, +# not a per-task value. +_MULTIPLEX_ACTIVE: bool = False + + +def set_multiplex_active(active: bool) -> None: + """Mark whether the process is running as a profile multiplexer. + + Called once at gateway startup. When True, ``get_secret`` fails closed on + an unscoped read instead of falling back to ``os.environ``. + """ + global _MULTIPLEX_ACTIVE + _MULTIPLEX_ACTIVE = bool(active) + + +def is_multiplex_active() -> bool: + """Return whether the process is running as a profile multiplexer.""" + return _MULTIPLEX_ACTIVE + + +# ── the secret scope contextvar ────────────────────────────────────────── +_SECRET_SCOPE: ContextVar[Optional[Mapping[str, str]]] = ContextVar( + "_SECRET_SCOPE", default=None +) + + +class UnscopedSecretError(RuntimeError): + """Raised when a secret is read in multiplex mode with no scope installed. + + This is the fail-closed signal: it means a credential read reached + ``get_secret`` without a profile scope active, which in a multiplexer would + otherwise leak whichever profile's value happened to be in ``os.environ``. + The fix is to wrap the call path in ``set_secret_scope(...)`` (the per-turn + / per-adapter profile scope), not to widen the allowlist. + """ + + +def set_secret_scope(secrets: Optional[Mapping[str, str]]) -> Token: + """Install the active profile's secret mapping for the current context. + + Returns a token for ``reset_secret_scope``. Pass ``None`` to clear. + """ + return _SECRET_SCOPE.set(secrets) + + +def reset_secret_scope(token: Token) -> None: + """Restore the previous secret scope.""" + _SECRET_SCOPE.reset(token) + + +def current_secret_scope() -> Optional[Mapping[str, str]]: + """Return the active secret mapping, or None when no scope is installed.""" + return _SECRET_SCOPE.get() + + +# ── genuinely-global env vars (NOT per-profile secrets) ────────────────── +# These are process/deployment-level settings, not profile credentials. They +# legitimately live in os.environ and must keep reading from it even in +# multiplex mode — routing them through the fail-closed path would wrongly +# crash. Anything matching is read from os.environ regardless of scope. +# +# Membership test is by exact name OR prefix (see _is_global_env). Keep this +# list tight: when in doubt a value is a profile secret, not a global. +_GLOBAL_ENV_EXACT = frozenset({ + # Hermes runtime / deployment + "HERMES_HOME", "HERMES_PROFILE", "HERMES_GATEWAY_LOCK_DIR", + "HERMES_MAX_ITERATIONS", "HERMES_MAX_TOKENS", "HERMES_API_TIMEOUT", + "HERMES_REDACT_SECRETS", "HERMES_NOUS_TIMEOUT_SECONDS", + "_HERMES_GATEWAY", + # OS / interpreter + "PATH", "HOME", "USER", "LANG", "LC_ALL", "TZ", "PWD", "SHELL", "TMPDIR", + "VIRTUAL_ENV", "PYTHONPATH", "SSL_CERT_FILE", + # Kanban paths (per-board, not per-profile-secret) + "HERMES_KANBAN_DB", "HERMES_KANBAN_WORKSPACES_ROOT", "HERMES_KANBAN_BOARD", +}) +_GLOBAL_ENV_PREFIXES = ( + "HERMES_KANBAN_", + "HERMES_TELEGRAM_", # tuning knobs (batch delays, fallback toggles) — NOT the token + "TERMINAL_", # terminal/sandbox backend settings +) + + +def _is_global_env(name: str) -> bool: + """Return True for genuinely process-global (non-profile-secret) env vars.""" + if name in _GLOBAL_ENV_EXACT: + return True + return any(name.startswith(p) for p in _GLOBAL_ENV_PREFIXES) + + +def get_secret(name: str, default: Optional[str] = None) -> Optional[str]: + """Resolve a credential by env-var name, honoring the active profile scope. + + Resolution order: + + 1. Genuinely-global vars (``_is_global_env``) always read ``os.environ`` — + they are deployment settings, not profile secrets. + 2. When a secret scope is installed (multiplexed turn), read from it; an + absent key returns ``default``. The scope is authoritative — we do NOT + fall through to ``os.environ``, because in a multiplexer ``os.environ`` + may hold another profile's value. + 3. No scope installed: + - multiplex INACTIVE (default deployment): read ``os.environ`` — + identical to the legacy ``os.getenv`` behavior every caller had before. + - multiplex ACTIVE: FAIL CLOSED. Raise ``UnscopedSecretError`` so the + missing scope is caught loudly instead of leaking a cross-profile value. + """ + if _is_global_env(name): + val = os.environ.get(name) + return val if val is not None else default + + scope = _SECRET_SCOPE.get() + if scope is not None: + val = scope.get(name) + return val if val is not None else default + + if _MULTIPLEX_ACTIVE: + raise UnscopedSecretError( + f"get_secret({name!r}) called with no profile secret scope active " + f"while multiplexing is on. This credential read must run inside a " + f"set_secret_scope(...) block (the per-turn / per-adapter profile " + f"scope). Reading os.environ here would risk leaking another " + f"profile's value. See docs/design/multiplexing-gateway.md " + f"(Workstream A)." + ) + + val = os.environ.get(name) + return val if val is not None else default + + +def load_env_file(env_path: Path) -> Dict[str, str]: + """Parse a ``.env`` file into a plain dict WITHOUT touching ``os.environ``. + + Used to load a profile's secrets into an isolated mapping for + ``set_secret_scope``. Mirrors python-dotenv's basic parsing (KEY=VALUE, + ``export`` prefix, ``#`` comments, optional matching quotes) but never + mutates the process environment — that isolation is the whole point. + """ + secrets: Dict[str, str] = {} + try: + text = env_path.read_text(encoding="utf-8") + except (FileNotFoundError, OSError, UnicodeDecodeError): + return secrets + + for raw in text.splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + if line.startswith("export "): + line = line[len("export "):].lstrip() + if "=" not in line: + continue + key, _, value = line.partition("=") + key = key.strip() + if not key: + continue + value = value.strip() + if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'): + value = value[1:-1] + secrets[key] = value + + return secrets + + +def build_profile_secret_scope(hermes_home: Path) -> Dict[str, str]: + """Build a profile's secret mapping from its ``/.env``. + + Returns a fresh dict (safe to install via ``set_secret_scope``). Genuinely + global vars are intentionally NOT copied in — ``get_secret`` reads those + from ``os.environ`` directly, so the scope holds only profile secrets. + """ + return load_env_file(Path(hermes_home) / ".env") + diff --git a/apps/desktop/src/app/desktop-controller.tsx b/apps/desktop/src/app/desktop-controller.tsx index 05dfbbc764f..5ca73061135 100644 --- a/apps/desktop/src/app/desktop-controller.tsx +++ b/apps/desktop/src/app/desktop-controller.tsx @@ -14,6 +14,7 @@ import { useSkinCommand } from '@/themes/use-skin-command' import { formatRefValue } from '../components/assistant-ui/directive-text' import { getCronJobs, getSessionMessages, listAllProfileSessions, type SessionInfo, triggerCronJob } from '../hermes' import { type ChatMessage, chatMessageText, preserveLocalAssistantErrors, toChatMessages } from '../lib/chat-messages' +import { storedSessionIdForNotification } from '../lib/session-ids' import { isMessagingSource, LOCAL_SESSION_SOURCE_IDS, @@ -276,16 +277,20 @@ export function DesktopController() { } }, []) - // Notification click: the main process already focused the window; jump to its session. + // Notification click: the main process already focused the window; jump to its + // session. Notifications are tagged with the gateway *runtime* session id, but + // the chat route is keyed by the *stored* id — navigating with the runtime id + // resumes a non-existent stored session ("session not found") and strands the + // user. Translate runtime -> stored before navigating. useEffect(() => { const unsubscribe = window.hermesDesktop?.onFocusSession?.(sessionId => { if (sessionId) { - navigate(sessionRoute(sessionId)) + navigate(sessionRoute(storedSessionIdForNotification(sessionId, runtimeIdByStoredSessionIdRef.current))) } }) return () => unsubscribe?.() - }, [navigate]) + }, [navigate, runtimeIdByStoredSessionIdRef]) // Notification action button (Approve/Reject) — resolve in place, no navigation. useEffect(() => { diff --git a/apps/desktop/src/app/session/hooks/use-prompt-actions.ts b/apps/desktop/src/app/session/hooks/use-prompt-actions.ts index 829119f65b4..ed3f6498cd1 100644 --- a/apps/desktop/src/app/session/hooks/use-prompt-actions.ts +++ b/apps/desktop/src/app/session/hooks/use-prompt-actions.ts @@ -32,6 +32,7 @@ import { clearComposerAttachments, type ComposerAttachment, setComposerAttachmentUploadState, + setComposerDraft, terminalContextBlocksFromDraft, updateComposerAttachment } from '@/store/composer' @@ -951,8 +952,26 @@ export function usePromptActions({ return } + // send / prefill carry an optional `notice` (e.g. "⊙ Goal set …") + // that the backend wants shown as a system line before the message + // is acted on. Mirrors the TUI's createSlashHandler — without it a + // `/goal ` looked like it did nothing. + if ((dispatch.type === 'send' || dispatch.type === 'prefill') && dispatch.notice?.trim()) { + renderSlashOutput(dispatch.notice.trim()) + } + const message = ('message' in dispatch ? dispatch.message : '')?.trim() ?? '' + // /undo returns a prefill directive: drop the backed-up message into + // the composer for editing instead of submitting it immediately. + if (dispatch.type === 'prefill') { + if (message) { + setComposerDraft(message) + } + + return + } + if (!message) { renderSlashOutput( `/${name}: ${dispatch.type === 'skill' ? 'skill payload missing message' : 'empty message'}` diff --git a/apps/desktop/src/app/settings/providers-settings.test.tsx b/apps/desktop/src/app/settings/providers-settings.test.tsx index 27c029b442c..1909604a07a 100644 --- a/apps/desktop/src/app/settings/providers-settings.test.tsx +++ b/apps/desktop/src/app/settings/providers-settings.test.tsx @@ -2,7 +2,7 @@ import { cleanup, fireEvent, render, screen, waitFor } from '@testing-library/re import { atom } from 'nanostores' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import type { OAuthProvider } from '@/types/hermes' +import type { EnvVarInfo, OAuthProvider } from '@/types/hermes' const listOAuthProviders = vi.fn() const disconnectOAuthProvider = vi.fn() @@ -36,6 +36,25 @@ function provider(id: string, loggedIn: boolean, patch: Partial = } } +// One `/api/env` row (an EnvVarInfo) for the API-keys view. Mirrors the +// `provider()` factory above: a valid base + per-test overrides, typed against +// the real response shape so it can't drift from EnvVarInfo. +function keyVar(patch: Partial = {}): EnvVarInfo { + return { + advanced: false, + category: 'provider', + description: '', + is_password: true, + is_set: false, + provider: '', + provider_label: '', + redacted_value: null, + tools: [], + url: '', + ...patch + } +} + beforeEach(() => { onboarding.set({ manual: false }) getEnvVars.mockResolvedValue({}) @@ -97,4 +116,56 @@ describe('ProvidersSettings', () => { expect(screen.queryByRole('button', { name: 'Remove Qwen Code' })).toBeNull() expect(screen.getByText(/managed by its own CLI/)).toBeTruthy() }) + + it('renders a Keys card for a backend-tagged provider with no PROVIDER_GROUPS prefix', async () => { + // A provider the backend catalog tags (provider/provider_label) but that has + // no desktop PROVIDER_GROUPS prefix row must still render its own card — + // this is the GUI/CLI drift fix: membership comes from the backend, not + // from the hand-maintained prefix list. + getEnvVars.mockResolvedValue({ + WIDGETAI_API_KEY: keyVar({ + provider: 'widgetai', + provider_label: 'WidgetAI', + url: 'https://widgetai.example/keys' + }) + }) + listOAuthProviders.mockResolvedValue({ providers: [] }) + + const { ProvidersSettings } = await import('./providers-settings') + render() + + expect(await screen.findByText('WidgetAI')).toBeTruthy() + }) + + it('orders API-key providers by priority then name, and filters them via search', async () => { + // These three providers have no curated PROVIDER_GROUPS priority, so they + // share the default priority and fall back to alphabetical among themselves + // (Acme, Middle, Zebra) — exercising the name tiebreak of the priority sort. + getEnvVars.mockResolvedValue({ + ZEBRA_API_KEY: keyVar({ provider: 'zebra', provider_label: 'Zebra' }), + ACME_API_KEY: keyVar({ provider: 'acme', provider_label: 'Acme' }), + MIDDLE_API_KEY: keyVar({ provider: 'middle', provider_label: 'Middle' }) + }) + listOAuthProviders.mockResolvedValue({ providers: [] }) + + const { ProvidersSettings } = await import('./providers-settings') + render() + + // Equal priority → alphabetical tiebreak: Acme, Middle, Zebra. + await screen.findByText('Acme') + const labels = screen.getAllByText(/Acme|Middle|Zebra/).map(el => el.textContent) + expect(labels).toEqual(['Acme', 'Middle', 'Zebra']) + + // Typing narrows the list to matching providers only. + const search = screen.getByPlaceholderText('Search providers…') + fireEvent.change(search, { target: { value: 'mid' } }) + + await waitFor(() => expect(screen.queryByText('Acme')).toBeNull()) + expect(screen.getByText('Middle')).toBeTruthy() + expect(screen.queryByText('Zebra')).toBeNull() + + // A non-matching query shows the empty-state copy. + fireEvent.change(search, { target: { value: 'nonesuch-xyz' } }) + expect(await screen.findByText('No providers match your search.')).toBeTruthy() + }) }) diff --git a/apps/desktop/src/app/settings/providers-settings.tsx b/apps/desktop/src/app/settings/providers-settings.tsx index 2585e13995d..31ced164fff 100644 --- a/apps/desktop/src/app/settings/providers-settings.tsx +++ b/apps/desktop/src/app/settings/providers-settings.tsx @@ -12,6 +12,7 @@ import { sortProviders } from '@/components/desktop-onboarding-overlay' import { Button } from '@/components/ui/button' +import { SearchField } from '@/components/ui/search-field' import { disconnectOAuthProvider, listOAuthProviders } from '@/hermes' import { useI18n } from '@/i18n' import { Check, ChevronDown, ChevronRight, KeyRound, Loader2, Terminal, Trash2 } from '@/lib/icons' @@ -45,8 +46,17 @@ export const PROVIDER_VIEWS = ['accounts', 'keys'] as const export type ProviderView = (typeof PROVIDER_VIEWS)[number] // Group the env catalog by provider — one ListRow per vendor plus optional -// advanced overrides (base URL, region, etc.). Groups without a key field and -// the "Other" bucket are skipped. +// advanced overrides (base URL, region, etc.). Groups without a key field are +// skipped. +// +// Grouping key precedence: +// 1. Backend `provider_label` / `provider` (from the unified provider catalog +// in hermes_cli/provider_catalog.py) — the SAME provider identity +// `hermes model` uses. This is authoritative: a provider tagged by the +// backend always renders a card, even with no PROVIDER_GROUPS row. +// 2. Desktop prefix match (`providerGroup`) — legacy fallback for provider +// env vars that predate the backend tagging. +// Only entries that resolve to neither (the "Other" bucket) are skipped. function buildProviderKeyGroups(vars: Record): ProviderKeyGroup[] { const buckets = new Map() @@ -55,7 +65,9 @@ function buildProviderKeyGroups(vars: Record): ProviderKeyGr continue } - const name = providerGroup(key) + // Prefer the backend-supplied provider label/id so the Keys tab groups by + // the same identity the CLI picker uses; fall back to the prefix guess. + const name = info.provider_label?.trim() || info.provider?.trim() || providerGroup(key) if (name === 'Other') { continue @@ -73,6 +85,9 @@ function buildProviderKeyGroups(vars: Record): ProviderKeyGr continue } + // Presentation overlay (priority, blurb, docs) is keyed by the prefix-based + // group name; when the backend introduced this provider it may have no + // overlay entry, so fall back to the backend/env metadata for display. const meta = providerMeta(name) groups.push({ @@ -131,6 +146,7 @@ function OAuthPicker({ const rest = featured ? ordered.filter(p => p.id !== FEATURED_ID) : ordered // Keep connected accounts grouped and always visible; only the unconnected // providers hide behind the disclosure, so the page leads with what's set up. + // Both lists preserve `sortProviders` order (curated priority, then name). const connected = rest.filter(p => p.status?.logged_in) const others = rest.filter(p => !p.status?.logged_in) const collapsible = others.length > 0 @@ -284,6 +300,8 @@ export function ProvidersSettings({ onClose, onViewChange, view }: ProvidersSett const [oauthProviders, setOauthProviders] = useState([]) const [openProvider, setOpenProvider] = useState(null) const [disconnecting, setDisconnecting] = useState(null) + // Free-text filter for the API-keys view (provider name / env-var key / desc). + const [keyQuery, setKeyQuery] = useState('') // The onboarding overlay owns the OAuth flow. Watch its `manual` flag so we // re-read connection state when the user finishes (or dismisses) a sign-in // they launched from this page — otherwise the cards keep their stale status. @@ -372,20 +390,49 @@ export function ProvidersSettings({ onClose, onViewChange, view }: ProvidersSett const keyGroups = buildProviderKeyGroups(vars) if (showApiKeys) { + const q = keyQuery.trim().toLowerCase() + const visibleGroups = q + ? keyGroups.filter(group => { + const haystack = [ + group.name, + group.description ?? '', + group.primary[0], + ...group.advanced.map(([k]) => k) + ] + + return haystack.some(s => s.toLowerCase().includes(q)) + }) + : keyGroups + return ( {keyGroups.length > 0 ? ( -
- {keyGroups.map(group => ( - setOpenProvider(group.name)} - onToggle={() => setOpenProvider(prev => (prev === group.name ? null : group.name))} - rowProps={rowProps} - /> - ))} +
+ + {visibleGroups.length > 0 ? ( +
+ {visibleGroups.map(group => ( + setOpenProvider(group.name)} + onToggle={() => setOpenProvider(prev => (prev === group.name ? null : group.name))} + rowProps={rowProps} + /> + ))} +
+ ) : ( +
+ {t.settings.providers.noKeysMatch} +
+ )}
) : ( diff --git a/apps/desktop/src/app/types.ts b/apps/desktop/src/app/types.ts index 9500468482c..1adc2bdec4e 100644 --- a/apps/desktop/src/app/types.ts +++ b/apps/desktop/src/app/types.ts @@ -106,6 +106,13 @@ export interface SkillCommandDispatchResponse { export interface SendCommandDispatchResponse { type: 'send' message: string + notice?: string +} + +export interface PrefillCommandDispatchResponse { + type: 'prefill' + message: string + notice?: string } export type CommandDispatchResponse = @@ -113,6 +120,7 @@ export type CommandDispatchResponse = | AliasCommandDispatchResponse | SkillCommandDispatchResponse | SendCommandDispatchResponse + | PrefillCommandDispatchResponse export type SidebarNavId = 'artifacts' | 'command-center' | 'messaging' | 'new-session' | 'settings' | 'skills' diff --git a/apps/desktop/src/components/assistant-ui/thread.tsx b/apps/desktop/src/components/assistant-ui/thread.tsx index c5b20cedd3e..1ac97c200ca 100644 --- a/apps/desktop/src/components/assistant-ui/thread.tsx +++ b/apps/desktop/src/components/assistant-ui/thread.tsx @@ -859,7 +859,10 @@ const ProcessNotificationNote: FC<{ text: string }> = ({ text }) => { output -
+          
             {detail}
           
diff --git a/apps/desktop/src/components/chat/terminal-output.tsx b/apps/desktop/src/components/chat/terminal-output.tsx index 946ec2386be..034f20f2a81 100644 --- a/apps/desktop/src/components/chat/terminal-output.tsx +++ b/apps/desktop/src/components/chat/terminal-output.tsx @@ -41,7 +41,11 @@ export function TerminalOutput({ className, text }: TerminalOutputProps) { }, [text]) return ( -
+
         {text}
       
diff --git a/apps/desktop/src/i18n/en.ts b/apps/desktop/src/i18n/en.ts index d27741c44db..158de543c49 100644 --- a/apps/desktop/src/i18n/en.ts +++ b/apps/desktop/src/i18n/en.ts @@ -581,6 +581,8 @@ export const en: Translations = { removedMessage: provider => `${provider} was removed.`, failedRemove: provider => `Could not remove ${provider}`, noProviderKeys: 'No provider API keys available.', + searchKeys: 'Search providers…', + noKeysMatch: 'No providers match your search.', loading: 'Loading providers...' }, sessions: { diff --git a/apps/desktop/src/i18n/ja.ts b/apps/desktop/src/i18n/ja.ts index 194452ed407..244fc12ca49 100644 --- a/apps/desktop/src/i18n/ja.ts +++ b/apps/desktop/src/i18n/ja.ts @@ -700,6 +700,8 @@ export const ja = defineLocale({ removedMessage: provider => `${provider} を削除しました。`, failedRemove: provider => `${provider} を削除できませんでした`, noProviderKeys: '利用可能なプロバイダー API キーがありません。', + searchKeys: 'プロバイダーを検索…', + noKeysMatch: '一致するプロバイダーがありません。', loading: 'プロバイダーを読み込み中...' }, sessions: { diff --git a/apps/desktop/src/i18n/types.ts b/apps/desktop/src/i18n/types.ts index 94489e5de9e..90168d28e86 100644 --- a/apps/desktop/src/i18n/types.ts +++ b/apps/desktop/src/i18n/types.ts @@ -462,6 +462,8 @@ export interface Translations { removedMessage: (provider: string) => string failedRemove: (provider: string) => string noProviderKeys: string + searchKeys: string + noKeysMatch: string loading: string } sessions: { diff --git a/apps/desktop/src/i18n/zh-hant.ts b/apps/desktop/src/i18n/zh-hant.ts index de329631098..c1eb3b8f883 100644 --- a/apps/desktop/src/i18n/zh-hant.ts +++ b/apps/desktop/src/i18n/zh-hant.ts @@ -677,6 +677,8 @@ export const zhHant = defineLocale({ removedMessage: provider => `${provider} 已移除。`, failedRemove: provider => `無法移除 ${provider}`, noProviderKeys: '沒有可用的提供方 API 金鑰。', + searchKeys: '搜尋提供方…', + noKeysMatch: '沒有符合的提供方。', loading: '正在載入提供方...' }, sessions: { diff --git a/apps/desktop/src/i18n/zh.ts b/apps/desktop/src/i18n/zh.ts index ac8c5c0b958..161a438b9e7 100644 --- a/apps/desktop/src/i18n/zh.ts +++ b/apps/desktop/src/i18n/zh.ts @@ -774,6 +774,8 @@ export const zh: Translations = { removedMessage: provider => `${provider} 已移除。`, failedRemove: provider => `无法移除 ${provider}`, noProviderKeys: '没有可用的提供方 API 密钥。', + searchKeys: '搜索提供方…', + noKeysMatch: '没有匹配的提供方。', loading: '正在加载提供方...' }, sessions: { diff --git a/apps/desktop/src/lib/chat-runtime.test.ts b/apps/desktop/src/lib/chat-runtime.test.ts index c2a9099a1a8..1b4efb33ad5 100644 --- a/apps/desktop/src/lib/chat-runtime.test.ts +++ b/apps/desktop/src/lib/chat-runtime.test.ts @@ -2,7 +2,7 @@ import { describe, expect, it } from 'vitest' import type { ComposerAttachment } from '@/store/composer' -import { coerceThinkingText, optimisticAttachmentRef } from './chat-runtime' +import { coerceThinkingText, optimisticAttachmentRef, parseCommandDispatch } from './chat-runtime' const DATA_URL = 'data:image/png;base64,iVBORw0KGgoAAAANS' @@ -52,3 +52,31 @@ describe('coerceThinkingText', () => { ).toBe('') }) }) + +describe('parseCommandDispatch', () => { + it('keeps the notice on a send directive (e.g. /goal set)', () => { + // The backend's /goal set returns {type:send, notice:"⊙ Goal set …", message}. + // Dropping the notice made /goal look like it did nothing in the desktop app. + const parsed = parseCommandDispatch({ type: 'send', notice: '⊙ Goal set', message: 'do the thing' }) + + expect(parsed).toEqual({ type: 'send', message: 'do the thing', notice: '⊙ Goal set' }) + }) + + it('keeps message-only send directives working (no notice)', () => { + expect(parseCommandDispatch({ type: 'send', message: 'hi' })).toEqual({ + type: 'send', + message: 'hi', + notice: undefined + }) + }) + + it('parses a prefill directive with its notice (e.g. /undo)', () => { + const parsed = parseCommandDispatch({ type: 'prefill', notice: 'backed up 1 turn', message: 'edit me' }) + + expect(parsed).toEqual({ type: 'prefill', message: 'edit me', notice: 'backed up 1 turn' }) + }) + + it('rejects a prefill directive missing its message', () => { + expect(parseCommandDispatch({ type: 'prefill', notice: 'x' })).toBeNull() + }) +}) diff --git a/apps/desktop/src/lib/chat-runtime.ts b/apps/desktop/src/lib/chat-runtime.ts index ac5273a2236..c573a1e5899 100644 --- a/apps/desktop/src/lib/chat-runtime.ts +++ b/apps/desktop/src/lib/chat-runtime.ts @@ -238,7 +238,12 @@ export function parseCommandDispatch(raw: unknown): CommandDispatchResponse | nu return typeof row.name === 'string' ? { type: 'skill', name: row.name, message: str(row.message) } : null case 'send': - return typeof row.message === 'string' ? { type: 'send', message: row.message } : null + return typeof row.message === 'string' ? { type: 'send', message: row.message, notice: str(row.notice) } : null + + case 'prefill': + return typeof row.message === 'string' + ? { type: 'prefill', message: row.message, notice: str(row.notice) } + : null default: return null diff --git a/apps/desktop/src/lib/session-ids.test.ts b/apps/desktop/src/lib/session-ids.test.ts new file mode 100644 index 00000000000..b5653c8eecd --- /dev/null +++ b/apps/desktop/src/lib/session-ids.test.ts @@ -0,0 +1,44 @@ +import { describe, expect, it } from 'vitest' + +import { storedSessionIdForNotification } from './session-ids' + +describe('storedSessionIdForNotification', () => { + it('translates a runtime id back to its stored id', () => { + // The route is keyed by the stored id, but notifications carry the runtime + // id. Resolving runtime -> stored keeps notification-click navigation from + // resuming a non-existent stored session ("session not found"). + const map = new Map([['stored-abc', 'runtime-123']]) + + expect(storedSessionIdForNotification('runtime-123', map)).toBe('stored-abc') + }) + + it('returns the id unchanged when no mapping is known', () => { + // A notification for a session this window never opened may already carry a + // stored id; let the resume/REST lookup handle it as-is. + const map = new Map([['stored-abc', 'runtime-123']]) + + expect(storedSessionIdForNotification('stored-xyz', map)).toBe('stored-xyz') + }) + + it('returns the id unchanged for an empty map', () => { + expect(storedSessionIdForNotification('runtime-123', new Map())).toBe('runtime-123') + }) + + it('resolves the correct stored id among several sessions', () => { + const map = new Map([ + ['stored-1', 'runtime-1'], + ['stored-2', 'runtime-2'], + ['stored-3', 'runtime-3'] + ]) + + expect(storedSessionIdForNotification('runtime-2', map)).toBe('stored-2') + }) + + it('does not treat a stored id as a runtime id (keys are not matched)', () => { + // The map is stored -> runtime. A value that only appears as a *key* must + // not be rewritten, otherwise an already-stored id could be mangled. + const map = new Map([['stored-1', 'runtime-1']]) + + expect(storedSessionIdForNotification('stored-1', map)).toBe('stored-1') + }) +}) diff --git a/apps/desktop/src/lib/session-ids.ts b/apps/desktop/src/lib/session-ids.ts new file mode 100644 index 00000000000..c97cadc2628 --- /dev/null +++ b/apps/desktop/src/lib/session-ids.ts @@ -0,0 +1,26 @@ +// The gateway tags every event — and therefore every native notification — +// with the *runtime* session id (the key under which the session lives in the +// gateway's in-memory `_sessions` map). The chat route, however, is keyed by +// the *stored* session id (`stored_session_id`), which is a different value: +// a brand-new chat gets a runtime id immediately but its stored id is assigned +// when the first turn persists. Navigating to a runtime id therefore tries to +// resume a stored session that does not exist ("session not found") and +// strands the user, who experiences it as the running session being destroyed. +// +// `runtimeIdByStoredSessionId` maps stored -> runtime; this resolves the +// reverse so notification-click navigation lands on the real route. The id is +// returned unchanged when no mapping is known — it may already be a stored id +// (e.g. a notification for a session this window never opened), in which case +// the normal resume/REST lookup handles it. +export function storedSessionIdForNotification( + id: string, + runtimeIdByStoredSessionId: ReadonlyMap +): string { + for (const [storedId, runtimeId] of runtimeIdByStoredSessionId) { + if (runtimeId === id) { + return storedId + } + } + + return id +} diff --git a/apps/desktop/src/styles.css b/apps/desktop/src/styles.css index 03b348c9d84..2aff7a21c77 100644 --- a/apps/desktop/src/styles.css +++ b/apps/desktop/src/styles.css @@ -680,6 +680,7 @@ textarea, [contenteditable]:not([contenteditable='false']), [data-slot='aui_user-message-root'], [data-slot='aui_assistant-message-content'], +[data-slot='aui_system-message-root'], [data-selectable-text='true'], [data-selectable-text='true'] * { -webkit-user-select: text; diff --git a/apps/desktop/src/types/hermes.ts b/apps/desktop/src/types/hermes.ts index a497e3f10a9..b67cc3041a7 100644 --- a/apps/desktop/src/types/hermes.ts +++ b/apps/desktop/src/types/hermes.ts @@ -108,6 +108,12 @@ export interface EnvVarInfo { description: string is_password: boolean is_set: boolean + // Backend-derived provider grouping hints (from the unified provider catalog + // in hermes_cli/provider_catalog.py). When present, the Keys tab groups by + // this provider identity — the SAME one `hermes model` uses — instead of + // desktop-only env-var prefix guesses. Empty for non-provider env vars. + provider?: string + provider_label?: string redacted_value: null | string tools: string[] url: null | string diff --git a/cli.py b/cli.py index f6a9393d34a..52bfe6cdb0a 100644 --- a/cli.py +++ b/cli.py @@ -6959,24 +6959,43 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin): self._close_model_picker() def _handle_model_switch(self, cmd_original: str): - """Handle /model command — switch model for this session. + """Handle /model command — switch model. Supports: /model — show current model + usage hints - /model — switch for this session only - /model --global — switch and persist to config.yaml + /model — switch model (persists by default) + /model --session — switch for this session only + /model --global — switch and persist (explicit) /model --provider — switch provider + model /model --provider — switch to provider, auto-detect model + + Persistence defaults to on (``model.persist_switch_by_default`` in + config.yaml, default True). Use ``--session`` for a one-off switch. """ - from hermes_cli.model_switch import switch_model, parse_model_flags + from hermes_cli.model_switch import ( + switch_model, + parse_model_flags, + resolve_persist_behavior, + ) from hermes_cli.providers import get_label # Parse args from the original command parts = cmd_original.split(None, 1) # split off '/model' raw_args = parts[1].strip() if len(parts) > 1 else "" - # Parse --provider, --global, and --refresh flags - model_input, explicit_provider, persist_global, force_refresh = parse_model_flags(raw_args) + # Parse --provider, --global, --session, and --refresh flags + ( + model_input, + explicit_provider, + is_global_flag, + force_refresh, + is_session, + ) = parse_model_flags(raw_args) + # Resolve the effective persistence once: --session overrides the + # config-gated default, --global forces persist, otherwise defer to + # model.persist_switch_by_default (defaults to True so /model survives + # across sessions). + persist_global = resolve_persist_behavior(is_global_flag, is_session) # --refresh: wipe the on-disk picker cache before building the # provider list. Forces a live re-fetch of every authed provider's @@ -7024,7 +7043,8 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin): if not providers: _cprint(" No authenticated providers found.") _cprint("") - _cprint(" /model switch model") + _cprint(" /model switch model (persists)") + _cprint(" /model --session switch for this session only") _cprint(" /model --provider switch provider") _cprint(" /model --refresh re-fetch live model lists") return @@ -7144,7 +7164,7 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin): save_config_value("model.default", result.new_model) if result.provider_changed: save_config_value("model.provider", result.target_provider) - _cprint(" Saved to config.yaml (--global)") + _cprint(" Saved to config.yaml") else: _cprint(" (session only — add --global to persist)") @@ -11917,7 +11937,13 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin): # --- /model picker modal --- if self._model_picker_state: try: - self._handle_model_picker_selection() + # Picker selections persist by default (same default as + # /model ); honour model.persist_switch_by_default. + from hermes_cli.model_switch import resolve_persist_behavior + + self._handle_model_picker_selection( + persist_global=resolve_persist_behavior(False, False) + ) except Exception as _exc: _cprint(f" ✗ Model selection failed: {_exc}") self._close_model_picker() @@ -13527,13 +13553,13 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin): style=style, full_screen=False, mouse_support=False, - # The status bar contains wall-clock read-outs (live prompt elapsed - # and idle-since-last-turn). Once a turn finishes there may be no - # further events to invalidate the app, so prompt_toolkit would keep - # rendering the first post-turn value (usually ``✓ 0s``) forever. - # A low-rate refresh keeps the clock honest without reintroducing a - # custom repaint thread or touching conversation state. - refresh_interval=1.0, + # Read from display.cli_refresh_interval (default 0 = disabled). + # When non-zero, prompt_toolkit redraws the UI on this cadence + # during idle, keeping wall-clock status-bar read-outs ticking. + # Set to 0 to suppress background redraws entirely — avoids + # fighting terminal auto-scroll in non-fullscreen mode (Xshell, + # iTerm2, Windows Terminal). See #48309. + refresh_interval=float(CLI_CONFIG.get("display", {}).get("cli_refresh_interval", 0)), # Erase the live bottom chrome (status bar, input box, separator # rules) on exit instead of freezing a final copy into scrollback. # Without this, prompt_toolkit's render_as_done teardown repaints diff --git a/cron/scheduler.py b/cron/scheduler.py index 4f7940db0b1..51bc4e5721e 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -15,6 +15,7 @@ import contextvars import json import logging import os +import re import shutil import subprocess import sys @@ -45,6 +46,59 @@ from hermes_time import now as _hermes_now logger = logging.getLogger(__name__) +def _summarize_cron_failure_for_delivery(job: dict, error: str | None) -> str: + """Return a compact one-line failure message for chat delivery. + + Full details stay in the cron output directory and the logs. Chat should + show the operator what broke without dumping provider JSON, retry noise, or + stack traces into the delivery channel. + """ + job_name = job.get("name") or job.get("id") or "cron job" + text = (error or "unknown error").strip() + lower = text.lower() + + # Provider/API failures are the common noisy path. Keep these short. + if "429" in text or "rate limit" in lower or "usage limit" in lower: + reason = "rate limit" + if "weekly usage limit" in lower: + reason = "weekly usage limit" + elif "quota" in lower: + reason = "quota limit" + return ( + f"⚠️ Cron '{job_name}' failed: provider {reason}. " + "Fallback chain was exhausted or unavailable. " + "Full details saved in cron output." + ) + + if "readtimeout" in lower or "timed out" in lower or "timeout" in lower: + return ( + f"⚠️ Cron '{job_name}' failed: provider timeout. " + "Fallback chain was exhausted or unavailable. " + "Full details saved in cron output." + ) + + # Match authentication/authorization wording at a word boundary and the + # 401/403 status codes as whole tokens, so "oauth", "4015" and similar do + # not trip a misleading auth message. + if re.search(r"authenticat|authoriz", lower) or re.search(r"\b(401|403)\b", text): + return ( + f"⚠️ Cron '{job_name}' failed: provider authentication error. " + "Full details saved in cron output." + ) + + # Strip common exception wrappers and collapse provider payloads. Bound + # the input first so a multi-KB provider blob cannot slow the + # substitutions. + cleaned = re.sub( + r"^(RuntimeError|Exception|ValueError|HTTPStatusError):\s*", + "", text[:2000], + ) + cleaned = re.sub(r"\s+", " ", cleaned).strip() + if len(cleaned) > 180: + cleaned = cleaned[:177].rstrip() + "..." + return f"⚠️ Cron '{job_name}' failed: {cleaned}" + + class CronPromptInjectionBlocked(Exception): """Raised by _build_job_prompt when the fully-assembled prompt trips the injection scanner. Caught in run_job so the operator sees a clean @@ -1992,7 +2046,7 @@ def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) - # Deliver the final response to the origin/target chat. # If the agent responded with [SILENT], skip delivery (but # output is already saved above). Failed jobs always deliver. - deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}" + deliver_content = final_response if success else _summarize_cron_failure_for_delivery(job, error) # Treat whitespace-only final responses the same as empty # responses: do not deliver a blank message, and let the # empty-response guard below mark the run as a soft failure. diff --git a/gateway/config.py b/gateway/config.py index 0ebf23e12d0..5b89c56b375 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -545,6 +545,13 @@ class GatewayConfig: thread_sessions_per_user: bool = False # When False (default), threads are shared across all participants max_concurrent_sessions: Optional[int] = None # Positive int caps simultaneous active chat sessions + # Multi-profile multiplexing (opt-in; default off preserves one-gateway-per-profile). + # When True, the default profile's gateway serves inbound messages for every + # profile on the host: profiles are stamped into session keys and (in later + # phases) per-profile adapters/credentials are resolved. When False, the + # gateway behaves exactly as before — single HERMES_HOME, no profile stamping. + multiplex_profiles: bool = False + # Unauthorized DM policy unauthorized_dm_behavior: str = "pair" # "pair" or "ignore" @@ -650,6 +657,7 @@ class GatewayConfig: "group_sessions_per_user": self.group_sessions_per_user, "thread_sessions_per_user": self.thread_sessions_per_user, "max_concurrent_sessions": self.max_concurrent_sessions, + "multiplex_profiles": self.multiplex_profiles, "unauthorized_dm_behavior": self.unauthorized_dm_behavior, "streaming": self.streaming.to_dict(), "session_store_max_age_days": self.session_store_max_age_days, @@ -695,7 +703,12 @@ class GatewayConfig: group_sessions_per_user = data.get("group_sessions_per_user") thread_sessions_per_user = data.get("thread_sessions_per_user") + multiplex_profiles = data.get("multiplex_profiles") nested_gateway = data.get("gateway") if isinstance(data.get("gateway"), dict) else {} + if multiplex_profiles is None and isinstance(nested_gateway, dict): + # Also honor gateway.multiplex_profiles written by + # ``hermes config set gateway.multiplex_profiles true``. + multiplex_profiles = nested_gateway.get("multiplex_profiles") if "max_concurrent_sessions" in data: max_concurrent_raw = data.get("max_concurrent_sessions") max_concurrent_key = "max_concurrent_sessions" @@ -732,6 +745,7 @@ class GatewayConfig: stt_enabled=_coerce_bool(stt_enabled, True), group_sessions_per_user=_coerce_bool(group_sessions_per_user, True), thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False), + multiplex_profiles=_coerce_bool(multiplex_profiles, False), max_concurrent_sessions=max_concurrent_sessions, unauthorized_dm_behavior=unauthorized_dm_behavior, streaming=StreamingConfig.from_dict(data.get("streaming", {})), @@ -823,6 +837,13 @@ def load_gateway_config() -> GatewayConfig: if "thread_sessions_per_user" in yaml_cfg: gw_data["thread_sessions_per_user"] = yaml_cfg["thread_sessions_per_user"] + # Multiplexing flag: accept both the top-level key and the nested + # gateway.multiplex_profiles form (from_dict resolves the nested + # fallback, but surface the top-level key here for parity with the + # other session-scope flags above). + if "multiplex_profiles" in yaml_cfg: + gw_data["multiplex_profiles"] = yaml_cfg["multiplex_profiles"] + gateway_section = yaml_cfg.get("gateway") if isinstance(gateway_section, dict) and "max_concurrent_sessions" in gateway_section: gw_data["max_concurrent_sessions"] = gateway_section["max_concurrent_sessions"] @@ -2143,5 +2164,24 @@ def _apply_env_overrides(config: GatewayConfig) -> None: except Exception as e: logger.debug("Plugin platform enable pass failed: %s", e) + # Relay (generic connector-fronted platform, EXPERIMENTAL). Enabled when a + # connector relay URL is configured via GATEWAY_RELAY_URL (env) or + # gateway.relay_url (config.yaml). The adapter is registered into the + # platform_registry at gateway startup (gateway.relay.register_relay_adapter) + # and dials OUT to the connector — so, like Telegram/Matrix, it has no public + # inbound port and just needs Platform.RELAY present+enabled in + # config.platforms for start_gateway()'s connect loop to bring it up. The + # connected-checker (Platform.RELAY in _PLATFORM_CONNECTED_CHECKERS) keys on + # extra["relay_url"], so mirror the URL into extra here. + relay_url_env = os.getenv("GATEWAY_RELAY_URL", "").strip() + relay_url_yaml = "" + existing_relay = config.platforms.get(Platform.RELAY) + if existing_relay is not None: + relay_url_yaml = str(existing_relay.extra.get("relay_url") or "").strip() + relay_url_val = relay_url_env or relay_url_yaml + if relay_url_val: + relay_config = _enable_from_env(Platform.RELAY) + relay_config.extra["relay_url"] = relay_url_val.rstrip("/") + for platform_config in config.platforms.values(): platform_config.extra.pop("_enabled_explicit", None) diff --git a/gateway/kanban_watchers.py b/gateway/kanban_watchers.py index 328cbd7fb5b..21753054f01 100644 --- a/gateway/kanban_watchers.py +++ b/gateway/kanban_watchers.py @@ -23,6 +23,58 @@ from typing import Any, Optional logger = logging.getLogger("gateway.run") +def _acquire_singleton_lock(lock_path) -> "tuple[Optional[object], str]": + """Take an exclusive, non-blocking advisory lock for the sole dispatcher. + + Only one gateway process machine-wide may run the embedded kanban + dispatcher: concurrent dispatchers double the reclaim frequency (each + runs its own ``release_stale_claims`` → promote → dispatch loop), double + claim-attempt events in the event log, and — with ``wal_autocheckpoint=0`` — + concurrent manual WAL checkpoints can corrupt index pages. The + ``dispatch_in_gateway`` config flag is the primary control; this lock is the + backstop that survives config drift and same-profile restart races. + + Delegates to :func:`gateway.status._try_acquire_file_lock` (``fcntl`` on + POSIX, ``msvcrt`` on Windows) so the guard is cross-platform. + + Returns ``(handle, "held")`` on success — the caller keeps the file handle + for the process lifetime and **must** release it via + :func:`_release_singleton_lock` when done. ``(None, "contended")`` when + another process holds the lock (caller must NOT dispatch). ``(None, + "unavailable")`` when locking cannot be performed (non-POSIX filesystem + without flock, or the status.py helpers are unimportable) — caller falls + back to config-only control. + """ + try: + from gateway.status import _try_acquire_file_lock # deferred; same package + except ImportError: + return None, "unavailable" + try: + Path(lock_path).parent.mkdir(parents=True, exist_ok=True) + handle = open(str(lock_path), "a+", encoding="utf-8") + except OSError: + return None, "unavailable" + if not _try_acquire_file_lock(handle): + handle.close() + return None, "contended" + return handle, "held" + + +def _release_singleton_lock(handle) -> None: + """Release a dispatcher singleton lock acquired via :func:`_acquire_singleton_lock`.""" + if handle is None: + return + try: + from gateway.status import _release_file_lock + _release_file_lock(handle) + except Exception: + pass + try: + handle.close() + except Exception: + pass + + class GatewayKanbanWatchersMixin: """Kanban watcher / notifier / dispatcher loops for GatewayRunner.""" @@ -606,6 +658,31 @@ class GatewayKanbanWatchersMixin: logger.warning("kanban dispatcher: kanban_db not importable; dispatcher disabled") return + # Single-dispatcher backstop. dispatch_in_gateway defaults to true, so a + # new profile gateway (or a same-profile restart race) can silently + # start a second dispatcher; concurrent dispatchers double reclaim + # frequency, double claim-attempt events, and — with + # wal_autocheckpoint=0 — concurrent manual WAL checkpoints can corrupt + # index pages. The lock lives at the machine-global kanban root + # (shared across profiles by design), so it serialises ALL gateways. + self._kanban_dispatcher_lock_handle = None + _lock_path = _kb.kanban_home() / "kanban" / ".dispatcher.lock" + _lock_handle, _lock_state = _acquire_singleton_lock(_lock_path) + if _lock_state == "contended": + logger.info( + "kanban dispatcher: another gateway already holds the dispatcher " + "lock (%s); this gateway will NOT dispatch.", _lock_path, + ) + return + if _lock_state == "held": + self._kanban_dispatcher_lock_handle = _lock_handle # hold for process lifetime + logger.info("kanban dispatcher: holding singleton dispatcher lock (%s)", _lock_path) + else: + logger.warning( + "kanban dispatcher: advisory lock unavailable at %s; proceeding " + "on config control alone.", _lock_path, + ) + try: interval = float(kanban_cfg.get("dispatch_interval_seconds", 60) or 60) except (ValueError, TypeError): @@ -1052,6 +1129,8 @@ class GatewayKanbanWatchersMixin: last_warn_at = now except asyncio.CancelledError: logger.debug("kanban dispatcher: cancelled") + _release_singleton_lock(self._kanban_dispatcher_lock_handle) + self._kanban_dispatcher_lock_handle = None raise except Exception: logger.exception("kanban dispatcher: unexpected watcher error") @@ -1062,3 +1141,6 @@ class GatewayKanbanWatchersMixin: while slept < interval and self._running: await asyncio.sleep(min(1.0, interval - slept)) slept += 1.0 + + _release_singleton_lock(self._kanban_dispatcher_lock_handle) + self._kanban_dispatcher_lock_handle = None diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index f7e1ba42f85..09d0dc227a2 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1043,7 +1043,13 @@ class APIServerAdapter(BasePlatformAdapter): — matching the semantics of the native gateway's ``session_key``. """ from run_agent import AIAgent - from gateway.run import _resolve_runtime_agent_kwargs, _resolve_gateway_model, _load_gateway_config, GatewayRunner + from gateway.run import ( + _current_max_iterations, + _resolve_runtime_agent_kwargs, + _resolve_gateway_model, + _load_gateway_config, + GatewayRunner, + ) from hermes_cli.tools_config import _get_platform_tools runtime_kwargs = _resolve_runtime_agent_kwargs() @@ -1053,7 +1059,7 @@ class APIServerAdapter(BasePlatformAdapter): user_config = _load_gateway_config() enabled_toolsets = sorted(_get_platform_tools(user_config, "api_server")) - max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90")) + max_iterations = _current_max_iterations() # Load fallback provider chain so the API server platform has the # same fallback behaviour as Telegram/Discord/Slack (fixes #4954). diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index 222adf4c2ea..d9f98282a8d 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -57,6 +57,11 @@ from gateway.platforms.base import ( logger = logging.getLogger(__name__) +# Sentinel returned by _resolve_request_profile when a /p// prefix +# names a profile this gateway does not serve (→ 404). Distinct from None +# (no prefix / multiplexing off → handle as the default profile). +_PROFILE_REJECTED = object() + _BUILTIN_DELIVER_PLATFORMS = { "telegram", "discord", "slack", "signal", "sms", "whatsapp", "matrix", "mattermost", "homeassistant", "email", "dingtalk", @@ -189,6 +194,14 @@ class WebhookAdapter(BasePlatformAdapter): app = web.Application() app.router.add_get("/health", self._handle_health) app.router.add_post("/webhooks/{route_name}", self._handle_webhook) + # Multi-profile multiplexing: a /p//webhooks/ prefix + # routes the inbound event to that profile. Same handler; the profile is + # captured from the path and stamped onto the SessionSource so the agent + # turn resolves that profile's config/skills/credentials. Only honored + # when gateway.multiplex_profiles is on (the handler validates). + app.router.add_post( + "/p/{profile}/webhooks/{route_name}", self._handle_webhook + ) # Port conflict detection — fail fast if port is already in use import socket as _socket @@ -397,6 +410,35 @@ class WebhookAdapter(BasePlatformAdapter): except Exception as e: logger.error("[webhook] Failed to reload dynamic routes: %s", e) + def _resolve_request_profile(self, request: "web.Request"): + """Resolve + validate the /p// URL prefix on a webhook request. + + Returns: + - ``None`` when no profile prefix is present, or multiplexing is off + (the prefix is ignored, request handled as the default profile). + - the profile name (str) when present, multiplexing is on, and the + profile is one this gateway serves. + - ``_PROFILE_REJECTED`` when a prefix is present but the profile is + unknown/unconfigured (handler returns 404). + """ + profile = (request.match_info.get("profile") or "").strip() + if not profile: + return None + runner = self.gateway_runner + cfg = getattr(runner, "config", None) + if not getattr(cfg, "multiplex_profiles", False): + # Prefix supplied but multiplexing is off — ignore it, behave as + # the single-profile gateway (don't 404 a would-be valid route). + return None + try: + from hermes_cli.profiles import profiles_to_serve + served = {name for name, _ in profiles_to_serve(multiplex=True)} + except Exception: + return _PROFILE_REJECTED + if profile not in served: + return _PROFILE_REJECTED + return profile + async def _handle_webhook(self, request: "web.Request") -> "web.Response": """POST /webhooks/{route_name} — receive and process a webhook event.""" # Hot-reload dynamic subscriptions on each request (mtime-gated, cheap) @@ -405,6 +447,13 @@ class WebhookAdapter(BasePlatformAdapter): route_name = request.match_info.get("route_name", "") route_config = self._routes.get(route_name) + # Multi-profile: resolve + validate the /p// prefix if present. + profile = self._resolve_request_profile(request) + if profile is _PROFILE_REJECTED: + return web.json_response( + {"error": "Unknown or unconfigured profile"}, status=404 + ) + if not route_config: return web.json_response( {"error": f"Unknown route: {route_name}"}, status=404 @@ -641,6 +690,8 @@ class WebhookAdapter(BasePlatformAdapter): user_id=f"webhook:{route_name}", user_name=route_name, ) + if profile and isinstance(profile, str): + source.profile = profile event = MessageEvent( text=prompt, message_type=MessageType.TEXT, diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index fc4e5f40ee7..a1a7826f8f8 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -57,6 +57,13 @@ class RelayAdapter(BasePlatformAdapter): self._transport = transport # Capability surface read by stream_consumer (getattr(..., 4096)). self.MAX_MESSAGE_LENGTH = descriptor.max_message_length + # chat_id -> guild_id (Discord) / workspace scope, learned from inbound + # events. The connector's egress guard resolves the owning tenant from + # the OUTBOUND action's metadata.guild_id; the gateway's generic delivery + # path (run.py _thread_metadata_for_source) only carries thread_id, so we + # re-attach the scope here from what we saw inbound. Keyed by chat_id + # (channel) since that's what send() receives. See routedEgressGuard.ts. + self._scope_by_chat: Dict[str, str] = {} self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain") # ── capability surface (from descriptor) ───────────────────────────── @@ -108,8 +115,35 @@ class RelayAdapter(BasePlatformAdapter): async def _on_inbound(self, event) -> None: """Bridge a connector-delivered MessageEvent into the normal adapter path.""" + self._capture_scope(event) await self.handle_message(event) + def _capture_scope(self, event) -> None: + """Remember chat_id -> guild scope from an inbound event so our outbound + (the agent's reply) can re-assert it for the connector's egress tenant + resolution. Never raises — scope tracking must not break inbound.""" + try: + src = getattr(event, "source", None) + scope = getattr(src, "guild_id", None) if src else None + chat = getattr(src, "chat_id", None) if src else None + if scope and chat: + self._scope_by_chat[str(chat)] = str(scope) + except Exception: # noqa: BLE001 - scope tracking must never break inbound + pass + + def _with_scope(self, chat_id: str, metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """Ensure the outbound metadata carries guild_id for the connector's + egress tenant resolution. The connector resolves the owning tenant from + metadata.guild_id (Discord); without it egress is declined as + 'target not routed to an onboarded tenant'. No-op when we have no scope + for this chat (e.g. DMs) or it's already present.""" + meta: Dict[str, Any] = dict(metadata or {}) + if not meta.get("guild_id"): + scope = self._scope_by_chat.get(str(chat_id)) + if scope: + meta["guild_id"] = scope + return meta + async def on_interrupt(self, session_key: str, chat_id: str) -> None: """Bridge a connector-delivered /stop into the adapter's interrupt path. @@ -140,7 +174,7 @@ class RelayAdapter(BasePlatformAdapter): "chat_id": chat_id, "content": content, "reply_to": reply_to, - "metadata": metadata or {}, + "metadata": self._with_scope(chat_id, metadata), } ) return SendResult( diff --git a/gateway/relay/ws_transport.py b/gateway/relay/ws_transport.py index b2e8eda09cd..b091d44faa8 100644 --- a/gateway/relay/ws_transport.py +++ b/gateway/relay/ws_transport.py @@ -54,6 +54,35 @@ _HANDSHAKE_TIMEOUT_S = 30.0 _OUTBOUND_TIMEOUT_S = 30.0 +def _ws_dial_url(url: str) -> str: + """Normalize a connector URL to the ``ws(s)://…/relay`` dial target. + + The relay URL is configured once (``GATEWAY_RELAY_URL`` / ``gateway.relay_url``) + as the connector's BASE URL (e.g. ``https://connector.example``) and shared by + both the provision POST (which needs ``http(s)://…/relay/provision`` — see + ``_provision_url``) and the WS dial (which needs ``ws(s)://…/relay``, the path + the connector mounts its ``WebSocketServer`` on). Two normalizations, both + load-bearing: + + - scheme: ``https -> wss``, ``http -> ws`` (``websockets.connect`` raises + "scheme isn't ws or wss" on an http(s) URL). + - path: ensure it ends in ``/relay`` (the connector returns HTTP 400 on an + upgrade to any other path, since the WS server is mounted at ``/relay``). + + Idempotent: an already-``ws(s)://…/relay`` URL is returned unchanged, so a URL + configured WITH the scheme and/or ``/relay`` still works. + """ + raw = (url or "").strip() + if raw.startswith("https://"): + raw = "wss://" + raw[len("https://"):] + elif raw.startswith("http://"): + raw = "ws://" + raw[len("http://"):] + raw = raw.rstrip("/") + if not raw.endswith("/relay"): + raw = f"{raw}/relay" + return raw + + def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent: """Rebuild a MessageEvent from the connector's normalized inbound payload. @@ -118,7 +147,7 @@ class WebSocketRelayTransport: "WebSocketRelayTransport requires the 'websockets' package " "(install the messaging extra)." ) - self._url = url + self._url = _ws_dial_url(url) self._platform = platform self._bot_id = bot_id self._connect_timeout_s = connect_timeout_s diff --git a/gateway/run.py b/gateway/run.py index b478576546b..0a594609b7f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -195,6 +195,19 @@ def _gateway_platform_value(platform: Any) -> str: return str(getattr(platform, "value", platform) or "").strip().lower() +def _non_conversational_metadata( + metadata: Optional[Dict[str, Any]] = None, + *, + platform: Any = None, +) -> Optional[Dict[str, Any]]: + """Mark Discord lifecycle/status sends without changing other platforms.""" + if _gateway_platform_value(platform) != "discord": + return metadata + merged = dict(metadata or {}) + merged["non_conversational"] = True + return merged + + def _is_transient_network_error(exc: BaseException) -> bool: """Return True for transient network errors safe to log + swallow. @@ -1173,13 +1186,31 @@ def _reload_runtime_env_preserving_config_authority() -> None: pick up rotated API keys. config.yaml remains authoritative for agent budget settings such as agent.max_turns; otherwise a stale HERMES_MAX_ITERATIONS in .env can replace the startup bridge on later turns. + + In multiplex mode this is a NO-OP for the credential reload: secrets come + from the per-turn ``set_secret_scope`` (installed by ``_profile_runtime_scope``) + which loads the routed profile's ``.env`` into an isolated mapping. Mutating + the process-global ``os.environ`` here would defeat that isolation and leak + the default profile's keys to every profile's turns and subprocesses. """ + from agent.secret_scope import is_multiplex_active + if is_multiplex_active(): + # Credentials are resolved from the active profile's secret scope, not + # os.environ. Still honor config.yaml's agent.max_turns bridge below + # using the scoped home, but never reload .env into global env. + _bridge_max_turns_from_config(_hermes_home) + return + load_hermes_dotenv( hermes_home=_hermes_home, project_env=Path(__file__).resolve().parents[1] / '.env', ) + _bridge_max_turns_from_config(_hermes_home) - config_path = _hermes_home / 'config.yaml' + +def _bridge_max_turns_from_config(home: "Path") -> None: + """Bridge config.yaml agent.max_turns into HERMES_MAX_ITERATIONS (a global).""" + config_path = home / 'config.yaml' if not config_path.exists(): return try: @@ -1196,6 +1227,80 @@ def _reload_runtime_env_preserving_config_authority() -> None: os.environ["HERMES_MAX_ITERATIONS"] = str(agent_cfg["max_turns"]) +def _current_max_iterations() -> int: + """Return the current per-turn iteration budget after runtime env refresh.""" + _reload_runtime_env_preserving_config_authority() + try: + return int(os.getenv("HERMES_MAX_ITERATIONS", "90")) + except (TypeError, ValueError): + return 90 + + +from contextlib import contextmanager as _contextmanager + + +# Platforms that bind a host TCP port (HTTP/webhook listeners). In a profile +# multiplexer the default profile owns the single shared listener and serves +# every profile through the /p// URL prefix, so a SECONDARY profile +# enabling one of these is always a misconfiguration: it would try to bind a +# port already held by the default's listener. We hard-error on it rather than +# silently dropping the adapter (see _start_one_profile_adapters). +# Stored as platform .value strings since the Platform enum is imported below. +_PORT_BINDING_PLATFORM_VALUES = frozenset({ + "webhook", + "api_server", + "msgraph_webhook", + "feishu", + "wecom_callback", + "bluebubbles", + "sms", +}) + + +class MultiplexConfigError(RuntimeError): + """A profile multiplexer config is invalid (fail-fast at startup). + + Distinct from a transient adapter-connect failure: a transient error is + logged and the gateway stays alive to retry, but a config error means the + operator must fix config.yaml, so it aborts startup cleanly. + """ + + +@_contextmanager +def _profile_runtime_scope(profile_home: "Path"): + """Scope config/skills/memory AND credentials to a profile for one turn. + + Combines the two seams the multiplexer needs: + 1. ``set_hermes_home_override`` — redirects ``get_hermes_home()`` (config, + skills, memory, SOUL, sessions) to the profile's home. Contextvar, so + it propagates into the agent worker thread via ``copy_context()``. + 2. ``set_secret_scope`` — installs the profile's ``.env`` secrets as the + authoritative credential source, so ``get_secret`` reads this profile's + keys and never the process-global ``os.environ`` (which in a + multiplexer may hold another profile's values). + + Only used on the multiplexed inbound path. Single-profile gateways never + enter this scope, so their behavior is unchanged. Loading the profile's + ``.env`` here does NOT mutate ``os.environ`` — ``build_profile_secret_scope`` + returns an isolated dict — which is what keeps subprocesses (MCP, kanban) + from inheriting cross-profile secrets. + """ + from hermes_constants import set_hermes_home_override, reset_hermes_home_override + from agent.secret_scope import ( + build_profile_secret_scope, + set_secret_scope, + reset_secret_scope, + ) + + home_token = set_hermes_home_override(str(profile_home)) + secret_token = set_secret_scope(build_profile_secret_scope(Path(profile_home))) + try: + yield + finally: + reset_secret_scope(secret_token) + reset_hermes_home_override(home_token) + + _DOCKER_VOLUME_SPEC_RE = re.compile(r"^(?P.+):(?P/[^:]+?)(?::(?P[^:]+))?$") _DOCKER_MEDIA_OUTPUT_CONTAINER_PATHS = {"/output", "/outputs"} @@ -2240,7 +2345,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew def __init__(self, config: Optional[GatewayConfig] = None): global _gateway_runner_ref self.config = config or load_gateway_config() + # Mark the process as a profile multiplexer when configured. This flips + # agent.secret_scope.get_secret() to fail-closed on any unscoped + # credential read, so a missed migration crashes loudly instead of + # leaking a cross-profile value (Workstream A). Inert when off. + try: + from agent.secret_scope import set_multiplex_active + set_multiplex_active(bool(getattr(self.config, "multiplex_profiles", False))) + except Exception: + logger.debug("could not set multiplex-active flag", exc_info=True) self.adapters: Dict[Platform, BasePlatformAdapter] = {} + # Multi-profile multiplexing: adapters for NON-default profiles live + # here, keyed by profile name then Platform. self.adapters stays the + # default/active profile's map so the ~93 existing self.adapters[...] + # sites are untouched when multiplexing is off (this dict is empty). + # Populated by _start_secondary_profile_adapters(). + self._profile_adapters: Dict[str, Dict[Platform, BasePlatformAdapter]] = {} self._warn_if_docker_media_delivery_is_risky() _gateway_runner_ref = _weakref.ref(self) @@ -2792,10 +2912,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: pass config = getattr(self, "config", None) + # Mirror SessionStore._resolve_profile_for_key so this fallback path + # produces the same namespace as the primary path: None (legacy + # agent:main) unless multiplexing is on, then the active profile. + _profile = None + if getattr(config, "multiplex_profiles", False): + if source.profile: + _profile = source.profile + else: + try: + from hermes_cli.profiles import get_active_profile_name + _profile = get_active_profile_name() or "default" + except Exception: + _profile = None return build_session_key( source, group_sessions_per_user=getattr(config, "group_sessions_per_user", True), thread_sessions_per_user=getattr(config, "thread_sessions_per_user", False), + profile=_profile, ) def _telegram_topic_mode_enabled(self, source: SessionSource) -> bool: @@ -5335,7 +5469,30 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew "attempts": 1, "next_retry": time.monotonic() + 30, } - + + # Multi-profile multiplexing: bring up adapters for every OTHER profile + # this gateway serves. Each profile's adapters connect under that + # profile's home + credential scope and stamp their inbound events with + # the profile so the agent turn resolves correctly. No-op when off. + try: + _secondary_connected = await self._start_secondary_profile_adapters() + connected_count += _secondary_connected + except MultiplexConfigError as e: + # Invalid multiplexer config — abort startup cleanly so the operator + # fixes config.yaml rather than running a half-wired gateway. + reason = str(e) + logger.error("Gateway multiplexer config error: %s", reason) + try: + from gateway.status import write_runtime_status + write_runtime_status(gateway_state="startup_failed", exit_reason=reason) + except Exception: + pass + self._request_clean_exit(reason) + self._startup_restore_in_progress = False + return True + except Exception as e: + logger.error("Secondary-profile adapter startup failed: %s", e, exc_info=True) + if connected_count == 0: if startup_nonretryable_errors: reason = "; ".join(startup_nonretryable_errors) @@ -6342,6 +6499,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew time.monotonic() - _adapter_started_at, e, ) + + # Disconnect secondary-profile adapters (multiplex mode). + for _prof, _amap in list(getattr(self, "_profile_adapters", {}).items()): + for platform, adapter in list(_amap.items()): + try: + await adapter.cancel_background_tasks() + except Exception as e: + logger.debug("✗ %s bg-cancel error (profile %s): %s", platform.value, _prof, e) + try: + await adapter.disconnect() + logger.info("✓ %s disconnected (profile: %s)", platform.value, _prof) + except Exception as e: + logger.error("✗ %s disconnect error (profile %s): %s", platform.value, _prof, e) + _amap.clear() + if hasattr(self, "_profile_adapters"): + self._profile_adapters.clear() logger.info( "Shutdown phase: all adapters disconnected at +%.2fs", _phase_elapsed(), @@ -6511,6 +6684,175 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew """Wait for shutdown signal.""" await self._shutdown_event.wait() + async def _start_secondary_profile_adapters(self) -> int: + """Bring up adapters for every non-active profile this gateway serves. + + Returns the number of secondary adapters that connected. No-op (returns + 0) unless ``gateway.multiplex_profiles`` is on. + + Each profile's adapters are created and connected under that profile's + HERMES_HOME + secret scope (``_profile_runtime_scope``), stored in + ``self._profile_adapters[profile]``, and given a message handler that + stamps ``source.profile`` before delegating to the shared + ``_handle_message`` — so the agent turn resolves that profile's config, + skills, and credentials. Same-platform credential collisions (two + profiles polling the same bot token) are detected and refused here, the + only point that sees every profile's resolved credentials together. + """ + if not getattr(self.config, "multiplex_profiles", False): + return 0 + + try: + from hermes_cli.profiles import profiles_to_serve, get_active_profile_name + except Exception: + return 0 + + active = get_active_profile_name() or "default" + connected = 0 + # (platform, token-fingerprint) -> profile that claimed it. Detects two + # profiles trying to poll the same bot credential (impossible to do + # concurrently). Seed with the active profile's adapters. + claimed: Dict[tuple, str] = {} + for _plat, _ad in self.adapters.items(): + fp = self._adapter_credential_fingerprint(_ad) + if fp is not None: + claimed[(_plat, fp)] = active + + for profile_name, profile_home in profiles_to_serve(multiplex=True): + if profile_name == active: + continue # handled by the primary startup loop + try: + connected += await self._start_one_profile_adapters( + profile_name, profile_home, claimed + ) + except MultiplexConfigError: + # Config error (e.g. a secondary profile binding a port) is not + # transient — propagate so startup aborts cleanly instead of + # limping along with a half-configured multiplexer. + raise + except Exception as e: + logger.error( + "Failed to start adapters for profile '%s': %s", + profile_name, e, exc_info=True, + ) + + # Record served profiles in runtime status for `hermes status`. + try: + from gateway.status import write_runtime_status + served = [active] + sorted(self._profile_adapters.keys()) + write_runtime_status(served_profiles=served) + except Exception: + logger.debug("could not record served_profiles", exc_info=True) + + return connected + + async def _start_one_profile_adapters( + self, profile_name: str, profile_home: "Path", claimed: Dict[tuple, str] + ) -> int: + """Create+connect one profile's adapters under its runtime scope.""" + from gateway.config import load_gateway_config + + with _profile_runtime_scope(profile_home): + profile_cfg = load_gateway_config() + + profile_map = self._profile_adapters.setdefault(profile_name, {}) + connected = 0 + for platform, platform_config in profile_cfg.platforms.items(): + if not platform_config.enabled: + continue + # A secondary profile must NOT enable a port-binding platform: the + # default profile's listener already serves every profile via the + # /p// prefix, so a second bind can only collide. This is a + # config error, not a transient failure — fail fast and loud. + if platform.value in _PORT_BINDING_PLATFORM_VALUES: + raise MultiplexConfigError( + f"Profile '{profile_name}' enables the port-binding platform " + f"'{platform.value}', but gateway.multiplex_profiles is on. The " + f"default profile owns the single shared HTTP listener and " + f"serves every profile through the /p/{profile_name}/ URL " + f"prefix — a secondary profile cannot bind its own port. " + f"Remove platforms.{platform.value} from profile " + f"'{profile_name}'s config.yaml (configure it only on the " + f"default profile)." + ) + with _profile_runtime_scope(profile_home): + adapter = self._create_adapter(platform, platform_config) + if not adapter: + continue + + # Same-token conflict detection — refuse a duplicate poll. + fp = self._adapter_credential_fingerprint(adapter) + if fp is not None: + owner = claimed.get((platform, fp)) + if owner is not None: + logger.error( + "Profile '%s' and '%s' both configure %s with the same " + "credential — refusing to start the duplicate (a single " + "bot token cannot be polled twice). Give each profile its " + "own %s credential.", + owner, profile_name, platform.value, platform.value, + ) + await self._safe_adapter_disconnect(adapter, platform) + continue + claimed[(platform, fp)] = profile_name + + # Stamp every inbound event from this adapter with its profile so + # the agent turn (and session key) resolve to the right home. + adapter.set_message_handler( + self._make_profile_message_handler(profile_name) + ) + adapter.set_fatal_error_handler(self._handle_adapter_fatal_error) + adapter.set_session_store(self.session_store) + adapter.set_busy_session_handler(self._handle_active_session_busy_message) + adapter.set_topic_recovery_fn(self._recover_telegram_topic_thread_id) + adapter._busy_text_mode = self._busy_text_mode + + try: + with _profile_runtime_scope(profile_home): + success = await self._connect_adapter_with_timeout(adapter, platform) + if success: + profile_map[platform] = adapter + connected += 1 + logger.info("✓ %s connected (profile: %s)", platform.value, profile_name) + else: + logger.warning("✗ %s failed to connect (profile: %s)", platform.value, profile_name) + await self._safe_adapter_disconnect(adapter, platform) + except Exception as e: + logger.error("✗ %s error (profile: %s): %s", platform.value, profile_name, e) + await self._safe_adapter_disconnect(adapter, platform) + return connected + + def _make_profile_message_handler(self, profile_name: str): + """Return a message handler that stamps source.profile then delegates.""" + async def _handler(event): + try: + if getattr(event, "source", None) is not None and not event.source.profile: + event.source.profile = profile_name + except Exception: + pass + return await self._handle_message(event) + return _handler + + @staticmethod + def _adapter_credential_fingerprint(adapter: Any) -> Optional[str]: + """Return a stable, log-safe fingerprint of an adapter's credential. + + Used only to detect two profiles claiming the same bot token. Returns a + salted hash (never the token itself) of the adapter's primary + credential, or None when no credential is discoverable (in which case + we don't attempt conflict detection for it). + """ + token = None + for attr in ("token", "bot_token", "_token", "api_token", "_bot_token"): + val = getattr(adapter, attr, None) + if isinstance(val, str) and val.strip(): + token = val.strip() + break + if not token: + return None + import hashlib + return hashlib.sha256(("hermes-mux:" + token).encode("utf-8")).hexdigest()[:16] + def _create_adapter( self, platform: Platform, @@ -10633,7 +10975,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew disabled_toolsets = agent_cfg.get("disabled_toolsets") or None pr = self._provider_routing - max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90")) + max_iterations = _current_max_iterations() reasoning_config = self._resolve_session_reasoning_config(source=source) self._reasoning_config = reasoning_config self._service_tier = self._load_service_tier() @@ -11737,7 +12079,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew chunks = [clean[i:i + max_chunk] for i in range(0, len(clean), max_chunk)] for chunk in chunks: try: - await adapter.send(chat_id, f"```\n{chunk}\n```", metadata=metadata) + await adapter.send( + chat_id, + f"```\n{chunk}\n```", + metadata=_non_conversational_metadata(metadata, platform=platform), + ) except Exception as e: logger.debug("Update stream send failed: %s", e) @@ -11760,12 +12106,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew exit_code_raw = exit_code_path.read_text().strip() or "1" exit_code = int(exit_code_raw) if exit_code == 0: - await adapter.send(chat_id, "✅ Hermes update finished.", metadata=metadata) + await adapter.send( + chat_id, + "✅ Hermes update finished.", + metadata=_non_conversational_metadata(metadata, platform=platform), + ) else: await adapter.send( chat_id, "❌ Hermes update failed (exit code {}).".format(exit_code), - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) logger.info("Update finished (exit=%s), notified %s", exit_code, session_key) except Exception as e: @@ -11816,7 +12166,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew prompt=prompt_text, default=default, session_key=session_key, - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) sent_buttons = True except Exception as btn_err: @@ -11830,7 +12180,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew f"{prompt_text}{default_hint}\n\n" f"Reply `{_p}approve` (yes) or `{_p}deny` (no), " f"or type your answer directly.", - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) # Keep the prompt marker on disk until the user # answers. If the gateway restarts mid-prompt, the @@ -11854,7 +12204,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew await adapter.send( chat_id, "❌ Hermes update timed out after 30 minutes.", - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) except Exception: pass @@ -11960,7 +12310,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew msg = "✅ Hermes update finished successfully." else: msg = "❌ Hermes update failed. Check the gateway logs or run `hermes update` manually for details." - await adapter.send(chat_id, msg, metadata=metadata) + await adapter.send( + chat_id, + msg, + metadata=_non_conversational_metadata(metadata, platform=platform), + ) logger.info( "Sent post-update notification to %s:%s (exit=%s)", platform_str, @@ -12023,7 +12377,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew result = await adapter.send( str(chat_id), "♻ Gateway restarted successfully. Your session continues.", - metadata=metadata, + metadata=_non_conversational_metadata(metadata, platform=platform), ) # adapter.send() catches provider errors (e.g. "Chat not found") # and returns SendResult(success=False) rather than raising, so @@ -12090,9 +12444,21 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew adapter=adapter, ) if metadata: - result = await adapter.send(str(home.chat_id), message, metadata=metadata) + result = await adapter.send( + str(home.chat_id), + message, + metadata=_non_conversational_metadata(metadata, platform=platform), + ) else: - result = await adapter.send(str(home.chat_id), message) + _startup_meta = _non_conversational_metadata(platform=platform) + if _startup_meta: + result = await adapter.send( + str(home.chat_id), + message, + metadata=_startup_meta, + ) + else: + result = await adapter.send(str(home.chat_id), message) if result is not None and getattr(result, "success", True) is False: logger.warning( "Home-channel startup notification failed for %s:%s: %s", @@ -12733,7 +13099,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if adapter and chat_id: try: send_meta = {"thread_id": thread_id} if thread_id else None - await adapter.send(chat_id, message_text, metadata=send_meta) + await adapter.send( + chat_id, + message_text, + metadata=_non_conversational_metadata(send_meta, platform=platform_name), + ) except Exception as e: logger.error("Watcher delivery error: %s", e) break @@ -12754,7 +13124,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if adapter and chat_id: try: send_meta = {"thread_id": thread_id} if thread_id else None - await adapter.send(chat_id, message_text, metadata=send_meta) + await adapter.send( + chat_id, + message_text, + metadata=_non_conversational_metadata(send_meta, platform=platform_name), + ) except Exception as e: logger.error("Watcher delivery error: %s", e) @@ -13740,6 +14114,64 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew channel_prompt: Optional[str] = None, persist_user_message: Optional[str] = None, persist_user_timestamp: Optional[float] = None, + ) -> Dict[str, Any]: + """Profile-scoping wrapper around the agent run. + + When multiplexing is active, resolve the inbound source's profile and + run the whole turn inside ``_profile_runtime_scope`` so config/skills/ + memory resolve to that profile's home AND credentials resolve from that + profile's secret scope (never the process-global ``os.environ``). When + multiplexing is off this is a transparent pass-through — zero behavior + change for single-profile gateways. + """ + if not getattr(getattr(self, "config", None), "multiplex_profiles", False): + return await self._run_agent_inner( + message, context_prompt, history, source, session_id, + session_key=session_key, run_generation=run_generation, + _interrupt_depth=_interrupt_depth, event_message_id=event_message_id, + channel_prompt=channel_prompt, persist_user_message=persist_user_message, + persist_user_timestamp=persist_user_timestamp, + ) + + profile_home = self._resolve_profile_home_for_source(source) + with _profile_runtime_scope(profile_home): + return await self._run_agent_inner( + message, context_prompt, history, source, session_id, + session_key=session_key, run_generation=run_generation, + _interrupt_depth=_interrupt_depth, event_message_id=event_message_id, + channel_prompt=channel_prompt, persist_user_message=persist_user_message, + persist_user_timestamp=persist_user_timestamp, + ) + + def _resolve_profile_home_for_source(self, source: SessionSource) -> "Path": + """Resolve which profile's HERMES_HOME should serve this inbound source. + + Prefers the profile the source was routed to (``source.profile`` — set + by the /p// URL prefix or a per-credential adapter), falling + back to the active profile (the multiplexer's own home). + """ + from hermes_cli.profiles import get_active_profile_name, get_profile_dir + try: + name = (source.profile or "").strip() or get_active_profile_name() or "default" + return get_profile_dir(name) + except Exception: + from hermes_constants import get_hermes_home + return get_hermes_home() + + async def _run_agent_inner( + self, + message: str, + context_prompt: str, + history: List[Dict[str, Any]], + source: SessionSource, + session_id: str, + session_key: str = None, + run_generation: Optional[int] = None, + _interrupt_depth: int = 0, + event_message_id: Optional[str] = None, + channel_prompt: Optional[str] = None, + persist_user_message: Optional[str] = None, + persist_user_timestamp: Optional[float] = None, ) -> Dict[str, Any]: """ Run the agent with the given message and context. @@ -14135,6 +14567,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if _progress_thread_id == source.thread_id else {"thread_id": _progress_thread_id} ) if _progress_thread_id else None + _progress_metadata = _non_conversational_metadata(_progress_metadata, platform=source.platform) _progress_reply_to = ( event_message_id if source.platform in (Platform.FEISHU, Platform.MATTERMOST) and source.thread_id and event_message_id @@ -14581,9 +15014,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # session_key is now set via contextvars in _set_session_env() # (concurrency-safe). Keep os.environ as fallback for CLI/cron. os.environ["HERMES_SESSION_KEY"] = session_key or "" - - # Read from env var or use default (same as CLI) - max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90")) # Map platform enum to the platform hint key the agent understands. # Platform.LOCAL ("local") maps to "cli"; others pass through as-is. @@ -14598,10 +15028,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if self._ephemeral_system_prompt: combined_ephemeral = (combined_ephemeral + "\n\n" + self._ephemeral_system_prompt).strip() - # Re-read .env and config for fresh credentials (gateway is long-lived, - # keys may change without restart). Keep config.yaml authoritative for - # runtime budget settings bridged into env vars. - _reload_runtime_env_preserving_config_authority() + max_iterations = _current_max_iterations() try: model, runtime_kwargs = self._resolve_session_agent_runtime( @@ -14799,6 +15226,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except KeyError: pass self._init_cached_agent_for_turn(agent, _interrupt_depth) + # Refresh agent max_iterations from current config + # (cached agent may have been created with old config) + agent.max_iterations = max_iterations logger.debug("Reusing cached agent for session %s", session_key) if agent is None: @@ -14900,7 +15330,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _status_adapter.send( _status_chat_id, message, - metadata=_status_thread_metadata, + metadata=_non_conversational_metadata(_status_thread_metadata, platform=source.platform), ), _loop_for_step, logger=logger, @@ -15742,7 +16172,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _notify_res = await _notify_adapter.send( source.chat_id, _heartbeat_text, - metadata=_status_thread_metadata, + metadata=_non_conversational_metadata(_status_thread_metadata, platform=source.platform), ) if getattr(_notify_res, "success", False) and getattr( _notify_res, "message_id", None diff --git a/gateway/session.py b/gateway/session.py index f48b83fed0c..d07c65ec29f 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -92,6 +92,11 @@ class SessionSource: parent_chat_id: Optional[str] = None # Parent channel when chat_id refers to a thread message_id: Optional[str] = None # ID of the triggering message (for pin/reply/react) role_authorized: bool = False # True when adapter granted access via role (not user ID) + # Profile this inbound message is routed to in a multiplexing gateway + # (from the /p// URL prefix or per-credential adapter ownership). + # None => the gateway's active/default profile. Drives both session-key + # namespacing and the per-turn config/credential scope. + profile: Optional[str] = None @property def description(self) -> str: @@ -135,6 +140,8 @@ class SessionSource: d["parent_chat_id"] = self.parent_chat_id if self.message_id: d["message_id"] = self.message_id + if self.profile: + d["profile"] = self.profile return d @classmethod @@ -153,6 +160,7 @@ class SessionSource: guild_id=data.get("guild_id"), parent_chat_id=data.get("parent_chat_id"), message_id=data.get("message_id"), + profile=data.get("profile"), ) @@ -615,15 +623,41 @@ def is_shared_multi_user_session( return not group_sessions_per_user +def _session_key_namespace(profile: Optional[str]) -> str: + """Return the ``agent:`` namespace prefix for a session key. + + The historical key format is ``agent:main:::...`` where + ``main`` is a static namespace literal (NOT a branch name — branching keys + off ``session_id``, not this slot). Multi-profile multiplexing reuses this + slot to carry the profile: + + - default profile (or ``None``/``""``/``"default"``) → ``agent:main`` — + BYTE-IDENTICAL to every key ever generated, so existing sessions and all + positional parsers (``parts[2]`` == platform, etc.) are unaffected. + - named profile ``coder`` → ``agent:coder`` — keeps the same positional + layout, just a different namespace, so two profiles serving the same + platform/chat never collide. + """ + if not profile or profile == "default": + return "agent:main" + return f"agent:{profile}" + + def build_session_key( source: SessionSource, group_sessions_per_user: bool = True, thread_sessions_per_user: bool = False, + profile: Optional[str] = None, ) -> str: """Build a deterministic session key from a message source. This is the single source of truth for session key construction. + ``profile`` selects the key namespace (see :func:`_session_key_namespace`). + It defaults to ``None`` ⇒ the legacy ``agent:main`` namespace, so callers + that don't multiplex produce byte-identical keys to before. Only the + multiplexing gateway passes a non-default profile. + DM rules: - DMs include chat_id when present, so each private conversation is isolated. - thread_id further differentiates threaded DMs within the same DM chat. @@ -643,6 +677,7 @@ def build_session_key( shared session per chat. - Without identifiers, messages fall back to one session per platform/chat_type. """ + ns = _session_key_namespace(profile) platform = source.platform.value if source.chat_type == "dm": dm_chat_id = source.chat_id @@ -651,12 +686,12 @@ def build_session_key( if dm_chat_id: if source.thread_id: - return f"agent:main:{platform}:dm:{dm_chat_id}:{source.thread_id}" - return f"agent:main:{platform}:dm:{dm_chat_id}" + return f"{ns}:{platform}:dm:{dm_chat_id}:{source.thread_id}" + return f"{ns}:{platform}:dm:{dm_chat_id}" # No chat_id — fall back to the sender's own identifier before the # bare per-platform sink. Without this, every DM from every user that # arrives without a chat_id (non-standard adapters / synthetic sources) - # collapses into one shared "agent:main::dm" session, and a + # collapses into one shared "::dm" session, and a # single cached agent ends up serving multiple people's conversations — # cross-user history bleed. participant_id keeps DMs isolated per user. dm_participant_id = source.user_id_alt or source.user_id @@ -667,11 +702,11 @@ def build_session_key( ) if dm_participant_id: if source.thread_id: - return f"agent:main:{platform}:dm:{dm_participant_id}:{source.thread_id}" - return f"agent:main:{platform}:dm:{dm_participant_id}" + return f"{ns}:{platform}:dm:{dm_participant_id}:{source.thread_id}" + return f"{ns}:{platform}:dm:{dm_participant_id}" if source.thread_id: - return f"agent:main:{platform}:dm:{source.thread_id}" - return f"agent:main:{platform}:dm" + return f"{ns}:{platform}:dm:{source.thread_id}" + return f"{ns}:{platform}:dm" participant_id = source.user_id_alt or source.user_id if participant_id and source.platform == Platform.WHATSAPP: @@ -679,7 +714,7 @@ def build_session_key( # single group member gets two isolated per-user sessions when the # bridge reshuffles alias forms. participant_id = canonical_whatsapp_identifier(str(participant_id)) or participant_id - key_parts = ["agent:main", platform, source.chat_type] + key_parts = [ns, platform, source.chat_type] if source.chat_id: key_parts.append(source.chat_id) @@ -775,12 +810,32 @@ class SessionStore: logger.debug("Could not remove temp file %s: %s", tmp_path, e) raise + def _resolve_profile_for_key(self, source: Optional[SessionSource] = None) -> Optional[str]: + """Return the profile namespace for session keys, or None when off. + + When ``multiplex_profiles`` is disabled (default), returns ``None`` so + keys stay in the legacy ``agent:main`` namespace — byte-identical to + before. When enabled, prefers the profile the inbound source was routed + to (``source.profile`` — set by the /p// URL prefix or + per-credential adapter), falling back to the active profile name. + """ + if not getattr(self.config, "multiplex_profiles", False): + return None + if source is not None and source.profile: + return source.profile + try: + from hermes_cli.profiles import get_active_profile_name + return get_active_profile_name() or "default" + except Exception: + return None + def _generate_session_key(self, source: SessionSource) -> str: """Generate a session key from a source.""" return build_session_key( source, group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True), thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False), + profile=self._resolve_profile_for_key(source), ) def _is_session_expired(self, entry: SessionEntry) -> bool: diff --git a/gateway/slash_commands.py b/gateway/slash_commands.py index 04c3f4ca89f..4b25d96fdbf 100644 --- a/gateway/slash_commands.py +++ b/gateway/slash_commands.py @@ -1030,12 +1030,13 @@ class GatewaySlashCommandsMixin: ) async def _handle_model_command(self, event: MessageEvent) -> Optional[str]: - """Handle /model command — switch model for this session. + """Handle /model command — switch model. Supports: /model — interactive picker (Telegram/Discord) or text list - /model — switch for this session only - /model --global — switch and persist to config.yaml + /model — switch model (persists by default) + /model --session — switch for this session only + /model --global — switch and persist (explicit) /model --provider — switch provider + model /model --provider — switch to provider, auto-detect model """ @@ -1043,6 +1044,7 @@ class GatewaySlashCommandsMixin: import yaml from hermes_cli.model_switch import ( switch_model as _switch_model, parse_model_flags, + resolve_persist_behavior, list_authenticated_providers, list_picker_providers, ) @@ -1050,8 +1052,15 @@ class GatewaySlashCommandsMixin: raw_args = event.get_command_args().strip() - # Parse --provider, --global, and --refresh flags - model_input, explicit_provider, persist_global, force_refresh = parse_model_flags(raw_args) + # Parse --provider, --global, --session, and --refresh flags + ( + model_input, + explicit_provider, + is_global_flag, + force_refresh, + is_session, + ) = parse_model_flags(raw_args) + persist_global = resolve_persist_behavior(is_global_flag, is_session) # --refresh: bust the disk cache so the picker shows live data. if force_refresh: @@ -1362,7 +1371,7 @@ class GatewaySlashCommandsMixin: # override rather than relying on cache signature mismatch detection. self._evict_cached_agent(session_key) - # Persist to config if --global + # Persist to config (default) unless --session opted out if persist_global: try: if config_path.exists(): diff --git a/gateway/status.py b/gateway/status.py index 367ac33c4d7..b4bee42fdad 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -14,6 +14,7 @@ concurrently under distinct configurations). import hashlib import json import os +import shlex import signal import subprocess import sys @@ -164,20 +165,86 @@ def _read_process_cmdline(pid: int) -> Optional[str]: return None +def looks_like_gateway_command_line(command: str | None) -> bool: + """Return True only for a real ``gateway run`` process command line. + + Lifecycle decisions (is the gateway up? did restart relaunch it?) must not + fire on loose substring matches. The previous ``"... gateway" in cmdline`` + test also matched ``hermes_cli.main gateway status`` and even unrelated + processes like ``python -m tui_gateway`` -- which made ``restart()`` race + against a still-draining old process and ``status``/``start`` report false + positives. This requires the actual ``gateway`` subcommand followed by + ``run`` (or one of the gateway-dedicated entrypoints), excluding the other + ``gateway`` management subcommands and any process that merely contains the + word "gateway". + + Tokenizes quote-aware (``shlex``) so quoted Windows paths with spaces + (``"C:\\Program Files\\...\\hermes-gateway.exe"``) survive, and strips + ``--profile``/``-p`` selectors from anywhere in argv -- Hermes's + ``_apply_profile_override`` removes them before argparse, so the profile + flag (and a profile literally named ``gateway``) can legally appear on + either side of the ``gateway`` subcommand. + """ + if not command: + return False + + try: + raw_tokens = shlex.split(command, posix=False) + except ValueError: + raw_tokens = command.split() + # Strip surrounding quotes, normalize slashes + case per token. + tokens = [t.strip("\"'").replace("\\", "/").lower() for t in raw_tokens] + if not tokens: + return False + + # Gateway-dedicated entrypoints carry no subcommand to inspect. + for token in tokens: + if token == "gateway/run.py" or token.endswith("/gateway/run.py"): + return True + basename = token.rsplit("/", 1)[-1] + if basename in ("hermes-gateway", "hermes-gateway.exe"): + return True + + joined = " ".join(tokens) + has_gateway_entry = ( + "hermes_cli.main" in joined + or "hermes_cli/main.py" in joined + or any(t.rsplit("/", 1)[-1] in ("hermes", "hermes.exe") for t in tokens) + ) + if not has_gateway_entry: + return False + + # Drop profile selectors anywhere: --profile X / -p X / --profile=X / -p=X. + # This consumes a profile VALUE of "gateway" too, so the real subcommand + # token is the one we land on below. + filtered: list[str] = [] + skip_next = False + for token in tokens: + if skip_next: + skip_next = False + continue + if token in ("--profile", "-p"): + skip_next = True + continue + if token.startswith("--profile=") or token.startswith("-p="): + continue + filtered.append(token) + + for i, token in enumerate(filtered): + if token != "gateway": + continue + if i + 1 >= len(filtered): + return True # bare `hermes gateway` defaults to `run` + return filtered[i + 1] == "run" + return False + + def _looks_like_gateway_process(pid: int) -> bool: """Return True when the live PID still looks like the Hermes gateway.""" cmdline = _read_process_cmdline(pid) if not cmdline: return False - - patterns = ( - "hermes_cli.main gateway", - "hermes_cli/main.py gateway", - "hermes gateway", - "hermes-gateway", - "gateway/run.py", - ) - return any(pattern in cmdline for pattern in patterns) + return looks_like_gateway_command_line(cmdline) def _record_looks_like_gateway(record: dict[str, Any]) -> bool: @@ -189,15 +256,8 @@ def _record_looks_like_gateway(record: dict[str, Any]) -> bool: if not isinstance(argv, list) or not argv: return False - # Normalize Windows backslashes so patterns match cross-platform. - cmdline = " ".join(str(part) for part in argv).replace("\\", "/") - patterns = ( - "hermes_cli.main gateway", - "hermes_cli/main.py gateway", - "hermes gateway", - "gateway/run.py", - ) - return any(pattern in cmdline for pattern in patterns) + cmdline = " ".join(str(part) for part in argv) + return looks_like_gateway_command_line(cmdline) def _build_pid_record() -> dict: @@ -515,6 +575,7 @@ def write_runtime_status( platform_state: Any = _UNSET, error_code: Any = _UNSET, error_message: Any = _UNSET, + served_profiles: Any = _UNSET, ) -> None: """Persist gateway runtime health information for diagnostics/status.""" path = _get_runtime_status_path() @@ -535,6 +596,11 @@ def write_runtime_status( payload["restart_requested"] = bool(restart_requested) if active_agents is not _UNSET: payload["active_agents"] = max(0, int(active_agents)) + if served_profiles is not _UNSET: + # Profiles this gateway multiplexes (multi-profile mode). Absent/empty + # for a single-profile gateway. Lets `hermes status` show per-profile + # coverage without a second probe. + payload["served_profiles"] = list(served_profiles or []) if platform is not _UNSET: platform_payload = payload["platforms"].get(platform, {}) diff --git a/hermes_cli/backup.py b/hermes_cli/backup.py index 0064881c43f..770a8de4569 100644 --- a/hermes_cli/backup.py +++ b/hermes_cli/backup.py @@ -34,14 +34,38 @@ logger = logging.getLogger(__name__) # ``hermes-agent`` is special-cased to root level only in ``_should_exclude`` # so that skill directories like ``skills/autonomous-ai-agents/hermes-agent/`` # are not accidentally excluded. +# +# The dependency/cache entries below matter for more than tidiness: without +# them a single plugin venv, MCP-server install, or pip/uv cache living under +# HERMES_HOME gets walked file-by-file, ballooning a backup to hundreds of +# thousands of entries that crawl for hours — the exact "backup stuck for +# days / 426543 files" symptom users hit. The dependency/test-env names mostly +# mirror ``agent.skill_utils.EXCLUDED_SKILL_DIRS`` (the project's canonical +# "regeneratable dir" set); ``.cache`` is an additional backup-only entry, as +# it names a broad regeneratable cache convention (pip/uv/etc.) that the skill +# scanner doesn't need to prune but a backup walk does. We deliberately do NOT +# exclude ``.archive`` here because the curator's ``skills/.archive/`` holds +# restorable user skills that must survive a backup. _EXCLUDED_DIRS = { "hermes-agent", # the codebase repo — re-clone instead "__pycache__", # bytecode caches — regenerated on import ".git", # nested git dirs (profiles shouldn't have these, but safety) - "node_modules", # js deps if website/ somehow leaks in + "node_modules", # js deps — reinstalled on demand "backups", # prior auto-backups — don't nest backups exponentially "checkpoints", # session-local trajectory caches — regenerated per-session, # session-hash-keyed so they don't port to another machine anyway + # Python dependency trees (plugin / MCP-server venvs under HERMES_HOME) — + # regenerated by reinstalling; never irreplaceable state. + ".venv", + "venv", + "site-packages", + # Tool / build caches — all regeneratable. + ".cache", + ".tox", + ".nox", + ".pytest_cache", + ".mypy_cache", + ".ruff_cache", } # File-name suffixes to skip diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index 514e7f659b3..42e51f29909 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -123,8 +123,8 @@ COMMAND_REGISTRY: list[CommandDef] = [ # Configuration CommandDef("config", "Show current configuration", "Configuration", cli_only=True), - CommandDef("model", "Switch model for this session", "Configuration", - args_hint="[model] [--provider name] [--global] [--refresh]"), + CommandDef("model", "Switch model (persists by default)", "Configuration", + args_hint="[model] [--provider name] [--global|--session] [--refresh]"), CommandDef("codex-runtime", "Toggle codex app-server runtime for OpenAI/Codex models", "Configuration", aliases=("codex_runtime",), args_hint="[auto|codex_app_server]"), diff --git a/hermes_cli/config.py b/hermes_cli/config.py index bf9dc532630..cb574345d35 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1581,6 +1581,14 @@ DEFAULT_CONFIG = { # TUI busy indicator style: kaomoji (default), emoji, unicode (braille # spinner), or ascii. Live-swappable via `/indicator