diff --git a/agent/context_compressor.py b/agent/context_compressor.py index 069a5b65e1..5aa95dc01b 100644 --- a/agent/context_compressor.py +++ b/agent/context_compressor.py @@ -20,6 +20,7 @@ from typing import Any, Dict, List, Optional from agent.auxiliary_client import call_llm from agent.context_engine import ContextEngine from agent.model_metadata import ( + MINIMUM_CONTEXT_LENGTH, get_model_context_length, estimate_messages_tokens_rough, ) @@ -87,7 +88,10 @@ class ContextCompressor(ContextEngine): self.api_key = api_key self.provider = provider self.context_length = context_length - self.threshold_tokens = int(context_length * self.threshold_percent) + self.threshold_tokens = max( + int(context_length * self.threshold_percent), + MINIMUM_CONTEXT_LENGTH, + ) def __init__( self, @@ -118,7 +122,14 @@ class ContextCompressor(ContextEngine): config_context_length=config_context_length, provider=provider, ) - self.threshold_tokens = int(self.context_length * threshold_percent) + # Floor: never compress below MINIMUM_CONTEXT_LENGTH tokens even if + # the percentage would suggest a lower value. This prevents premature + # compression on large-context models at 50% while keeping the % sane + # for models right at the minimum. + self.threshold_tokens = max( + int(self.context_length * threshold_percent), + MINIMUM_CONTEXT_LENGTH, + ) self.compression_count = 0 # Derive token budgets: ratio is relative to the threshold, not total context diff --git a/agent/model_metadata.py b/agent/model_metadata.py index f12801777d..03f70b3fe4 100644 --- a/agent/model_metadata.py +++ b/agent/model_metadata.py @@ -85,6 +85,11 @@ CONTEXT_PROBE_TIERS = [ # Default context length when no detection method succeeds. DEFAULT_FALLBACK_CONTEXT = CONTEXT_PROBE_TIERS[0] +# Minimum context length required to run Hermes Agent. Models with fewer +# tokens cannot maintain enough working memory for tool-calling workflows. +# Sessions, model switches, and cron jobs should reject models below this. +MINIMUM_CONTEXT_LENGTH = 64_000 + # Thin fallback defaults — only broad model family patterns. # These fire only when provider is unknown AND models.dev/OpenRouter/Anthropic # all miss. Replaced the previous 80+ entry dict. @@ -1040,16 +1045,21 @@ def get_model_context_length( def estimate_tokens_rough(text: str) -> int: - """Rough token estimate (~4 chars/token) for pre-flight checks.""" + """Rough token estimate (~4 chars/token) for pre-flight checks. + + Uses ceiling division so short texts (1-3 chars) never estimate as + 0 tokens, which would cause the compressor and pre-flight checks to + systematically undercount when many short tool results are present. + """ if not text: return 0 - return len(text) // 4 + return (len(text) + 3) // 4 def estimate_messages_tokens_rough(messages: List[Dict[str, Any]]) -> int: """Rough token estimate for a message list (pre-flight only).""" total_chars = sum(len(str(msg)) for msg in messages) - return total_chars // 4 + return (total_chars + 3) // 4 def estimate_request_tokens_rough( @@ -1072,4 +1082,4 @@ def estimate_request_tokens_rough( total_chars += sum(len(str(msg)) for msg in messages) if tools: total_chars += len(str(tools)) - return total_chars // 4 + return (total_chars + 3) // 4 diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 12e2b39995..31d829de06 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -774,6 +774,11 @@ display: # Toggle at runtime with /verbose in the CLI tool_progress: all + # Gateway-only natural mid-turn assistant updates. + # When true, completed assistant status messages are sent as separate chat + # messages. This is independent of tool_progress and gateway streaming. + interim_assistant_messages: true + # What Enter does when Hermes is already busy in the CLI. # interrupt: Interrupt the current run and redirect Hermes (default) # queue: Queue your message for the next turn diff --git a/docs/specs/container-cli-review-fixes.md b/docs/specs/container-cli-review-fixes.md new file mode 100644 index 0000000000..0eb9070dbf --- /dev/null +++ b/docs/specs/container-cli-review-fixes.md @@ -0,0 +1,329 @@ +# Container-Aware CLI Review Fixes Spec + +**PR:** NousResearch/hermes-agent#7543 +**Review:** cursor[bot] bugbot review (4094049442) + two prior rounds +**Date:** 2026-04-12 +**Branch:** `feat/container-aware-cli-clean` + +## Review Issues Summary + +Six issues were raised across three bugbot review rounds. Three were fixed in intermediate commits (38277a6a, 726cf90f). This spec addresses remaining design concerns surfaced by those reviews and simplifies the implementation based on interview decisions. + +| # | Issue | Severity | Status | +|---|-------|----------|--------| +| 1 | `os.execvp` retry loop unreachable | Medium | Fixed in 79e8cd12 (switched to subprocess.run) | +| 2 | Redundant `shutil.which("sudo")` | Medium | Fixed in 38277a6a (reuses `sudo` var) | +| 3 | Missing `chown -h` on symlink update | Low | Fixed in 38277a6a | +| 4 | Container routing after `parse_args()` | High | Fixed in 726cf90f | +| 5 | Hardcoded `/home/${user}` | Medium | Fixed in 726cf90f | +| 6 | Group membership not gated on `container.enable` | Low | Fixed in 726cf90f | + +The mechanical fixes are in place but the overall design needs revision. The retry loop, error swallowing, and process model have deeper issues than what the bugbot flagged. + +--- + +## Spec: Revised `_exec_in_container` + +### Design Principles + +1. **Let it crash.** No silent fallbacks. If `.container-mode` exists but something goes wrong, the error propagates naturally (Python traceback). The only case where container routing is skipped is when `.container-mode` doesn't exist or `HERMES_DEV=1`. +2. **No retries.** Probe once for sudo, exec once. If it fails, docker/podman's stderr reaches the user verbatim. +3. **Completely transparent.** No error wrapping, no prefixes, no spinners. Docker's output goes straight through. +4. **`os.execvp` on the happy path.** Replace the Python process entirely so there's no idle parent during interactive sessions. Note: `execvp` never returns on success (process is replaced) and raises `OSError` on failure (it does not return a value). The container process's exit code becomes the process exit code by definition — no explicit propagation needed. +5. **One human-readable exception to "let it crash".** `subprocess.TimeoutExpired` from the sudo probe gets a specific catch with a readable message, since a raw traceback for "your Docker daemon is slow" is confusing. All other exceptions propagate naturally. + +### Execution Flow + +``` +1. get_container_exec_info() + - HERMES_DEV=1 → return None (skip routing) + - Inside container → return None (skip routing) + - .container-mode doesn't exist → return None (skip routing) + - .container-mode exists → parse and return dict + - .container-mode exists but malformed/unreadable → LET IT CRASH (no try/except) + +2. _exec_in_container(container_info, sys.argv[1:]) + a. shutil.which(backend) → if None, print "{backend} not found on PATH" and sys.exit(1) + b. Sudo probe: subprocess.run([runtime, "inspect", "--format", "ok", container_name], timeout=15) + - If succeeds → needs_sudo = False + - If fails → try subprocess.run([sudo, "-n", runtime, "inspect", ...], timeout=15) + - If succeeds → needs_sudo = True + - If fails → print error with sudoers hint (including why -n is required) and sys.exit(1) + - If TimeoutExpired → catch specifically, print human-readable message about slow daemon + c. Build exec_cmd: [sudo? + runtime, "exec", tty_flags, "-u", exec_user, env_flags, container, hermes_bin, *cli_args] + d. os.execvp(exec_cmd[0], exec_cmd) + - On success: process is replaced — Python is gone, container exit code IS the process exit code + - On OSError: let it crash (natural traceback) +``` + +### Changes to `hermes_cli/main.py` + +#### `_exec_in_container` — rewrite + +Remove: +- The entire retry loop (`max_retries`, `for attempt in range(...)`) +- Spinner logic (`"Waiting for container..."`, dots) +- Exit code classification (125/126/127 handling) +- `subprocess.run` for the exec call (keep it only for the sudo probe) +- Special TTY vs non-TTY retry counts +- The `time` import (no longer needed) + +Change: +- Use `os.execvp(exec_cmd[0], exec_cmd)` as the final call +- Keep the `subprocess` import only for the sudo probe +- Keep TTY detection for the `-it` vs `-i` flag +- Keep env var forwarding (TERM, COLORTERM, LANG, LC_ALL) +- Keep the sudo probe as-is (it's the one "smart" part) +- Bump probe `timeout` from 5s to 15s — cold podman on a loaded machine needs headroom +- Catch `subprocess.TimeoutExpired` specifically on both probe calls — print a readable message about the daemon being unresponsive instead of a raw traceback +- Expand the sudoers hint error message to explain *why* `-n` (non-interactive) is required: a password prompt would hang the CLI or break piped commands + +The function becomes roughly: + +```python +def _exec_in_container(container_info: dict, cli_args: list): + """Replace the current process with a command inside the managed container. + + Probes whether sudo is needed (rootful containers), then os.execvp + into the container. If exec fails, the OS error propagates naturally. + """ + import shutil + import subprocess + + backend = container_info["backend"] + container_name = container_info["container_name"] + exec_user = container_info["exec_user"] + hermes_bin = container_info["hermes_bin"] + + runtime = shutil.which(backend) + if not runtime: + print(f"Error: {backend} not found on PATH. Cannot route to container.", + file=sys.stderr) + sys.exit(1) + + # Probe whether we need sudo to see the rootful container. + # Timeout is 15s — cold podman on a loaded machine can take a while. + # TimeoutExpired is caught specifically for a human-readable message; + # all other exceptions propagate naturally. + needs_sudo = False + sudo = None + try: + probe = subprocess.run( + [runtime, "inspect", "--format", "ok", container_name], + capture_output=True, text=True, timeout=15, + ) + except subprocess.TimeoutExpired: + print( + f"Error: timed out waiting for {backend} to respond.\n" + f"The {backend} daemon may be unresponsive or starting up.", + file=sys.stderr, + ) + sys.exit(1) + + if probe.returncode != 0: + sudo = shutil.which("sudo") + if sudo: + try: + probe2 = subprocess.run( + [sudo, "-n", runtime, "inspect", "--format", "ok", container_name], + capture_output=True, text=True, timeout=15, + ) + except subprocess.TimeoutExpired: + print( + f"Error: timed out waiting for sudo {backend} to respond.", + file=sys.stderr, + ) + sys.exit(1) + + if probe2.returncode == 0: + needs_sudo = True + else: + print( + f"Error: container '{container_name}' not found via {backend}.\n" + f"\n" + f"The NixOS service runs the container as root. Your user cannot\n" + f"see it because {backend} uses per-user namespaces.\n" + f"\n" + f"Fix: grant passwordless sudo for {backend}. The -n (non-interactive)\n" + f"flag is required because the CLI calls sudo non-interactively —\n" + f"a password prompt would hang or break piped commands:\n" + f"\n" + f' security.sudo.extraRules = [{{\n' + f' users = [ "{os.getenv("USER", "your-user")}" ];\n' + f' commands = [{{ command = "{runtime}"; options = [ "NOPASSWD" ]; }}];\n' + f' }}];\n' + f"\n" + f"Or run: sudo hermes {' '.join(cli_args)}", + file=sys.stderr, + ) + sys.exit(1) + else: + print( + f"Error: container '{container_name}' not found via {backend}.\n" + f"The container may be running under root. Try: sudo hermes {' '.join(cli_args)}", + file=sys.stderr, + ) + sys.exit(1) + + is_tty = sys.stdin.isatty() + tty_flags = ["-it"] if is_tty else ["-i"] + + env_flags = [] + for var in ("TERM", "COLORTERM", "LANG", "LC_ALL"): + val = os.environ.get(var) + if val: + env_flags.extend(["-e", f"{var}={val}"]) + + cmd_prefix = [sudo, "-n", runtime] if needs_sudo else [runtime] + exec_cmd = ( + cmd_prefix + ["exec"] + + tty_flags + + ["-u", exec_user] + + env_flags + + [container_name, hermes_bin] + + cli_args + ) + + # execvp replaces this process entirely — it never returns on success. + # On failure it raises OSError, which propagates naturally. + os.execvp(exec_cmd[0], exec_cmd) +``` + +#### Container routing call site in `main()` — remove try/except + +Current: +```python +try: + from hermes_cli.config import get_container_exec_info + container_info = get_container_exec_info() + if container_info: + _exec_in_container(container_info, sys.argv[1:]) + sys.exit(1) # exec failed if we reach here +except SystemExit: + raise +except Exception: + pass # Container routing unavailable, proceed locally +``` + +Revised: +```python +from hermes_cli.config import get_container_exec_info +container_info = get_container_exec_info() +if container_info: + _exec_in_container(container_info, sys.argv[1:]) + # Unreachable: os.execvp never returns on success (process is replaced) + # and raises OSError on failure (which propagates as a traceback). + # This line exists only as a defensive assertion. + sys.exit(1) +``` + +No try/except. If `.container-mode` doesn't exist, `get_container_exec_info()` returns `None` and we skip routing. If it exists but is broken, the exception propagates with a natural traceback. + +Note: `sys.exit(1)` after `_exec_in_container` is dead code in all paths — `os.execvp` either replaces the process or raises. It's kept as a belt-and-suspenders assertion with a comment marking it unreachable, not as actual error handling. + +### Changes to `hermes_cli/config.py` + +#### `get_container_exec_info` — remove inner try/except + +Current code catches `(OSError, IOError)` and returns `None`. This silently hides permission errors, corrupt files, etc. + +Change: Remove the try/except around file reading. Keep the early returns for `HERMES_DEV=1` and `_is_inside_container()`. The `FileNotFoundError` from `open()` when `.container-mode` doesn't exist should still return `None` (this is the "container mode not enabled" case). All other exceptions propagate. + +```python +def get_container_exec_info() -> Optional[dict]: + if os.environ.get("HERMES_DEV") == "1": + return None + if _is_inside_container(): + return None + + container_mode_file = get_hermes_home() / ".container-mode" + + try: + with open(container_mode_file, "r") as f: + # ... parse key=value lines ... + except FileNotFoundError: + return None + # All other exceptions (PermissionError, malformed data, etc.) propagate + + return { ... } +``` + +--- + +## Spec: NixOS Module Changes + +### Symlink creation — simplify to two branches + +Current: 4 branches (symlink exists, directory exists, other file, doesn't exist). + +Revised: 2 branches. + +```bash +if [ -d "${symlinkPath}" ] && [ ! -L "${symlinkPath}" ]; then + # Real directory — back it up, then create symlink + _backup="${symlinkPath}.bak.$(date +%s)" + echo "hermes-agent: backing up existing ${symlinkPath} to $_backup" + mv "${symlinkPath}" "$_backup" +fi +# For everything else (symlink, doesn't exist, etc.) — just force-create +ln -sfn "${target}" "${symlinkPath}" +chown -h ${user}:${cfg.group} "${symlinkPath}" +``` + +`ln -sfn` handles: existing symlink (replaces), doesn't exist (creates), and after the `mv` above (creates). The only case that needs special handling is a real directory, because `ln -sfn` cannot atomically replace a directory. + +Note: there is a theoretical race between the `[ -d ... ]` check and the `mv` (something could create/remove the directory in between). In practice this is a NixOS activation script running as root during `nixos-rebuild switch` — no other process should be touching `~/.hermes` at that moment. Not worth adding locking for. + +### Sudoers — document, don't auto-configure + +Do NOT add `security.sudo.extraRules` to the module. Document the sudoers requirement in the module's description/comments and in the error message the CLI prints when sudo probe fails. + +### Group membership gating — keep as-is + +The fix in 726cf90f (`cfg.container.enable && cfg.container.hostUsers != []`) is correct. Leftover group membership when container mode is disabled is harmless. No cleanup needed. + +--- + +## Spec: Test Rewrite + +The existing test file (`tests/hermes_cli/test_container_aware_cli.py`) has 16 tests. With the simplified exec model, several are obsolete. + +### Tests to keep (update as needed) + +- `test_is_inside_container_dockerenv` — unchanged +- `test_is_inside_container_containerenv` — unchanged +- `test_is_inside_container_cgroup_docker` — unchanged +- `test_is_inside_container_false_on_host` — unchanged +- `test_get_container_exec_info_returns_metadata` — unchanged +- `test_get_container_exec_info_none_inside_container` — unchanged +- `test_get_container_exec_info_none_without_file` — unchanged +- `test_get_container_exec_info_skipped_when_hermes_dev` — unchanged +- `test_get_container_exec_info_not_skipped_when_hermes_dev_zero` — unchanged +- `test_get_container_exec_info_defaults` — unchanged +- `test_get_container_exec_info_docker_backend` — unchanged + +### Tests to add + +- `test_get_container_exec_info_crashes_on_permission_error` — verify that `PermissionError` propagates (no silent `None` return) +- `test_exec_in_container_calls_execvp` — verify `os.execvp` is called with correct args (runtime, tty flags, user, env, container, binary, cli args) +- `test_exec_in_container_sudo_probe_sets_prefix` — verify that when first probe fails and sudo probe succeeds, `os.execvp` is called with `sudo -n` prefix +- `test_exec_in_container_no_runtime_hard_fails` — keep existing, verify `sys.exit(1)` when `shutil.which` returns None +- `test_exec_in_container_non_tty_uses_i_only` — update to check `os.execvp` args instead of `subprocess.run` args +- `test_exec_in_container_probe_timeout_prints_message` — verify that `subprocess.TimeoutExpired` from the probe produces a human-readable error and `sys.exit(1)`, not a raw traceback +- `test_exec_in_container_container_not_running_no_sudo` — verify the path where runtime exists (`shutil.which` returns a path) but probe returns non-zero and no sudo is available. Should print the "container may be running under root" error. This is distinct from `no_runtime_hard_fails` which covers `shutil.which` returning None. + +### Tests to delete + +- `test_exec_in_container_tty_retries_on_container_failure` — retry loop removed +- `test_exec_in_container_non_tty_retries_silently_exits_126` — retry loop removed +- `test_exec_in_container_propagates_hermes_exit_code` — no subprocess.run to check exit codes; execvp replaces the process. Note: exit code propagation still works correctly — when `os.execvp` succeeds, the container's process *becomes* this process, so its exit code is the process exit code by OS semantics. No application code needed, no test needed. A comment in the function docstring documents this intent for future readers. + +--- + +## Out of Scope + +- Auto-configuring sudoers rules in the NixOS module +- Any changes to `get_container_exec_info` parsing logic beyond the try/except narrowing +- Changes to `.container-mode` file format +- Changes to the `HERMES_DEV=1` bypass +- Changes to container detection logic (`_is_inside_container`) diff --git a/gateway/run.py b/gateway/run.py index 1ba7fc8470..5ca1a7d7f7 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -76,7 +76,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) # Resolve Hermes home directory (respects HERMES_HOME override) from hermes_constants import get_hermes_home -from utils import atomic_yaml_write +from utils import atomic_yaml_write, is_truthy_value _hermes_home = get_hermes_home() # Load environment variables from ~/.hermes/.env first. @@ -7079,10 +7079,14 @@ class GatewayRunner: from hermes_cli.tools_config import _get_platform_tools enabled_toolsets = sorted(_get_platform_tools(user_config, platform_key)) + display_config = user_config.get("display", {}) + if not isinstance(display_config, dict): + display_config = {} + # Apply tool preview length config (0 = no limit) try: from agent.display import set_tool_preview_max_len - _tpl = user_config.get("display", {}).get("tool_preview_length", 0) + _tpl = display_config.get("tool_preview_length", 0) set_tool_preview_max_len(int(_tpl) if _tpl else 0) except Exception: pass @@ -7095,11 +7099,12 @@ class GatewayRunner: # Per-platform overrides (display.tool_progress_overrides) take # priority over the global setting — e.g. Signal users can set # tool_progress to "off" while keeping Telegram on "all". - _display_cfg = user_config.get("display", {}) - _overrides = _display_cfg.get("tool_progress_overrides", {}) + _overrides = display_config.get("tool_progress_overrides", {}) + if not isinstance(_overrides, dict): + _overrides = {} _raw_tp = _overrides.get(platform_key) if _raw_tp is None: - _raw_tp = _display_cfg.get("tool_progress") + _raw_tp = display_config.get("tool_progress") if _raw_tp is False: _raw_tp = "off" progress_mode = ( @@ -7111,6 +7116,16 @@ class GatewayRunner: # so each progress line would be sent as a separate message. from gateway.config import Platform tool_progress_enabled = progress_mode != "off" and source.platform != Platform.WEBHOOK + # Natural assistant status messages are intentionally independent from + # tool progress and token streaming. Users can keep tool_progress quiet + # in chat platforms while opting into concise mid-turn updates. + interim_assistant_messages_enabled = ( + source.platform != Platform.WEBHOOK + and is_truthy_value( + display_config.get("interim_assistant_messages"), + default=True, + ) + ) # Queue for progress messages (thread-safe) progress_queue = queue.Queue() if tool_progress_enabled else None @@ -7423,7 +7438,7 @@ class GatewayRunner: reasoning_config = self._load_reasoning_config() self._reasoning_config = reasoning_config self._service_tier = self._load_service_tier() - # Set up streaming consumer if enabled + # Set up stream consumer for token streaming or interim commentary. _stream_consumer = None _stream_delta_cb = None _scfg = getattr(getattr(self, 'config', None), 'streaming', None) @@ -7431,7 +7446,10 @@ class GatewayRunner: from gateway.config import StreamingConfig _scfg = StreamingConfig() - if _scfg.enabled and _scfg.transport != "off": + _want_stream_deltas = _scfg.enabled and _scfg.transport != "off" + _want_interim_messages = interim_assistant_messages_enabled + _want_interim_consumer = _want_interim_messages + if _want_stream_deltas or _want_interim_consumer: try: from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig _adapter = self.adapters.get(source.platform) @@ -7447,11 +7465,33 @@ class GatewayRunner: config=_consumer_cfg, metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None, ) - _stream_delta_cb = _stream_consumer.on_delta + if _want_stream_deltas: + _stream_delta_cb = _stream_consumer.on_delta stream_consumer_holder[0] = _stream_consumer except Exception as _sc_err: logger.debug("Could not set up stream consumer: %s", _sc_err) + def _interim_assistant_cb(text: str, *, already_streamed: bool = False) -> None: + if _stream_consumer is not None: + if already_streamed: + _stream_consumer.on_segment_break() + else: + _stream_consumer.on_commentary(text) + return + if already_streamed or not _status_adapter or not str(text or "").strip(): + return + try: + asyncio.run_coroutine_threadsafe( + _status_adapter.send( + _status_chat_id, + text, + metadata=_status_thread_metadata, + ), + _loop_for_step, + ) + except Exception as _e: + logger.debug("interim_assistant_callback error: %s", _e) + turn_route = self._resolve_turn_agent_config(message, model, runtime_kwargs) # Check agent cache — reuse the AIAgent from the previous message @@ -7509,6 +7549,7 @@ class GatewayRunner: agent.tool_progress_callback = progress_callback if tool_progress_enabled else None agent.step_callback = _step_callback_sync if _hooks_ref.loaded_hooks else None agent.stream_delta_callback = _stream_delta_cb + agent.interim_assistant_callback = _interim_assistant_cb if _want_interim_messages else None agent.status_callback = _status_callback_sync agent.reasoning_config = reasoning_config agent.service_tier = self._service_tier @@ -7812,6 +7853,7 @@ class GatewayRunner: "output_tokens": _output_toks, "model": _resolved_model, "session_id": effective_session_id, + "response_previewed": result.get("response_previewed", False), } # Start progress message sender if enabled @@ -8134,12 +8176,36 @@ class GatewayRunner: # response before processing the queued follow-up. # Skip if streaming already delivered it. _sc = stream_consumer_holder[0] - _already_streamed = _sc and getattr(_sc, "already_sent", False) + if _sc and stream_task: + try: + await asyncio.wait_for(stream_task, timeout=5.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + stream_task.cancel() + try: + await stream_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.debug("Stream consumer wait before queued message failed: %s", e) + _response_previewed = bool(result.get("response_previewed")) + _already_streamed = bool( + _sc + and ( + getattr(_sc, "final_response_sent", False) + or ( + _response_previewed + and getattr(_sc, "already_sent", False) + ) + ) + ) first_response = result.get("final_response", "") if first_response and not _already_streamed: try: - await adapter.send(source.chat_id, first_response, - metadata={"thread_id": source.thread_id} if source.thread_id else None) + await adapter.send( + source.chat_id, + first_response, + metadata=_status_thread_metadata, + ) except Exception as e: logger.warning("Failed to send first response before queued message: %s", e) # else: interrupted — discard the interrupted response ("Operation @@ -8212,8 +8278,15 @@ class GatewayRunner: # message is new content the user hasn't seen, and it must reach # them even if streaming had sent earlier partial output. _sc = stream_consumer_holder[0] - if _sc and _sc.already_sent and isinstance(response, dict): - if not response.get("failed"): + if _sc and isinstance(response, dict) and not response.get("failed"): + _response_previewed = bool(response.get("response_previewed")) + if ( + getattr(_sc, "final_response_sent", False) + or ( + _response_previewed + and getattr(_sc, "already_sent", False) + ) + ): response["already_sent"] = True return response diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index de0a1453b9..486d179de9 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -32,6 +32,10 @@ _DONE = object() # new one so that subsequent text appears below tool progress messages. _NEW_SEGMENT = object() +# Queue marker for a completed assistant commentary message emitted between +# API/tool iterations (for example: "I'll inspect the repo first."). +_COMMENTARY = object() + @dataclass class StreamConsumerConfig: @@ -75,20 +79,43 @@ class GatewayStreamConsumer: self._accumulated = "" self._message_id: Optional[str] = None self._already_sent = False - self._edit_supported = True # Disabled on first edit failure (Signal/Email/HA) + self._edit_supported = True # Disabled when progressive edits are no longer usable self._last_edit_time = 0.0 self._last_sent_text = "" # Track last-sent text to skip redundant edits self._fallback_final_send = False self._fallback_prefix = "" self._flood_strikes = 0 # Consecutive flood-control edit failures self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff + self._final_response_sent = False @property def already_sent(self) -> bool: - """True if at least one message was sent/edited — signals the base - adapter to skip re-sending the final response.""" + """True if at least one message was sent or edited during the run.""" return self._already_sent + @property + def final_response_sent(self) -> bool: + """True when the stream consumer delivered the final assistant reply.""" + return self._final_response_sent + + def on_segment_break(self) -> None: + """Finalize the current stream segment and start a fresh message.""" + self._queue.put(_NEW_SEGMENT) + + def on_commentary(self, text: str) -> None: + """Queue a completed interim assistant commentary message.""" + if text: + self._queue.put((_COMMENTARY, text)) + + def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None: + if preserve_no_edit and self._message_id == "__no_edit__": + return + self._message_id = None + self._accumulated = "" + self._last_sent_text = "" + self._fallback_final_send = False + self._fallback_prefix = "" + def on_delta(self, text: str) -> None: """Thread-safe callback — called from the agent's worker thread. @@ -99,7 +126,7 @@ class GatewayStreamConsumer: if text: self._queue.put(text) elif text is None: - self._queue.put(_NEW_SEGMENT) + self.on_segment_break() def finish(self) -> None: """Signal that the stream is complete.""" @@ -116,6 +143,7 @@ class GatewayStreamConsumer: # Drain all available items from the queue got_done = False got_segment_break = False + commentary_text = None while True: try: item = self._queue.get_nowait() @@ -125,6 +153,9 @@ class GatewayStreamConsumer: if item is _NEW_SEGMENT: got_segment_break = True break + if isinstance(item, tuple) and len(item) == 2 and item[0] is _COMMENTARY: + commentary_text = item[1] + break self._accumulated += item except queue.Empty: break @@ -135,11 +166,13 @@ class GatewayStreamConsumer: should_edit = ( got_done or got_segment_break + or commentary_text is not None or (elapsed >= self._current_edit_interval and self._accumulated) or len(self._accumulated) >= self.cfg.buffer_threshold ) + current_update_visible = False if should_edit and self._accumulated: # Split overflow: if accumulated text exceeds the platform # limit, split into properly sized chunks. @@ -161,6 +194,7 @@ class GatewayStreamConsumer: self._last_sent_text = "" self._last_edit_time = time.monotonic() if got_done: + self._final_response_sent = self._already_sent return if got_segment_break: self._message_id = None @@ -192,10 +226,10 @@ class GatewayStreamConsumer: self._last_sent_text = "" display_text = self._accumulated - if not got_done and not got_segment_break: + if not got_done and not got_segment_break and commentary_text is None: display_text += self.cfg.cursor - await self._send_or_edit(display_text) + current_update_visible = await self._send_or_edit(display_text) self._last_edit_time = time.monotonic() if got_done: @@ -206,12 +240,20 @@ class GatewayStreamConsumer: if self._accumulated: if self._fallback_final_send: await self._send_fallback_final(self._accumulated) + elif current_update_visible: + self._final_response_sent = True elif self._message_id: - await self._send_or_edit(self._accumulated) + self._final_response_sent = await self._send_or_edit(self._accumulated) elif not self._already_sent: - await self._send_or_edit(self._accumulated) + self._final_response_sent = await self._send_or_edit(self._accumulated) return + if commentary_text is not None: + self._reset_segment_state() + await self._send_commentary(commentary_text) + self._last_edit_time = time.monotonic() + self._reset_segment_state() + # Tool boundary: reset message state so the next text chunk # creates a fresh message below any tool-progress messages. # @@ -220,17 +262,14 @@ class GatewayStreamConsumer: # github_comment delivery). Resetting to None would re-enter # the "first send" path on every tool boundary and post one # platform message per tool call — that is what caused 155 - # comments under a single PR. Instead, keep all state so the - # full continuation is delivered once via _send_fallback_final. + # comments under a single PR. Instead, preserve the sentinel + # so the full continuation is delivered once via + # _send_fallback_final. # (When editing fails mid-stream due to flood control the id is # a real string like "msg_1", not "__no_edit__", so that case # still resets and creates a fresh segment as intended.) - if got_segment_break and self._message_id != "__no_edit__": - self._message_id = None - self._accumulated = "" - self._last_sent_text = "" - self._fallback_final_send = False - self._fallback_prefix = "" + if got_segment_break: + self._reset_segment_state(preserve_no_edit=True) await asyncio.sleep(0.05) # Small yield to not busy-loop @@ -339,6 +378,7 @@ class GatewayStreamConsumer: if not continuation.strip(): # Nothing new to send — the visible partial already matches final text. self._already_sent = True + self._final_response_sent = True return raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) @@ -373,6 +413,7 @@ class GatewayStreamConsumer: # the base gateway final-send path so we don't resend the # full response and create another duplicate. self._already_sent = True + self._final_response_sent = True self._message_id = last_message_id self._last_sent_text = last_successful_chunk self._fallback_prefix = "" @@ -390,6 +431,7 @@ class GatewayStreamConsumer: self._message_id = last_message_id self._already_sent = True + self._final_response_sent = True self._last_sent_text = chunks[-1] self._fallback_prefix = "" @@ -420,6 +462,24 @@ class GatewayStreamConsumer: except Exception: pass # best-effort — don't let this block the fallback path + async def _send_commentary(self, text: str) -> bool: + """Send a completed interim assistant commentary message.""" + text = self._clean_for_display(text) + if not text.strip(): + return False + try: + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + metadata=self.metadata, + ) + if result.success: + self._already_sent = True + return True + except Exception as e: + logger.error("Commentary send error: %s", e) + return False + async def _send_or_edit(self, text: str) -> bool: """Send or edit the streaming message. @@ -501,23 +561,21 @@ class GatewayStreamConsumer: content=text, metadata=self.metadata, ) - if result.success and result.message_id: - self._message_id = result.message_id + if result.success: + if result.message_id: + self._message_id = result.message_id + else: + self._edit_supported = False self._already_sent = True self._last_sent_text = text + if not result.message_id: + self._fallback_prefix = self._visible_prefix() + self._fallback_final_send = True + # Sentinel prevents re-entering the first-send path on + # every delta/tool boundary when platforms accept a + # message but do not return an editable message id. + self._message_id = "__no_edit__" return True - elif result.success: - # Platform accepted the message but returned no message_id - # (e.g. Signal). Can't edit without an ID — switch to - # fallback mode: suppress intermediate deltas, send only - # the missing tail once the final response is ready. - self._already_sent = True - self._edit_supported = False - self._fallback_prefix = self._clean_for_display(text) - self._fallback_final_send = True - # Sentinel prevents re-entering this branch on every delta - self._message_id = "__no_edit__" - return True # platform accepted, just can't edit else: # Initial send failed — disable streaming for this session self._edit_supported = False diff --git a/hermes_cli/config.py b/hermes_cli/config.py index f551a195d0..a4eee56f94 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -143,6 +143,73 @@ def managed_error(action: str = "modify configuration"): print(format_managed_message(action), file=sys.stderr) +# ============================================================================= +# Container-aware CLI (NixOS container mode) +# ============================================================================= + +def _is_inside_container() -> bool: + """Detect if we're already running inside a Docker/Podman container.""" + # Standard Docker/Podman indicators + if os.path.exists("/.dockerenv"): + return True + # Podman uses /run/.containerenv + if os.path.exists("/run/.containerenv"): + return True + # Check cgroup for container runtime evidence (works for both Docker & Podman) + try: + with open("/proc/1/cgroup", "r") as f: + cgroup = f.read() + if "docker" in cgroup or "podman" in cgroup or "/lxc/" in cgroup: + return True + except OSError: + pass + return False + + +def get_container_exec_info() -> Optional[dict]: + """Read container mode metadata from HERMES_HOME/.container-mode. + + Returns a dict with keys: backend, container_name, exec_user, hermes_bin + or None if container mode is not active, we're already inside the + container, or HERMES_DEV=1 is set. + + The .container-mode file is written by the NixOS activation script when + container.enable = true. It tells the host CLI to exec into the container + instead of running locally. + """ + if os.environ.get("HERMES_DEV") == "1": + return None + + if _is_inside_container(): + return None + + container_mode_file = get_hermes_home() / ".container-mode" + + try: + info = {} + with open(container_mode_file, "r") as f: + for line in f: + line = line.strip() + if "=" in line and not line.startswith("#"): + key, _, value = line.partition("=") + info[key.strip()] = value.strip() + except FileNotFoundError: + return None + # All other exceptions (PermissionError, malformed data, etc.) propagate + + backend = info.get("backend", "docker") + container_name = info.get("container_name", "hermes-agent") + exec_user = info.get("exec_user", "hermes") + hermes_bin = info.get("hermes_bin", "/data/current-package/bin/hermes") + + return { + "backend": backend, + "container_name": container_name, + "exec_user": exec_user, + "hermes_bin": hermes_bin, + } + + # ============================================================================= # Config paths # ============================================================================= @@ -448,6 +515,7 @@ DEFAULT_CONFIG = { "inline_diffs": True, # Show inline diff previews for write actions (write_file, patch, skill_manage) "show_cost": False, # Show $ cost in the status bar (off by default) "skin": "default", + "interim_assistant_messages": True, # Gateway: show natural mid-turn assistant status messages "tool_progress_command": False, # Enable /verbose command in messaging gateway "tool_progress_overrides": {}, # Per-platform overrides: {"signal": "off", "telegram": "all"} "tool_preview_length": 0, # Max chars for tool call previews (0 = no limit, show full paths/commands) @@ -638,7 +706,7 @@ DEFAULT_CONFIG = { }, # Config schema version - bump this when adding new required fields - "_config_version": 14, + "_config_version": 15, } # ============================================================================= @@ -1865,6 +1933,20 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A if not quiet: print(f" ✓ Migrated legacy stt.model to provider-specific config") + # ── Version 14 → 15: add explicit gateway interim-message gate ── + if current_ver < 15: + config = read_raw_config() + display = config.get("display", {}) + if not isinstance(display, dict): + display = {} + if "interim_assistant_messages" not in display: + display["interim_assistant_messages"] = True + config["display"] = display + results["config_added"].append("display.interim_assistant_messages=true (default)") + save_config(config) + if not quiet: + print(" ✓ Added display.interim_assistant_messages=true") + if current_ver < latest_ver and not quiet: print(f"Config version: {current_ver} → {latest_ver}") diff --git a/hermes_cli/main.py b/hermes_cli/main.py index c74b7945e2..be0d4b0933 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -528,6 +528,113 @@ def _resolve_last_cli_session() -> Optional[str]: return None +def _probe_container(cmd: list, backend: str, via_sudo: bool = False): + """Run a container inspect probe, returning the CompletedProcess. + + Catches TimeoutExpired specifically for a human-readable message; + all other exceptions propagate naturally. + """ + try: + return subprocess.run(cmd, capture_output=True, text=True, timeout=15) + except subprocess.TimeoutExpired: + label = f"sudo {backend}" if via_sudo else backend + print( + f"Error: timed out waiting for {label} to respond.\n" + f"The {backend} daemon may be unresponsive or starting up.", + file=sys.stderr, + ) + sys.exit(1) + + +def _exec_in_container(container_info: dict, cli_args: list): + """Replace the current process with a command inside the managed container. + + Probes whether sudo is needed (rootful containers), then os.execvp + into the container. On success the Python process is replaced entirely + and the container's exit code becomes the process exit code (OS semantics). + On failure, OSError propagates naturally. + + Args: + container_info: dict with backend, container_name, exec_user, hermes_bin + cli_args: the original CLI arguments (everything after 'hermes') + """ + import shutil + + backend = container_info["backend"] + container_name = container_info["container_name"] + exec_user = container_info["exec_user"] + hermes_bin = container_info["hermes_bin"] + + runtime = shutil.which(backend) + if not runtime: + print(f"Error: {backend} not found on PATH. Cannot route to container.", + file=sys.stderr) + sys.exit(1) + + # Rootful containers (NixOS systemd service) are invisible to unprivileged + # users — Podman uses per-user namespaces, Docker needs group access. + # Probe whether the runtime can see the container; if not, try via sudo. + sudo_path = None + probe = _probe_container( + [runtime, "inspect", "--format", "ok", container_name], backend, + ) + if probe.returncode != 0: + sudo_path = shutil.which("sudo") + if sudo_path: + probe2 = _probe_container( + [sudo_path, "-n", runtime, "inspect", "--format", "ok", container_name], + backend, via_sudo=True, + ) + if probe2.returncode != 0: + print( + f"Error: container '{container_name}' not found via {backend}.\n" + f"\n" + f"The container is likely running as root. Your user cannot see it\n" + f"because {backend} uses per-user namespaces. Grant passwordless\n" + f"sudo for {backend} — the -n (non-interactive) flag is required\n" + f"because a password prompt would hang or break piped commands.\n" + f"\n" + f"On NixOS:\n" + f"\n" + f' security.sudo.extraRules = [{{\n' + f' users = [ "{os.getenv("USER", "your-user")}" ];\n' + f' commands = [{{ command = "{runtime}"; options = [ "NOPASSWD" ]; }}];\n' + f' }}];\n' + f"\n" + f"Or run: sudo hermes {' '.join(cli_args)}", + file=sys.stderr, + ) + sys.exit(1) + else: + print( + f"Error: container '{container_name}' not found via {backend}.\n" + f"The container may be running under root. Try: sudo hermes {' '.join(cli_args)}", + file=sys.stderr, + ) + sys.exit(1) + + is_tty = sys.stdin.isatty() + tty_flags = ["-it"] if is_tty else ["-i"] + + env_flags = [] + for var in ("TERM", "COLORTERM", "LANG", "LC_ALL"): + val = os.environ.get(var) + if val: + env_flags.extend(["-e", f"{var}={val}"]) + + cmd_prefix = [sudo_path, "-n", runtime] if sudo_path else [runtime] + exec_cmd = ( + cmd_prefix + ["exec"] + + tty_flags + + ["-u", exec_user] + + env_flags + + [container_name, hermes_bin] + + cli_args + ) + + os.execvp(exec_cmd[0], exec_cmd) + + def _resolve_session_by_name_or_id(name_or_id: str) -> Optional[str]: """Resolve a session name (title) or ID to a session ID. @@ -5667,9 +5774,22 @@ Examples: # Pre-process argv so unquoted multi-word session names after -c / -r # are merged into a single token before argparse sees them. # e.g. ``hermes -c Pokemon Agent Dev`` → ``hermes -c 'Pokemon Agent Dev'`` + # ── Container-aware routing ──────────────────────────────────────── + # When NixOS container mode is active, route ALL subcommands into + # the managed container. This MUST run before parse_args() so that + # --help, unrecognised flags, and every subcommand are forwarded + # transparently instead of being intercepted by argparse on the host. + from hermes_cli.config import get_container_exec_info + container_info = get_container_exec_info() + if container_info: + _exec_in_container(container_info, sys.argv[1:]) + # Unreachable: os.execvp never returns on success (process is replaced) + # and raises OSError on failure (which propagates as a traceback). + sys.exit(1) + _processed_argv = _coalesce_session_name_args(sys.argv[1:]) args = parser.parse_args(_processed_argv) - + # Handle --version flag if args.version: cmd_version(args) diff --git a/nix/nixosModules.nix b/nix/nixosModules.nix index b1be031df2..75b3dca31b 100644 --- a/nix/nixosModules.nix +++ b/nix/nixosModules.nix @@ -499,6 +499,16 @@ default = "ubuntu:24.04"; description = "OCI container image. The container pulls this at runtime via Docker/Podman."; }; + + hostUsers = mkOption { + type = types.listOf types.str; + default = [ ]; + description = '' + Interactive users who get a ~/.hermes symlink to the service + stateDir. These users are automatically added to the hermes group. + ''; + example = [ "sidbin" ]; + }; }; }; @@ -557,6 +567,25 @@ environment.variables.HERMES_HOME = "${cfg.stateDir}/.hermes"; }) + # ── Host user group membership ───────────────────────────────────── + (lib.mkIf (cfg.container.enable && cfg.container.hostUsers != []) { + users.users = lib.genAttrs cfg.container.hostUsers (user: { + extraGroups = [ cfg.group ]; + }); + }) + + # ── Warnings ────────────────────────────────────────────────────── + (lib.mkIf (cfg.container.enable && !cfg.addToSystemPackages && cfg.container.hostUsers != []) { + warnings = [ + '' + services.hermes-agent: container.enable is true and container.hostUsers + is set, but addToSystemPackages is false. Without a host-installed hermes + binary, container routing will not work for interactive users. + Set addToSystemPackages = true or ensure hermes is on PATH. + '' + ]; + }) + # ── Directories ─────────────────────────────────────────────────── { systemd.tmpfiles.rules = [ @@ -611,6 +640,59 @@ chown ${cfg.user}:${cfg.group} ${cfg.stateDir}/.hermes/.managed chmod 0644 ${cfg.stateDir}/.hermes/.managed + # Container mode metadata — tells the host CLI to exec into the + # container instead of running locally. Removed when container mode + # is disabled so the host CLI falls back to native execution. + ${if cfg.container.enable then '' + cat > ${cfg.stateDir}/.hermes/.container-mode <<'HERMES_CONTAINER_MODE_EOF' +# Written by NixOS activation script. Do not edit manually. +backend=${cfg.container.backend} +container_name=${containerName} +exec_user=${cfg.user} +hermes_bin=${containerDataDir}/current-package/bin/hermes +HERMES_CONTAINER_MODE_EOF + chown ${cfg.user}:${cfg.group} ${cfg.stateDir}/.hermes/.container-mode + chmod 0644 ${cfg.stateDir}/.hermes/.container-mode + '' else '' + rm -f ${cfg.stateDir}/.hermes/.container-mode + + # Remove symlink bridge for hostUsers + ${lib.concatStringsSep "\n" (map (user: + let + userHome = config.users.users.${user}.home; + symlinkPath = "${userHome}/.hermes"; + in '' + if [ -L "${symlinkPath}" ] && [ "$(readlink "${symlinkPath}")" = "${cfg.stateDir}/.hermes" ]; then + rm -f "${symlinkPath}" + echo "hermes-agent: removed symlink ${symlinkPath}" + fi + '') cfg.container.hostUsers)} + ''} + + # ── Symlink bridge for interactive users ─────────────────────── + # Create ~/.hermes -> stateDir/.hermes for each hostUser so the + # host CLI shares state with the container service. + # Only runs when container mode is enabled. + ${lib.optionalString cfg.container.enable + (lib.concatStringsSep "\n" (map (user: + let + userHome = config.users.users.${user}.home; + symlinkPath = "${userHome}/.hermes"; + target = "${cfg.stateDir}/.hermes"; + in '' + if [ -d "${symlinkPath}" ] && [ ! -L "${symlinkPath}" ]; then + # Real directory — back it up, then create symlink. + # (ln -sfn cannot atomically replace a directory.) + _backup="${symlinkPath}.bak.$(date +%s)" + echo "hermes-agent: backing up existing ${symlinkPath} to $_backup" + mv "${symlinkPath}" "$_backup" + fi + # For everything else (existing symlink, doesn't exist, etc.) + # ln -sfn handles it: replaces symlinks, creates new ones. + ln -sfn "${target}" "${symlinkPath}" + chown -h ${user}:${cfg.group} "${symlinkPath}" + '') cfg.container.hostUsers))} + # Seed auth file if provided ${lib.optionalString (cfg.authFile != null) '' ${if cfg.authFileForceOverwrite then '' diff --git a/run_agent.py b/run_agent.py index cc93594d68..2fed5c36ce 100644 --- a/run_agent.py +++ b/run_agent.py @@ -339,10 +339,7 @@ def _paths_overlap(left: Path, right: Path) -> bool: _SURROGATE_RE = re.compile(r'[\ud800-\udfff]') -_BUDGET_WARNING_RE = re.compile( - r"\[BUDGET(?:\s+WARNING)?:\s+Iteration\s+\d+/\d+\..*?\]", - re.DOTALL, -) + def _sanitize_surrogates(text: str) -> str: @@ -463,34 +460,7 @@ def _sanitize_messages_non_ascii(messages: list) -> bool: return found -def _strip_budget_warnings_from_history(messages: list) -> None: - """Remove budget pressure warnings from tool-result messages in-place. - Budget warnings are turn-scoped signals that must not leak into replayed - history. They live in tool-result ``content`` either as a JSON key - (``_budget_warning``) or appended plain text. - """ - for msg in messages: - if not isinstance(msg, dict) or msg.get("role") != "tool": - continue - content = msg.get("content") - if not isinstance(content, str) or "_budget_warning" not in content and "[BUDGET" not in content: - continue - - # Try JSON first (the common case: _budget_warning key in a dict) - try: - parsed = json.loads(content) - if isinstance(parsed, dict) and "_budget_warning" in parsed: - del parsed["_budget_warning"] - msg["content"] = json.dumps(parsed, ensure_ascii=False) - continue - except (json.JSONDecodeError, TypeError): - pass - - # Fallback: strip the text pattern from plain-text tool results - cleaned = _BUDGET_WARNING_RE.sub("", content).strip() - if cleaned != content: - msg["content"] = cleaned # ========================================================================= @@ -579,6 +549,7 @@ class AIAgent: clarify_callback: callable = None, step_callback: callable = None, stream_delta_callback: callable = None, + interim_assistant_callback: callable = None, tool_gen_callback: callable = None, status_callback: callable = None, max_tokens: int = None, @@ -728,6 +699,7 @@ class AIAgent: self.clarify_callback = clarify_callback self.step_callback = step_callback self.stream_delta_callback = stream_delta_callback + self.interim_assistant_callback = interim_assistant_callback self.status_callback = status_callback self.tool_gen_callback = tool_gen_callback @@ -775,12 +747,14 @@ class AIAgent: self._use_prompt_caching = (is_openrouter and is_claude) or is_native_anthropic self._cache_ttl = "5m" # Default 5-minute TTL (1.25x write cost) - # Iteration budget pressure: warn the LLM as it approaches max_iterations. - # Warnings are injected into the last tool result JSON (not as separate - # messages) so they don't break message structure or invalidate caching. - self._budget_caution_threshold = 0.7 # 70% — nudge to start wrapping up - self._budget_warning_threshold = 0.9 # 90% — urgent, respond now - self._budget_pressure_enabled = True + # Iteration budget: the LLM is only notified when it actually exhausts + # the iteration budget (api_call_count >= max_iterations). At that + # point we inject ONE message, allow one final API call, and if the + # model doesn't produce a text response, force a user-message asking + # it to summarise. No intermediate pressure warnings — they caused + # models to "give up" prematurely on complex tasks (#7915). + self._budget_exhausted_injected = False + self._budget_grace_call = False # Context pressure warnings: notify the USER (not the LLM) as context # fills up. Purely informational — displayed in CLI output and sent via @@ -831,6 +805,11 @@ class AIAgent: # Deferred paragraph break flag — set after tool iterations so a # single "\n\n" is prepended to the next real text delta. self._stream_needs_break = False + # Visible assistant text already delivered through live token callbacks + # during the current model response. Used to avoid re-sending the same + # commentary when the provider later returns it as a completed interim + # assistant message. + self._current_streamed_assistant_text = "" # Optional current-turn user-message override used when the API-facing # user message intentionally differs from the persisted transcript @@ -1331,6 +1310,19 @@ class AIAgent: ) self.compression_enabled = compression_enabled + # Reject models whose context window is below the minimum required + # for reliable tool-calling workflows (64K tokens). + from agent.model_metadata import MINIMUM_CONTEXT_LENGTH + _ctx = getattr(self.context_compressor, "context_length", 0) + if _ctx and _ctx < MINIMUM_CONTEXT_LENGTH: + raise ValueError( + f"Model {self.model} has a context window of {_ctx:,} tokens, " + f"which is below the minimum {MINIMUM_CONTEXT_LENGTH:,} required " + f"by Hermes Agent. Choose a model with at least " + f"{MINIMUM_CONTEXT_LENGTH // 1000}K context, or set " + f"model.context_length in config.yaml to override." + ) + # Inject context engine tool schemas (e.g. lcm_grep, lcm_describe, lcm_expand) self._context_engine_tool_names: set = set() if hasattr(self, "context_compressor") and self.context_compressor and self.tools is not None: @@ -3190,7 +3182,7 @@ class AIAgent: if platform_key in PLATFORM_HINTS: prompt_parts.append(PLATFORM_HINTS[platform_key]) - return "\n\n".join(prompt_parts) + return "\n\n".join(p.strip() for p in prompt_parts if p.strip()) # ========================================================================= # Pre/post-call guardrails (inspired by PR #1321 — @alireza78a) @@ -4730,6 +4722,49 @@ class AIAgent: # ── Unified streaming API call ───────────────────────────────────────── + def _reset_stream_delivery_tracking(self) -> None: + """Reset tracking for text delivered during the current model response.""" + self._current_streamed_assistant_text = "" + + def _record_streamed_assistant_text(self, text: str) -> None: + """Accumulate visible assistant text emitted through stream callbacks.""" + if isinstance(text, str) and text: + self._current_streamed_assistant_text = ( + getattr(self, "_current_streamed_assistant_text", "") + text + ) + + @staticmethod + def _normalize_interim_visible_text(text: str) -> str: + if not isinstance(text, str): + return "" + return re.sub(r"\s+", " ", text).strip() + + def _interim_content_was_streamed(self, content: str) -> bool: + visible_content = self._normalize_interim_visible_text( + self._strip_think_blocks(content or "") + ) + if not visible_content: + return False + streamed = self._normalize_interim_visible_text( + self._strip_think_blocks(getattr(self, "_current_streamed_assistant_text", "") or "") + ) + return bool(streamed) and streamed == visible_content + + def _emit_interim_assistant_message(self, assistant_msg: Dict[str, Any]) -> None: + """Surface a real mid-turn assistant commentary message to the UI layer.""" + cb = getattr(self, "interim_assistant_callback", None) + if cb is None or not isinstance(assistant_msg, dict): + return + content = assistant_msg.get("content") + visible = self._strip_think_blocks(content or "").strip() + if not visible or visible == "(empty)": + return + already_streamed = self._interim_content_was_streamed(visible) + try: + cb(visible, already_streamed=already_streamed) + except Exception: + logger.debug("interim_assistant_callback error", exc_info=True) + def _fire_stream_delta(self, text: str) -> None: """Fire all registered stream delta callbacks (display + TTS).""" # If a tool iteration set the break flag, prepend a single paragraph @@ -4739,12 +4774,16 @@ class AIAgent: if getattr(self, "_stream_needs_break", False) and text and text.strip(): self._stream_needs_break = False text = "\n\n" + text - for cb in (self.stream_delta_callback, self._stream_callback): - if cb is not None: - try: - cb(text) - except Exception: - pass + callbacks = [cb for cb in (self.stream_delta_callback, self._stream_callback) if cb is not None] + delivered = False + for cb in callbacks: + try: + cb(text) + delivered = True + except Exception: + pass + if delivered: + self._record_streamed_assistant_text(text) def _fire_reasoning_delta(self, text: str) -> None: """Fire reasoning callback if registered.""" @@ -4928,6 +4967,7 @@ class AIAgent: if self.stream_delta_callback: try: self.stream_delta_callback(delta.content) + self._record_streamed_assistant_text(delta.content) except Exception: pass @@ -6919,24 +6959,6 @@ class AIAgent: turn_tool_msgs = messages[-num_tools:] enforce_turn_budget(turn_tool_msgs, env=get_active_env(effective_task_id)) - # ── Budget pressure injection ──────────────────────────────────── - budget_warning = self._get_budget_warning(api_call_count) - if budget_warning and messages and messages[-1].get("role") == "tool": - last_content = messages[-1]["content"] - try: - parsed = json.loads(last_content) - if isinstance(parsed, dict): - parsed["_budget_warning"] = budget_warning - messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False) - else: - messages[-1]["content"] = last_content + f"\n\n{budget_warning}" - except (json.JSONDecodeError, TypeError): - messages[-1]["content"] = last_content + f"\n\n{budget_warning}" - if not self.quiet_mode: - remaining = self.max_iterations - api_call_count - tier = "⚠️ WARNING" if remaining <= self.max_iterations * 0.1 else "💡 CAUTION" - print(f"{self.log_prefix}{tier}: {remaining} iterations remaining") - def _execute_tool_calls_sequential(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None: """Execute tool calls sequentially (original behavior). Used for single calls or interactive tools.""" for i, tool_call in enumerate(assistant_message.tool_calls, 1): @@ -6985,6 +7007,15 @@ class AIAgent: self._current_tool = function_name self._touch_activity(f"executing tool: {function_name}") + # Set activity callback for long-running tool execution (terminal + # commands, etc.) so the gateway's inactivity monitor doesn't kill + # the agent while a command is running. + try: + from tools.environments.base import set_activity_callback + set_activity_callback(self._touch_activity) + except Exception: + pass + if self.tool_progress_callback: try: preview = _build_tool_preview(function_name, function_args) @@ -7274,50 +7305,7 @@ class AIAgent: if num_tools_seq > 0: enforce_turn_budget(messages[-num_tools_seq:], env=get_active_env(effective_task_id)) - # ── Budget pressure injection ───────────────────────────────── - # After all tool calls in this turn are processed, check if we're - # approaching max_iterations. If so, inject a warning into the LAST - # tool result's JSON so the LLM sees it naturally when reading results. - budget_warning = self._get_budget_warning(api_call_count) - if budget_warning and messages and messages[-1].get("role") == "tool": - last_content = messages[-1]["content"] - try: - parsed = json.loads(last_content) - if isinstance(parsed, dict): - parsed["_budget_warning"] = budget_warning - messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False) - else: - messages[-1]["content"] = last_content + f"\n\n{budget_warning}" - except (json.JSONDecodeError, TypeError): - messages[-1]["content"] = last_content + f"\n\n{budget_warning}" - if not self.quiet_mode: - remaining = self.max_iterations - api_call_count - tier = "⚠️ WARNING" if remaining <= self.max_iterations * 0.1 else "💡 CAUTION" - print(f"{self.log_prefix}{tier}: {remaining} iterations remaining") - def _get_budget_warning(self, api_call_count: int) -> Optional[str]: - """Return a budget pressure string, or None if not yet needed. - - Two-tier system: - - Caution (70%): nudge to consolidate work - - Warning (90%): urgent, must respond now - """ - if not self._budget_pressure_enabled or self.max_iterations <= 0: - return None - progress = api_call_count / self.max_iterations - remaining = self.max_iterations - api_call_count - if progress >= self._budget_warning_threshold: - return ( - f"[BUDGET WARNING: Iteration {api_call_count}/{self.max_iterations}. " - f"Only {remaining} iteration(s) left. " - "Provide your final response NOW. No more tool calls unless absolutely critical.]" - ) - if progress >= self._budget_caution_threshold: - return ( - f"[BUDGET: Iteration {api_call_count}/{self.max_iterations}. " - f"{remaining} iterations left. Start consolidating your work.]" - ) - return None def _emit_context_pressure(self, compaction_progress: float, compressor) -> None: """Notify the user that context is approaching the compaction threshold. @@ -7610,14 +7598,6 @@ class AIAgent: # Initialize conversation (copy to avoid mutating the caller's list) messages = list(conversation_history) if conversation_history else [] - # Strip budget pressure warnings from previous turns. These are - # turn-scoped signals injected by _get_budget_warning() into tool - # result content. If left in the replayed history, models (especially - # GPT-family) interpret them as still-active instructions and avoid - # making tool calls in ALL subsequent turns. - if messages: - _strip_budget_warnings_from_history(messages) - # Hydrate todo store from conversation history (gateway creates a fresh # AIAgent per message, so the in-memory store is empty -- we need to # recover the todo state from the most recent todo tool response in history) @@ -7834,7 +7814,7 @@ class AIAgent: except Exception: pass - while api_call_count < self.max_iterations and self.iteration_budget.remaining > 0: + while (api_call_count < self.max_iterations and self.iteration_budget.remaining > 0) or self._budget_grace_call: # Reset per-turn checkpoint dedup so each iteration can take one snapshot self._checkpoint_mgr.new_turn() @@ -7849,7 +7829,13 @@ class AIAgent: api_call_count += 1 self._api_call_count = api_call_count self._touch_activity(f"starting API call #{api_call_count}") - if not self.iteration_budget.consume(): + + # Grace call: the budget is exhausted but we gave the model one + # more chance. Consume the grace flag so the loop exits after + # this iteration regardless of outcome. + if self._budget_grace_call: + self._budget_grace_call = False + elif not self.iteration_budget.consume(): _turn_exit_reason = "budget_exhausted" if not self.quiet_mode: self._safe_print(f"\n⚠️ Iteration budget exhausted ({self.iteration_budget.used}/{self.iteration_budget.max_total} iterations used)") @@ -7976,9 +7962,39 @@ class AIAgent: # manual message manipulation are always caught. api_messages = self._sanitize_api_messages(api_messages) + # Normalize message whitespace and tool-call JSON for consistent + # prefix matching. Ensures bit-perfect prefixes across turns, + # which enables KV cache reuse on local inference servers + # (llama.cpp, vLLM, Ollama) and improves cache hit rates for + # cloud providers. Operates on api_messages (the API copy) so + # the original conversation history in `messages` is untouched. + for am in api_messages: + if isinstance(am.get("content"), str): + am["content"] = am["content"].strip() + for am in api_messages: + tcs = am.get("tool_calls") + if not tcs: + continue + new_tcs = [] + for tc in tcs: + if isinstance(tc, dict) and "function" in tc: + try: + args_obj = json.loads(tc["function"]["arguments"]) + tc = {**tc, "function": { + **tc["function"], + "arguments": json.dumps( + args_obj, separators=(",", ":"), + sort_keys=True, + ), + }} + except Exception: + pass + new_tcs.append(tc) + am["tool_calls"] = new_tcs + # Calculate approximate request size for logging total_chars = sum(len(str(msg)) for msg in api_messages) - approx_tokens = total_chars // 4 # Rough estimate: 4 chars per token + approx_tokens = estimate_messages_tokens_rough(api_messages) # Thinking spinner for quiet mode (animated during API call) thinking_spinner = None @@ -8027,6 +8043,7 @@ class AIAgent: while retry_count < max_retries: try: + self._reset_stream_delivery_tracking() api_kwargs = self._build_api_kwargs(api_messages) if self.api_mode == "codex_responses": api_kwargs = self._preflight_codex_api_kwargs(api_kwargs, allow_stream=False) @@ -9441,6 +9458,7 @@ class AIAgent: ) if not duplicate_interim: messages.append(interim_msg) + self._emit_interim_assistant_message(interim_msg) if self._codex_incomplete_retries < 3: if not self.quiet_mode: @@ -9660,6 +9678,7 @@ class AIAgent: messages.pop() messages.append(assistant_msg) + self._emit_interim_assistant_message(assistant_msg) # Close any open streaming display (response box, reasoning # box) before tool execution begins. Intermediate turns may @@ -9938,6 +9957,7 @@ class AIAgent: codex_ack_continuations += 1 interim_msg = self._build_assistant_message(assistant_message, "incomplete") messages.append(interim_msg) + self._emit_interim_assistant_message(interim_msg) continue_msg = { "role": "user", @@ -10034,7 +10054,31 @@ class AIAgent: if final_response is None and ( api_call_count >= self.max_iterations or self.iteration_budget.remaining <= 0 - ): + ) and not self._budget_exhausted_injected: + # Budget exhausted but we haven't tried asking the model to + # summarise yet. Inject a user message and give it one grace + # API call to produce a text response. + self._budget_exhausted_injected = True + self._budget_grace_call = True + _grace_msg = ( + "Your tool budget ran out. Please give me the information " + "or actions you've completed so far." + ) + messages.append({"role": "user", "content": _grace_msg}) + self._emit_status( + f"⚠️ Iteration budget exhausted ({api_call_count}/{self.max_iterations}) " + "— asking model to summarise" + ) + if not self.quiet_mode: + self._safe_print( + f"\n⚠️ Iteration budget exhausted ({api_call_count}/{self.max_iterations}) " + "— requesting summary..." + ) + + if final_response is None and ( + api_call_count >= self.max_iterations + or self.iteration_budget.remaining <= 0 + ) and not self._budget_grace_call: _turn_exit_reason = f"max_iterations_reached({api_call_count}/{self.max_iterations})" if self.iteration_budget.remaining <= 0 and not self.quiet_mode: print(f"\n⚠️ Iteration budget exhausted ({self.iteration_budget.used}/{self.iteration_budget.max_total} iterations used)") diff --git a/tests/agent/test_context_compressor.py b/tests/agent/test_context_compressor.py index 88a23b44cf..f4cf19666f 100644 --- a/tests/agent/test_context_compressor.py +++ b/tests/agent/test_context_compressor.py @@ -576,11 +576,19 @@ class TestSummaryTargetRatio: assert c.summary_target_ratio == 0.80 def test_default_threshold_is_50_percent(self): - """Default compression threshold should be 50%.""" + """Default compression threshold should be 50%, with a 64K floor.""" with patch("agent.context_compressor.get_model_context_length", return_value=100_000): c = ContextCompressor(model="test", quiet_mode=True) assert c.threshold_percent == 0.50 - assert c.threshold_tokens == 50_000 + # 50% of 100K = 50K, but the floor is 64K + assert c.threshold_tokens == 64_000 + + def test_threshold_floor_does_not_apply_above_128k(self): + """On large-context models the 50% percentage is used directly.""" + with patch("agent.context_compressor.get_model_context_length", return_value=200_000): + c = ContextCompressor(model="test", quiet_mode=True) + # 50% of 200K = 100K, which is above the 64K floor + assert c.threshold_tokens == 100_000 def test_default_protect_last_n_is_20(self): """Default protect_last_n should be 20.""" diff --git a/tests/agent/test_model_metadata.py b/tests/agent/test_model_metadata.py index 1eac37e20f..df680fb241 100644 --- a/tests/agent/test_model_metadata.py +++ b/tests/agent/test_model_metadata.py @@ -50,7 +50,8 @@ class TestEstimateTokensRough: assert estimate_tokens_rough("a" * 400) == 100 def test_short_text(self): - assert estimate_tokens_rough("hello") == 1 + # "hello" = 5 chars → ceil(5/4) = 2 + assert estimate_tokens_rough("hello") == 2 def test_proportional(self): short = estimate_tokens_rough("hello world") @@ -68,10 +69,11 @@ class TestEstimateMessagesTokensRough: assert estimate_messages_tokens_rough([]) == 0 def test_single_message_concrete_value(self): - """Verify against known str(msg) length.""" + """Verify against known str(msg) length (ceiling division).""" msg = {"role": "user", "content": "a" * 400} result = estimate_messages_tokens_rough([msg]) - expected = len(str(msg)) // 4 + n = len(str(msg)) + expected = (n + 3) // 4 assert result == expected def test_multiple_messages_additive(self): @@ -80,7 +82,8 @@ class TestEstimateMessagesTokensRough: {"role": "assistant", "content": "Hi there, how can I help?"}, ] result = estimate_messages_tokens_rough(msgs) - expected = sum(len(str(m)) for m in msgs) // 4 + n = sum(len(str(m)) for m in msgs) + expected = (n + 3) // 4 assert result == expected def test_tool_call_message(self): @@ -89,7 +92,7 @@ class TestEstimateMessagesTokensRough: "tool_calls": [{"id": "1", "function": {"name": "terminal", "arguments": "{}"}}]} result = estimate_messages_tokens_rough([msg]) assert result > 0 - assert result == len(str(msg)) // 4 + assert result == (len(str(msg)) + 3) // 4 def test_message_with_list_content(self): """Vision messages with multimodal content arrays.""" @@ -98,7 +101,7 @@ class TestEstimateMessagesTokensRough: {"type": "image_url", "image_url": {"url": "data:image/png;base64,AAAA"}} ]} result = estimate_messages_tokens_rough([msg]) - assert result == len(str(msg)) // 4 + assert result == (len(str(msg)) + 3) // 4 # ========================================================================= diff --git a/tests/agent/test_prompt_builder.py b/tests/agent/test_prompt_builder.py index 3b6a4c3ec1..1f2f6ada77 100644 --- a/tests/agent/test_prompt_builder.py +++ b/tests/agent/test_prompt_builder.py @@ -1009,65 +1009,4 @@ class TestOpenAIModelExecutionGuidance: # ========================================================================= -class TestStripBudgetWarningsFromHistory: - def test_strips_json_budget_warning_key(self): - import json - from run_agent import _strip_budget_warnings_from_history - messages = [ - {"role": "tool", "tool_call_id": "c1", "content": json.dumps({ - "output": "hello", - "exit_code": 0, - "_budget_warning": "[BUDGET: Iteration 55/60. 5 iterations left. Start consolidating your work.]", - })}, - ] - _strip_budget_warnings_from_history(messages) - parsed = json.loads(messages[0]["content"]) - assert "_budget_warning" not in parsed - assert parsed["output"] == "hello" - assert parsed["exit_code"] == 0 - - def test_strips_text_budget_warning(self): - from run_agent import _strip_budget_warnings_from_history - - messages = [ - {"role": "tool", "tool_call_id": "c1", - "content": "some result\n\n[BUDGET WARNING: Iteration 58/60. Only 2 iteration(s) left. Provide your final response NOW. No more tool calls unless absolutely critical.]"}, - ] - _strip_budget_warnings_from_history(messages) - assert messages[0]["content"] == "some result" - - def test_leaves_non_tool_messages_unchanged(self): - from run_agent import _strip_budget_warnings_from_history - - messages = [ - {"role": "assistant", "content": "[BUDGET WARNING: Iteration 58/60. Only 2 iteration(s) left. Provide your final response NOW. No more tool calls unless absolutely critical.]"}, - {"role": "user", "content": "hello"}, - ] - original_contents = [m["content"] for m in messages] - _strip_budget_warnings_from_history(messages) - assert [m["content"] for m in messages] == original_contents - - def test_handles_empty_and_missing_content(self): - from run_agent import _strip_budget_warnings_from_history - - messages = [ - {"role": "tool", "tool_call_id": "c1", "content": ""}, - {"role": "tool", "tool_call_id": "c2"}, - ] - _strip_budget_warnings_from_history(messages) - assert messages[0]["content"] == "" - - def test_strips_caution_variant(self): - import json - from run_agent import _strip_budget_warnings_from_history - - messages = [ - {"role": "tool", "tool_call_id": "c1", "content": json.dumps({ - "output": "ok", - "_budget_warning": "[BUDGET: Iteration 42/60. 18 iterations left. Start consolidating your work.]", - })}, - ] - _strip_budget_warnings_from_history(messages) - parsed = json.loads(messages[0]["content"]) - assert "_budget_warning" not in parsed diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index c28317d7e4..6b1d46567d 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -8,8 +8,8 @@ from types import SimpleNamespace import pytest -from gateway.config import Platform, PlatformConfig -from gateway.platforms.base import BasePlatformAdapter, SendResult +from gateway.config import Platform, PlatformConfig, StreamingConfig +from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType, SendResult from gateway.session import SessionSource @@ -104,6 +104,11 @@ def _make_runner(adapter): runner._session_db = None runner._running_agents = {} runner.hooks = SimpleNamespace(loaded_hooks=False) + runner.config = SimpleNamespace( + thread_sessions_per_user=False, + group_sessions_per_user=False, + stt_enabled=False, + ) return runner @@ -118,6 +123,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa fake_run_agent = types.ModuleType("run_agent") fake_run_agent.AIAgent = FakeAgent monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + import tools.terminal_tool # noqa: F401 - register terminal emoji for this fake-agent test adapter = ProgressCaptureAdapter() runner = _make_runner(adapter) @@ -144,7 +150,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa assert adapter.sent == [ { "chat_id": "-1001", - "content": '⚙️ terminal: "pwd"', + "content": '💻 terminal: "pwd"', "reply_to": None, "metadata": {"thread_id": "17585"}, } @@ -334,3 +340,238 @@ def test_all_mode_no_truncation_when_preview_fits(monkeypatch, tmp_path): content = adapter.sent[0]["content"] # With a 200-char cap, the 165-char command should NOT be truncated assert "..." not in content, f"Preview was truncated when it shouldn't be: {content}" + + +class CommentaryAgent: + def __init__(self, **kwargs): + self.tool_progress_callback = kwargs.get("tool_progress_callback") + self.interim_assistant_callback = kwargs.get("interim_assistant_callback") + self.stream_delta_callback = kwargs.get("stream_delta_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.interim_assistant_callback: + self.interim_assistant_callback("I'll inspect the repo first.", already_streamed=False) + time.sleep(0.1) + if self.stream_delta_callback: + self.stream_delta_callback("done") + return { + "final_response": "done", + "messages": [], + "api_calls": 1, + } + + +class PreviewedResponseAgent: + def __init__(self, **kwargs): + self.interim_assistant_callback = kwargs.get("interim_assistant_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.interim_assistant_callback: + self.interim_assistant_callback("You're welcome.", already_streamed=False) + return { + "final_response": "You're welcome.", + "response_previewed": True, + "messages": [], + "api_calls": 1, + } + + +class QueuedCommentaryAgent: + calls = 0 + + def __init__(self, **kwargs): + self.interim_assistant_callback = kwargs.get("interim_assistant_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + type(self).calls += 1 + if type(self).calls == 1 and self.interim_assistant_callback: + self.interim_assistant_callback("I'll inspect the repo first.", already_streamed=False) + return { + "final_response": f"final response {type(self).calls}", + "messages": [], + "api_calls": 1, + } + + +async def _run_with_agent( + monkeypatch, + tmp_path, + agent_cls, + *, + session_id, + pending_text=None, + config_data=None, +): + if config_data: + import yaml + + (tmp_path / "config.yaml").write_text(yaml.dump(config_data), encoding="utf-8") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = agent_cls + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + adapter = ProgressCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = importlib.import_module("gateway.run") + if config_data and "streaming" in config_data: + runner.config.streaming = StreamingConfig.from_dict(config_data["streaming"]) + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", + ) + session_key = "agent:main:telegram:group:-1001:17585" + if pending_text is not None: + adapter._pending_messages[session_key] = MessageEvent( + text=pending_text, + message_type=MessageType.TEXT, + source=source, + message_id="queued-1", + ) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id=session_id, + session_key=session_key, + ) + return adapter, result + + +@pytest.mark.asyncio +async def test_run_agent_surfaces_real_interim_commentary(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + assert result.get("already_sent") is not True + assert any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_surfaces_interim_commentary_by_default(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-default-on", + ) + + assert any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_suppresses_interim_commentary_when_disabled(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-disabled", + config_data={"display": {"interim_assistant_messages": False}}, + ) + + assert result.get("already_sent") is not True + assert not any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_tool_progress_does_not_control_interim_commentary(monkeypatch, tmp_path): + """tool_progress=all with interim_assistant_messages=false should not surface commentary.""" + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-tool-progress", + config_data={"display": {"tool_progress": "all", "interim_assistant_messages": False}}, + ) + + assert result.get("already_sent") is not True + assert not any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_streaming_does_not_enable_completed_interim_commentary( + monkeypatch, tmp_path +): + """Streaming alone with interim_assistant_messages=false should not surface commentary.""" + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-streaming", + config_data={ + "display": {"tool_progress": "off", "interim_assistant_messages": False}, + "streaming": {"enabled": True}, + }, + ) + + assert result.get("already_sent") is True + assert not any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_interim_commentary_works_with_tool_progress_off(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-explicit-on", + config_data={ + "display": { + "tool_progress": "off", + "interim_assistant_messages": True, + }, + }, + ) + + assert result.get("already_sent") is not True + assert any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_previewed_final_marks_already_sent(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + PreviewedResponseAgent, + session_id="sess-previewed", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + assert result.get("already_sent") is True + assert [call["content"] for call in adapter.sent] == ["You're welcome."] + + +@pytest.mark.asyncio +async def test_run_agent_queued_message_does_not_treat_commentary_as_final(monkeypatch, tmp_path): + QueuedCommentaryAgent.calls = 0 + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + QueuedCommentaryAgent, + session_id="sess-queued-commentary", + pending_text="queued follow-up", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + sent_texts = [call["content"] for call in adapter.sent] + assert result["final_response"] == "final response 2" + assert "I'll inspect the repo first." in sent_texts + assert "final response 1" in sent_texts diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 5cebb20eee..8f7fb6dd5d 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -505,3 +505,81 @@ class TestSegmentBreakOnToolBoundary: assert len(sent_texts) == 3 assert sent_texts[0].startswith(prefix) assert sum(len(t) for t in sent_texts[1:]) == len(tail) + + +class TestInterimCommentaryMessages: + @pytest.mark.asyncio + async def test_commentary_message_stays_separate_from_final_stream(self): + adapter = MagicMock() + adapter.send = AsyncMock(side_effect=[ + SimpleNamespace(success=True, message_id="msg_1"), + SimpleNamespace(success=True, message_id="msg_2"), + ]) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + consumer.on_commentary("I'll inspect the repository first.") + consumer.on_delta("Done.") + consumer.finish() + + await consumer.run() + + sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] + assert sent_texts == ["I'll inspect the repository first.", "Done."] + assert consumer.final_response_sent is True + + @pytest.mark.asyncio + async def test_failed_final_send_does_not_mark_final_response_sent(self): + adapter = MagicMock() + adapter.send = AsyncMock(return_value=SimpleNamespace(success=False, message_id=None)) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + consumer.on_delta("Done.") + consumer.finish() + + await consumer.run() + + assert consumer.final_response_sent is False + assert consumer.already_sent is False + + @pytest.mark.asyncio + async def test_success_without_message_id_marks_visible_and_sends_only_tail(self): + adapter = MagicMock() + adapter.send = AsyncMock(side_effect=[ + SimpleNamespace(success=True, message_id=None), + SimpleNamespace(success=True, message_id=None), + ]) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉"), + ) + + consumer.on_delta("Hello") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + consumer.on_delta(" world") + await asyncio.sleep(0.08) + consumer.finish() + await task + + sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] + assert sent_texts == ["Hello ▉", "world"] + assert consumer.already_sent is True + assert consumer.final_response_sent is True diff --git a/tests/hermes_cli/test_config.py b/tests/hermes_cli/test_config.py index 1c245577e9..2bb63767ac 100644 --- a/tests/hermes_cli/test_config.py +++ b/tests/hermes_cli/test_config.py @@ -68,6 +68,7 @@ class TestLoadConfigDefaults: assert "max_turns" not in config assert "terminal" in config assert config["terminal"]["backend"] == "local" + assert config["display"]["interim_assistant_messages"] is True def test_legacy_root_level_max_turns_migrates_to_agent_config(self, tmp_path): with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}): @@ -421,3 +422,25 @@ class TestAnthropicTokenMigration: }): migrate_config(interactive=False, quiet=True) assert load_env().get("ANTHROPIC_TOKEN") == "current-token" + + +class TestInterimAssistantMessageConfig: + """Test the explicit gateway interim-message config gate.""" + + def test_default_config_enables_interim_assistant_messages(self): + assert DEFAULT_CONFIG["display"]["interim_assistant_messages"] is True + + def test_migrate_to_v15_adds_interim_assistant_message_gate(self, tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump({"_config_version": 14, "display": {"tool_progress": "off"}}), + encoding="utf-8", + ) + + with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}): + migrate_config(interactive=False, quiet=True) + raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) + + assert raw["_config_version"] == 15 + assert raw["display"]["tool_progress"] == "off" + assert raw["display"]["interim_assistant_messages"] is True diff --git a/tests/hermes_cli/test_container_aware_cli.py b/tests/hermes_cli/test_container_aware_cli.py new file mode 100644 index 0000000000..9e21c0b8d2 --- /dev/null +++ b/tests/hermes_cli/test_container_aware_cli.py @@ -0,0 +1,342 @@ +"""Tests for container-aware CLI routing (NixOS container mode). + +When container.enable = true in the NixOS module, the activation script +writes a .container-mode metadata file. The host CLI detects this and +execs into the container instead of running locally. +""" +import os +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from hermes_cli.config import ( + _is_inside_container, + get_container_exec_info, +) + + +# ============================================================================= +# _is_inside_container +# ============================================================================= + + +def test_is_inside_container_dockerenv(): + """Detects /.dockerenv marker file.""" + with patch("os.path.exists") as mock_exists: + mock_exists.side_effect = lambda p: p == "/.dockerenv" + assert _is_inside_container() is True + + +def test_is_inside_container_containerenv(): + """Detects Podman's /run/.containerenv marker.""" + with patch("os.path.exists") as mock_exists: + mock_exists.side_effect = lambda p: p == "/run/.containerenv" + assert _is_inside_container() is True + + +def test_is_inside_container_cgroup_docker(): + """Detects 'docker' in /proc/1/cgroup.""" + with patch("os.path.exists", return_value=False), \ + patch("builtins.open", create=True) as mock_open: + mock_open.return_value.__enter__ = lambda s: s + mock_open.return_value.__exit__ = MagicMock(return_value=False) + mock_open.return_value.read = MagicMock( + return_value="12:memory:/docker/abc123\n" + ) + assert _is_inside_container() is True + + +def test_is_inside_container_false_on_host(): + """Returns False when none of the container indicators are present.""" + with patch("os.path.exists", return_value=False), \ + patch("builtins.open", side_effect=OSError("no such file")): + assert _is_inside_container() is False + + +# ============================================================================= +# get_container_exec_info +# ============================================================================= + + +@pytest.fixture +def container_env(tmp_path, monkeypatch): + """Set up a fake HERMES_HOME with .container-mode file.""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.delenv("HERMES_DEV", raising=False) + + container_mode = hermes_home / ".container-mode" + container_mode.write_text( + "# Written by NixOS activation script. Do not edit manually.\n" + "backend=podman\n" + "container_name=hermes-agent\n" + "exec_user=hermes\n" + "hermes_bin=/data/current-package/bin/hermes\n" + ) + return hermes_home + + +def test_get_container_exec_info_returns_metadata(container_env): + """Reads .container-mode and returns all fields including exec_user.""" + with patch("hermes_cli.config._is_inside_container", return_value=False): + info = get_container_exec_info() + + assert info is not None + assert info["backend"] == "podman" + assert info["container_name"] == "hermes-agent" + assert info["exec_user"] == "hermes" + assert info["hermes_bin"] == "/data/current-package/bin/hermes" + + +def test_get_container_exec_info_none_inside_container(container_env): + """Returns None when we're already inside a container.""" + with patch("hermes_cli.config._is_inside_container", return_value=True): + info = get_container_exec_info() + + assert info is None + + +def test_get_container_exec_info_none_without_file(tmp_path, monkeypatch): + """Returns None when .container-mode doesn't exist (native mode).""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.delenv("HERMES_DEV", raising=False) + + with patch("hermes_cli.config._is_inside_container", return_value=False): + info = get_container_exec_info() + + assert info is None + + +def test_get_container_exec_info_skipped_when_hermes_dev(container_env, monkeypatch): + """Returns None when HERMES_DEV=1 is set (dev mode bypass).""" + monkeypatch.setenv("HERMES_DEV", "1") + + with patch("hermes_cli.config._is_inside_container", return_value=False): + info = get_container_exec_info() + + assert info is None + + +def test_get_container_exec_info_not_skipped_when_hermes_dev_zero(container_env, monkeypatch): + """HERMES_DEV=0 does NOT trigger bypass — only '1' does.""" + monkeypatch.setenv("HERMES_DEV", "0") + + with patch("hermes_cli.config._is_inside_container", return_value=False): + info = get_container_exec_info() + + assert info is not None + + +def test_get_container_exec_info_defaults(): + """Falls back to defaults for missing keys.""" + import tempfile + + with tempfile.TemporaryDirectory() as tmpdir: + hermes_home = Path(tmpdir) / ".hermes" + hermes_home.mkdir() + (hermes_home / ".container-mode").write_text( + "# minimal file with no keys\n" + ) + + with patch("hermes_cli.config._is_inside_container", return_value=False), \ + patch("hermes_cli.config.get_hermes_home", return_value=hermes_home), \ + patch.dict(os.environ, {}, clear=False): + os.environ.pop("HERMES_DEV", None) + info = get_container_exec_info() + + assert info is not None + assert info["backend"] == "docker" + assert info["container_name"] == "hermes-agent" + assert info["exec_user"] == "hermes" + assert info["hermes_bin"] == "/data/current-package/bin/hermes" + + +def test_get_container_exec_info_docker_backend(container_env): + """Correctly reads docker backend with custom exec_user.""" + (container_env / ".container-mode").write_text( + "backend=docker\n" + "container_name=hermes-custom\n" + "exec_user=myuser\n" + "hermes_bin=/opt/hermes/bin/hermes\n" + ) + + with patch("hermes_cli.config._is_inside_container", return_value=False): + info = get_container_exec_info() + + assert info["backend"] == "docker" + assert info["container_name"] == "hermes-custom" + assert info["exec_user"] == "myuser" + assert info["hermes_bin"] == "/opt/hermes/bin/hermes" + + +def test_get_container_exec_info_crashes_on_permission_error(container_env): + """PermissionError propagates instead of being silently swallowed.""" + with patch("hermes_cli.config._is_inside_container", return_value=False), \ + patch("builtins.open", side_effect=PermissionError("permission denied")): + with pytest.raises(PermissionError): + get_container_exec_info() + + +# ============================================================================= +# _exec_in_container +# ============================================================================= + + +@pytest.fixture +def docker_container_info(): + return { + "backend": "docker", + "container_name": "hermes-agent", + "exec_user": "hermes", + "hermes_bin": "/data/current-package/bin/hermes", + } + + +@pytest.fixture +def podman_container_info(): + return { + "backend": "podman", + "container_name": "hermes-agent", + "exec_user": "hermes", + "hermes_bin": "/data/current-package/bin/hermes", + } + + +def test_exec_in_container_calls_execvp(docker_container_info): + """Verifies os.execvp is called with correct args: runtime, tty flags, + user, env vars, container name, binary, and CLI args.""" + from hermes_cli.main import _exec_in_container + + with patch("shutil.which", return_value="/usr/bin/docker"), \ + patch("subprocess.run") as mock_run, \ + patch("sys.stdin") as mock_stdin, \ + patch("os.execvp") as mock_execvp, \ + patch.dict(os.environ, {"TERM": "xterm-256color", "LANG": "en_US.UTF-8"}, + clear=False): + mock_stdin.isatty.return_value = True + mock_run.return_value = MagicMock(returncode=0) + + _exec_in_container(docker_container_info, ["chat", "-m", "opus"]) + + mock_execvp.assert_called_once() + cmd = mock_execvp.call_args[0][1] + assert cmd[0] == "/usr/bin/docker" + assert cmd[1] == "exec" + assert "-it" in cmd + idx_u = cmd.index("-u") + assert cmd[idx_u + 1] == "hermes" + e_indices = [i for i, v in enumerate(cmd) if v == "-e"] + e_values = [cmd[i + 1] for i in e_indices] + assert "TERM=xterm-256color" in e_values + assert "LANG=en_US.UTF-8" in e_values + assert "hermes-agent" in cmd + assert "/data/current-package/bin/hermes" in cmd + assert "chat" in cmd + + +def test_exec_in_container_non_tty_uses_i_only(docker_container_info): + """Non-TTY mode uses -i instead of -it.""" + from hermes_cli.main import _exec_in_container + + with patch("shutil.which", return_value="/usr/bin/docker"), \ + patch("subprocess.run") as mock_run, \ + patch("sys.stdin") as mock_stdin, \ + patch("os.execvp") as mock_execvp: + mock_stdin.isatty.return_value = False + mock_run.return_value = MagicMock(returncode=0) + + _exec_in_container(docker_container_info, ["sessions", "list"]) + + cmd = mock_execvp.call_args[0][1] + assert "-i" in cmd + assert "-it" not in cmd + + +def test_exec_in_container_no_runtime_hard_fails(podman_container_info): + """Hard fails when runtime not found (no fallback).""" + from hermes_cli.main import _exec_in_container + + with patch("shutil.which", return_value=None), \ + patch("subprocess.run") as mock_run, \ + patch("os.execvp") as mock_execvp, \ + pytest.raises(SystemExit) as exc_info: + _exec_in_container(podman_container_info, ["chat"]) + + mock_run.assert_not_called() + mock_execvp.assert_not_called() + assert exc_info.value.code != 0 + + +def test_exec_in_container_sudo_probe_sets_prefix(podman_container_info): + """When first probe fails and sudo probe succeeds, execvp is called + with sudo -n prefix.""" + from hermes_cli.main import _exec_in_container + + def which_side_effect(name): + if name == "podman": + return "/usr/bin/podman" + if name == "sudo": + return "/usr/bin/sudo" + return None + + with patch("shutil.which", side_effect=which_side_effect), \ + patch("subprocess.run") as mock_run, \ + patch("sys.stdin") as mock_stdin, \ + patch("os.execvp") as mock_execvp: + mock_stdin.isatty.return_value = True + mock_run.side_effect = [ + MagicMock(returncode=1), # direct probe fails + MagicMock(returncode=0), # sudo probe succeeds + ] + + _exec_in_container(podman_container_info, ["chat"]) + + mock_execvp.assert_called_once() + cmd = mock_execvp.call_args[0][1] + assert cmd[0] == "/usr/bin/sudo" + assert cmd[1] == "-n" + assert cmd[2] == "/usr/bin/podman" + assert cmd[3] == "exec" + + +def test_exec_in_container_probe_timeout_prints_message(docker_container_info): + """TimeoutExpired from probe produces a human-readable error, not a + raw traceback.""" + from hermes_cli.main import _exec_in_container + + with patch("shutil.which", return_value="/usr/bin/docker"), \ + patch("subprocess.run", side_effect=subprocess.TimeoutExpired( + cmd=["docker", "inspect"], timeout=15)), \ + patch("os.execvp") as mock_execvp, \ + pytest.raises(SystemExit) as exc_info: + _exec_in_container(docker_container_info, ["chat"]) + + mock_execvp.assert_not_called() + assert exc_info.value.code == 1 + + +def test_exec_in_container_container_not_running_no_sudo(docker_container_info): + """When runtime exists but container not found and no sudo available, + prints helpful error about root containers.""" + from hermes_cli.main import _exec_in_container + + def which_side_effect(name): + if name == "docker": + return "/usr/bin/docker" + return None + + with patch("shutil.which", side_effect=which_side_effect), \ + patch("subprocess.run") as mock_run, \ + patch("os.execvp") as mock_execvp, \ + pytest.raises(SystemExit) as exc_info: + mock_run.return_value = MagicMock(returncode=1) + + _exec_in_container(docker_container_info, ["chat"]) + + mock_execvp.assert_not_called() + assert exc_info.value.code == 1 diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index 61137fe90a..d716b59b27 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -2742,74 +2742,12 @@ class TestSystemPromptStability: assert "Hermes Agent" in agent._cached_system_prompt class TestBudgetPressure: - """Budget pressure warning system (issue #414).""" + """Budget exhaustion grace call system.""" - def test_no_warning_below_caution(self, agent): - agent.max_iterations = 60 - assert agent._get_budget_warning(30) is None - - def test_caution_at_70_percent(self, agent): - agent.max_iterations = 60 - msg = agent._get_budget_warning(42) - assert msg is not None - assert "[BUDGET:" in msg - assert "18 iterations left" in msg - - def test_warning_at_90_percent(self, agent): - agent.max_iterations = 60 - msg = agent._get_budget_warning(54) - assert "[BUDGET WARNING:" in msg - assert "Provide your final response NOW" in msg - - def test_last_iteration(self, agent): - agent.max_iterations = 60 - msg = agent._get_budget_warning(59) - assert "1 iteration(s) left" in msg - - def test_disabled(self, agent): - agent.max_iterations = 60 - agent._budget_pressure_enabled = False - assert agent._get_budget_warning(55) is None - - def test_zero_max_iterations(self, agent): - agent.max_iterations = 0 - assert agent._get_budget_warning(0) is None - - def test_injects_into_json_tool_result(self, agent): - """Warning should be injected as _budget_warning field in JSON tool results.""" - import json - agent.max_iterations = 10 - messages = [ - {"role": "tool", "content": json.dumps({"output": "done", "exit_code": 0}), "tool_call_id": "tc1"} - ] - warning = agent._get_budget_warning(9) - assert warning is not None - # Simulate the injection logic - last_content = messages[-1]["content"] - parsed = json.loads(last_content) - parsed["_budget_warning"] = warning - messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False) - result = json.loads(messages[-1]["content"]) - assert "_budget_warning" in result - assert "BUDGET WARNING" in result["_budget_warning"] - assert result["output"] == "done" # original content preserved - - def test_appends_to_non_json_tool_result(self, agent): - """Warning should be appended as text for non-JSON tool results.""" - agent.max_iterations = 10 - messages = [ - {"role": "tool", "content": "plain text result", "tool_call_id": "tc1"} - ] - warning = agent._get_budget_warning(9) - # Simulate injection logic for non-JSON - last_content = messages[-1]["content"] - try: - import json - json.loads(last_content) - except (json.JSONDecodeError, TypeError): - messages[-1]["content"] = last_content + f"\n\n{warning}" - assert "plain text result" in messages[-1]["content"] - assert "BUDGET WARNING" in messages[-1]["content"] + def test_grace_call_flags_initialized(self, agent): + """Agent should have budget grace call flags.""" + assert agent._budget_exhausted_injected is False + assert agent._budget_grace_call is False class TestSafeWriter: diff --git a/tests/run_agent/test_run_agent_codex_responses.py b/tests/run_agent/test_run_agent_codex_responses.py index 17a70624d8..533a85ac83 100644 --- a/tests/run_agent/test_run_agent_codex_responses.py +++ b/tests/run_agent/test_run_agent_codex_responses.py @@ -744,6 +744,44 @@ def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(mo assert "inspect the repository" in (assistant_message.content or "") +def test_interim_commentary_is_not_marked_already_streamed_without_callbacks(monkeypatch): + agent = _build_agent(monkeypatch) + observed = {} + + agent._fire_stream_delta("short version: yes") + agent.interim_assistant_callback = lambda text, *, already_streamed=False: observed.update( + {"text": text, "already_streamed": already_streamed} + ) + + agent._emit_interim_assistant_message({"role": "assistant", "content": "short version: yes"}) + + assert observed == { + "text": "short version: yes", + "already_streamed": False, + } + + +def test_interim_commentary_is_not_marked_already_streamed_when_stream_callback_fails(monkeypatch): + agent = _build_agent(monkeypatch) + observed = {} + + def failing_callback(_text): + raise RuntimeError("display failed") + + agent.stream_delta_callback = failing_callback + agent._fire_stream_delta("short version: yes") + agent.interim_assistant_callback = lambda text, *, already_streamed=False: observed.update( + {"text": text, "already_streamed": already_streamed} + ) + + agent._emit_interim_assistant_message({"role": "assistant", "content": "short version: yes"}) + + assert observed == { + "text": "short version: yes", + "already_streamed": False, + } + + def test_run_conversation_codex_continues_after_commentary_phase_message(monkeypatch): agent = _build_agent(monkeypatch) responses = [ diff --git a/tests/tools/test_browser_camofox_state.py b/tests/tools/test_browser_camofox_state.py index b1f128ccee..33a939f094 100644 --- a/tests/tools/test_browser_camofox_state.py +++ b/tests/tools/test_browser_camofox_state.py @@ -59,8 +59,9 @@ class TestCamofoxConfigDefaults: browser_cfg = DEFAULT_CONFIG["browser"] assert browser_cfg["camofox"]["managed_persistence"] is False - def test_config_version_unchanged(self): + def test_config_version_matches_current_schema(self): from hermes_cli.config import DEFAULT_CONFIG - # managed_persistence is auto-merged by _deep_merge, no version bump needed - assert DEFAULT_CONFIG["_config_version"] == 13 + # The current schema version is tracked globally; unrelated default + # options may bump it after browser defaults are added. + assert DEFAULT_CONFIG["_config_version"] == 15 diff --git a/tests/tools/test_todo_tool.py b/tests/tools/test_todo_tool.py index d4fd03bafe..6215078525 100644 --- a/tests/tools/test_todo_tool.py +++ b/tests/tools/test_todo_tool.py @@ -24,6 +24,18 @@ class TestWriteAndRead: items[0]["content"] = "MUTATED" assert store.read()[0]["content"] == "Task" + def test_write_deduplicates_duplicate_ids(self): + store = TodoStore() + result = store.write([ + {"id": "1", "content": "First version", "status": "pending"}, + {"id": "2", "content": "Other task", "status": "pending"}, + {"id": "1", "content": "Latest version", "status": "in_progress"}, + ]) + assert result == [ + {"id": "2", "content": "Other task", "status": "pending"}, + {"id": "1", "content": "Latest version", "status": "in_progress"}, + ] + class TestHasItems: def test_empty_store(self): diff --git a/tools/environments/base.py b/tools/environments/base.py index 1598c22110..19c3bf024e 100644 --- a/tools/environments/base.py +++ b/tools/environments/base.py @@ -23,6 +23,19 @@ from tools.interrupt import is_interrupted logger = logging.getLogger(__name__) +# Thread-local activity callback. The agent sets this before a tool call so +# long-running _wait_for_process loops can report liveness to the gateway. +_activity_callback_local = threading.local() + + +def set_activity_callback(cb: Callable[[str], None] | None) -> None: + """Register a callback that _wait_for_process fires periodically.""" + _activity_callback_local.callback = cb + + +def _get_activity_callback() -> Callable[[str], None] | None: + return getattr(_activity_callback_local, "callback", None) + def get_sandbox_dir() -> Path: """Return the host-side root for all sandbox storage (Docker workspaces, @@ -370,6 +383,10 @@ class BaseEnvironment(ABC): """Poll-based wait with interrupt checking and stdout draining. Shared across all backends — not overridden. + + Fires the ``activity_callback`` (if set on this instance) every 10s + while the process is running so the gateway's inactivity timeout + doesn't kill long-running commands. """ output_chunks: list[str] = [] @@ -388,6 +405,8 @@ class BaseEnvironment(ABC): drain_thread = threading.Thread(target=_drain, daemon=True) drain_thread.start() deadline = time.monotonic() + timeout + _last_activity_touch = time.monotonic() + _ACTIVITY_INTERVAL = 10.0 # seconds between activity touches while proc.poll() is None: if is_interrupted(): @@ -408,6 +427,17 @@ class BaseEnvironment(ABC): else timeout_msg.lstrip(), "returncode": 124, } + # Periodic activity touch so the gateway knows we're alive + _now = time.monotonic() + if _now - _last_activity_touch >= _ACTIVITY_INTERVAL: + _last_activity_touch = _now + _cb = _get_activity_callback() + if _cb: + try: + _elapsed = int(_now - (deadline - timeout)) + _cb(f"terminal command running ({_elapsed}s elapsed)") + except Exception: + pass time.sleep(0.2) drain_thread.join(timeout=5) diff --git a/tools/todo_tool.py b/tools/todo_tool.py index 9021fbc2d3..b0d38a2342 100644 --- a/tools/todo_tool.py +++ b/tools/todo_tool.py @@ -46,11 +46,11 @@ class TodoStore: """ if not merge: # Replace mode: new list entirely - self._items = [self._validate(t) for t in todos] + self._items = [self._validate(t) for t in self._dedupe_by_id(todos)] else: # Merge mode: update existing items by id, append new ones existing = {item["id"]: item for item in self._items} - for t in todos: + for t in self._dedupe_by_id(todos): item_id = str(t.get("id", "")).strip() if not item_id: continue # Can't merge without an id @@ -143,6 +143,15 @@ class TodoStore: return {"id": item_id, "content": content, "status": status} + @staticmethod + def _dedupe_by_id(todos: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Collapse duplicate ids, keeping the last occurrence in its position.""" + last_index: Dict[str, int] = {} + for i, item in enumerate(todos): + item_id = str(item.get("id", "")).strip() or "?" + last_index[item_id] = i + return [todos[i] for i in sorted(last_index.values())] + def todo_tool( todos: Optional[List[Dict[str, Any]]] = None, diff --git a/website/docs/getting-started/nix-setup.md b/website/docs/getting-started/nix-setup.md index 4db4939868..858315329b 100644 --- a/website/docs/getting-started/nix-setup.md +++ b/website/docs/getting-started/nix-setup.md @@ -122,6 +122,41 @@ services.hermes-agent.environmentFiles = [ "/var/lib/hermes/env" ]; Setting `addToSystemPackages = true` does two things: puts the `hermes` CLI on your system PATH **and** sets `HERMES_HOME` system-wide so the interactive CLI shares state (sessions, skills, cron) with the gateway service. Without it, running `hermes` in your shell creates a separate `~/.hermes/` directory. ::: +:::info Container-aware CLI +When `container.enable = true` and `addToSystemPackages = true`, **every** `hermes` command on the host automatically routes into the managed container. This means your interactive CLI session runs inside the same environment as the gateway service — with access to all container-installed packages and tools. + +- The routing is transparent: `hermes chat`, `hermes sessions list`, `hermes version`, etc. all exec into the container under the hood +- All CLI flags are forwarded as-is +- If the container isn't running, the CLI retries briefly (5s with a spinner for interactive use, 10s silently for scripts) then fails with a clear error — no silent fallback +- For developers working on the hermes codebase, set `HERMES_DEV=1` to bypass container routing and run the local checkout directly + +Set `container.hostUsers` to create a `~/.hermes` symlink to the service state directory, so the host CLI and the container share sessions, config, and memories: + +```nix +services.hermes-agent = { + container.enable = true; + container.hostUsers = [ "your-username" ]; + addToSystemPackages = true; +}; +``` + +Users listed in `hostUsers` are automatically added to the `hermes` group for file permission access. + +**Podman users:** The NixOS service runs the container as root. Docker users get access via the `docker` group socket, but Podman's rootful containers require sudo. Grant passwordless sudo for your container runtime: + +```nix +security.sudo.extraRules = [{ + users = [ "your-username" ]; + commands = [{ + command = "/run/current-system/sw/bin/podman"; + options = [ "NOPASSWD" ]; + }]; +}]; +``` + +The CLI auto-detects when sudo is needed and uses it transparently. Without this, you'll need to run `sudo hermes chat` manually. +::: + ### Verify It Works After `nixos-rebuild switch`, check that the service is running: @@ -246,6 +281,7 @@ Run `nix build .#configKeys && cat result` to see every leaf config key extracte container = { image = "ubuntu:24.04"; backend = "docker"; + hostUsers = [ "your-username" ]; extraVolumes = [ "/home/user/projects:/projects:rw" ]; extraOptions = [ "--gpus" "all" ]; }; @@ -285,6 +321,7 @@ Quick reference for the most common things Nix users want to customize: | Mount host directories into container | `container.extraVolumes` | `[ "/data:/data:rw" ]` | | Pass GPU access to container | `container.extraOptions` | `[ "--gpus" "all" ]` | | Use Podman instead of Docker | `container.backend` | `"podman"` | +| Share state between host CLI and container | `container.hostUsers` | `[ "sidbin" ]` | | Add tools to the service PATH (native only) | `extraPackages` | `[ pkgs.pandoc pkgs.imagemagick ]` | | Use a custom base image | `container.image` | `"ubuntu:24.04"` | | Override the hermes package | `package` | `inputs.hermes-agent.packages.${system}.default.override { ... }` | @@ -518,6 +555,7 @@ When container mode is enabled, hermes runs inside a persistent Ubuntu container Host Container ──── ───────── /nix/store/...-hermes-agent-0.1.0 ──► /nix/store/... (ro) +~/.hermes -> /var/lib/hermes/.hermes (symlink bridge, per hostUsers) /var/lib/hermes/ ──► /data/ (rw) ├── current-package -> /nix/store/... (symlink, updated each rebuild) ├── .gc-root -> /nix/store/... (prevents nix-collect-garbage) @@ -526,6 +564,7 @@ Host Container │ ├── .env (merged from environment + environmentFiles) │ ├── config.yaml (Nix-generated, deep-merged by activation) │ ├── .managed (marker file) + │ ├── .container-mode (routing metadata: backend, exec_user, etc.) │ ├── state.db, sessions/, memories/ (runtime state) │ └── mcp-tokens/ (OAuth tokens for MCP servers) ├── home/ ──► /home/hermes (rw) @@ -698,6 +737,7 @@ nix build .#checks.x86_64-linux.config-roundtrip # merge script preserves use | `container.image` | `str` | `"ubuntu:24.04"` | Base image (pulled at runtime) | | `container.extraVolumes` | `listOf str` | `[]` | Extra volume mounts (`host:container:mode`) | | `container.extraOptions` | `listOf str` | `[]` | Extra args passed to `docker create` | +| `container.hostUsers` | `listOf str` | `[]` | Interactive users who get a `~/.hermes` symlink to the service stateDir and are auto-added to the `hermes` group | --- @@ -818,3 +858,5 @@ nix-store --query --roots $(docker exec hermes-agent readlink /data/current-pack | `hermes version` shows old version | Container not restarted | `systemctl restart hermes-agent` | | Permission denied on `/var/lib/hermes` | State dir is `0750 hermes:hermes` | Use `docker exec` or `sudo -u hermes` | | `nix-collect-garbage` removed hermes | GC root missing | Restart the service (preStart recreates the GC root) | +| `no container with name or ID "hermes-agent"` (Podman) | Podman rootful container not visible to regular user | Add passwordless sudo for podman (see [Container-aware CLI](#container-aware-cli) section) | +| `unable to find user hermes` | Container still starting (entrypoint hasn't created user yet) | Wait a few seconds and retry — the CLI retries automatically | diff --git a/website/docs/getting-started/quickstart.md b/website/docs/getting-started/quickstart.md index bd26f1eebb..9646fbcc9f 100644 --- a/website/docs/getting-started/quickstart.md +++ b/website/docs/getting-started/quickstart.md @@ -64,6 +64,10 @@ hermes setup # Or configure everything at once | **Vercel AI Gateway** | Vercel AI Gateway routing | Set `AI_GATEWAY_API_KEY` | | **Custom Endpoint** | VLLM, SGLang, Ollama, or any OpenAI-compatible API | Set base URL + API key | +:::caution Minimum context: 64K tokens +Hermes Agent requires a model with at least **64,000 tokens** of context. Models with smaller windows cannot maintain enough working memory for multi-step tool-calling workflows and will be rejected at startup. Most hosted models (Claude, GPT, Gemini, Qwen, DeepSeek) meet this easily. If you're running a local model, set its context size to at least 64K (e.g. `--ctx-size 65536` for llama.cpp or `-c 65536` for Ollama). +::: + :::tip You can switch providers at any time with `hermes model` — no code changes, no lock-in. When configuring a custom endpoint, Hermes will prompt for the context window size and auto-detect it when possible. See [Context Length Detection](../integrations/providers.md#context-length-detection) for details. ::: diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index 7b735bbdee..9f7c9e2dd4 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -800,7 +800,7 @@ You can also change the reasoning effort at runtime with the `/reasoning` comman ## Tool-Use Enforcement -Some models (especially GPT-family) occasionally describe intended actions as text instead of making tool calls. Tool-use enforcement injects guidance that steers the model back to actually calling tools. +Some models occasionally describe intended actions as text instead of making tool calls ("I would run the tests..." instead of actually calling the terminal). Tool-use enforcement injects system prompt guidance that steers the model back to actually calling tools. ```yaml agent: @@ -809,12 +809,31 @@ agent: | Value | Behavior | |-------|----------| -| `"auto"` (default) | Enabled for GPT models (`gpt-`, `openai/gpt-`) and disabled for all others. | -| `true` | Always enabled for all models. | -| `false` | Always disabled. | -| `["gpt-", "o1-", "custom-model"]` | Enabled only for models whose name contains one of the listed substrings. | +| `"auto"` (default) | Enabled for models matching: `gpt`, `codex`, `gemini`, `gemma`, `grok`. Disabled for all others (Claude, DeepSeek, Qwen, etc.). | +| `true` | Always enabled, regardless of model. Useful if you notice your current model describing actions instead of performing them. | +| `false` | Always disabled, regardless of model. | +| `["gpt", "codex", "qwen", "llama"]` | Enabled only when the model name contains one of the listed substrings (case-insensitive). | -When enabled, the system prompt includes guidance reminding the model to make actual tool calls rather than describing what it would do. This is transparent to the user and has no effect on models that already use tools reliably. +### What it injects + +When enabled, three layers of guidance may be added to the system prompt: + +1. **General tool-use enforcement** (all matched models) — instructs the model to make tool calls immediately instead of describing intentions, keep working until the task is complete, and never end a turn with a promise of future action. + +2. **OpenAI execution discipline** (GPT and Codex models only) — additional guidance addressing GPT-specific failure modes: abandoning work on partial results, skipping prerequisite lookups, hallucinating instead of using tools, and declaring "done" without verification. + +3. **Google operational guidance** (Gemini and Gemma models only) — conciseness, absolute paths, parallel tool calls, and verify-before-edit patterns. + +These are transparent to the user and only affect the system prompt. Models that already use tools reliably (like Claude) don't need this guidance, which is why `"auto"` excludes them. + +### When to turn it on + +If you're using a model not in the default auto list and notice it frequently describes what it *would* do instead of doing it, set `tool_use_enforcement: true` or add the model substring to the list: + +```yaml +agent: + tool_use_enforcement: ["gpt", "codex", "gemini", "grok", "my-custom-model"] +``` ## TTS Configuration @@ -846,6 +865,7 @@ display: tool_progress: all # off | new | all | verbose tool_progress_command: false # Enable /verbose slash command in messaging gateway tool_progress_overrides: {} # Per-platform overrides (see below) + interim_assistant_messages: true # Gateway: send natural mid-turn assistant updates as separate messages skin: default # Built-in or custom CLI skin (see user-guide/features/skins) personality: "kawaii" # Legacy cosmetic field still surfaced in some summaries compact: false # Compact output mode (less whitespace) @@ -881,6 +901,8 @@ display: Platforms without an override fall back to the global `tool_progress` value. Valid platform keys: `telegram`, `discord`, `slack`, `signal`, `whatsapp`, `matrix`, `mattermost`, `email`, `sms`, `homeassistant`, `dingtalk`, `feishu`, `wecom`, `weixin`, `bluebubbles`. +`interim_assistant_messages` is gateway-only. When enabled, Hermes sends completed mid-turn assistant updates as separate chat messages. This is independent from `tool_progress` and does not require gateway streaming. + ## Privacy ```yaml @@ -971,6 +993,8 @@ streaming: When enabled, the bot sends a message on the first token, then progressively edits it as more tokens arrive. Platforms that don't support message editing (Signal, Email, Home Assistant) are auto-detected on the first attempt — streaming is gracefully disabled for that session with no flood of messages. +For separate natural mid-turn assistant updates without progressive token editing, set `display.interim_assistant_messages: true`. + **Overflow handling:** If the streamed text exceeds the platform's message length limit (~4096 chars), the current message is finalized and a new one starts automatically. :::note