diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 12e2b3999..31d829de0 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -774,6 +774,11 @@ display: # Toggle at runtime with /verbose in the CLI tool_progress: all + # Gateway-only natural mid-turn assistant updates. + # When true, completed assistant status messages are sent as separate chat + # messages. This is independent of tool_progress and gateway streaming. + interim_assistant_messages: true + # What Enter does when Hermes is already busy in the CLI. # interrupt: Interrupt the current run and redirect Hermes (default) # queue: Queue your message for the next turn diff --git a/gateway/run.py b/gateway/run.py index 9434c9e5f..54e574dec 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -76,7 +76,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) # Resolve Hermes home directory (respects HERMES_HOME override) from hermes_constants import get_hermes_home -from utils import atomic_yaml_write +from utils import atomic_yaml_write, is_truthy_value _hermes_home = get_hermes_home() # Load environment variables from ~/.hermes/.env first. @@ -7078,10 +7078,14 @@ class GatewayRunner: from hermes_cli.tools_config import _get_platform_tools enabled_toolsets = sorted(_get_platform_tools(user_config, platform_key)) + display_config = user_config.get("display", {}) + if not isinstance(display_config, dict): + display_config = {} + # Apply tool preview length config (0 = no limit) try: from agent.display import set_tool_preview_max_len - _tpl = user_config.get("display", {}).get("tool_preview_length", 0) + _tpl = display_config.get("tool_preview_length", 0) set_tool_preview_max_len(int(_tpl) if _tpl else 0) except Exception: pass @@ -7094,11 +7098,12 @@ class GatewayRunner: # Per-platform overrides (display.tool_progress_overrides) take # priority over the global setting — e.g. Signal users can set # tool_progress to "off" while keeping Telegram on "all". 
- _display_cfg = user_config.get("display", {}) - _overrides = _display_cfg.get("tool_progress_overrides", {}) + _overrides = display_config.get("tool_progress_overrides", {}) + if not isinstance(_overrides, dict): + _overrides = {} _raw_tp = _overrides.get(platform_key) if _raw_tp is None: - _raw_tp = _display_cfg.get("tool_progress") + _raw_tp = display_config.get("tool_progress") if _raw_tp is False: _raw_tp = "off" progress_mode = ( @@ -7110,6 +7115,16 @@ class GatewayRunner: # so each progress line would be sent as a separate message. from gateway.config import Platform tool_progress_enabled = progress_mode != "off" and source.platform != Platform.WEBHOOK + # Natural assistant status messages are intentionally independent from + # tool progress and token streaming. Users can keep tool_progress quiet + # in chat platforms while opting into concise mid-turn updates. + interim_assistant_messages_enabled = ( + source.platform != Platform.WEBHOOK + and is_truthy_value( + display_config.get("interim_assistant_messages"), + default=True, + ) + ) # Queue for progress messages (thread-safe) progress_queue = queue.Queue() if tool_progress_enabled else None @@ -7422,7 +7437,7 @@ class GatewayRunner: reasoning_config = self._load_reasoning_config() self._reasoning_config = reasoning_config self._service_tier = self._load_service_tier() - # Set up streaming consumer if enabled + # Set up stream consumer for token streaming or interim commentary. 
_stream_consumer = None _stream_delta_cb = None _scfg = getattr(getattr(self, 'config', None), 'streaming', None) @@ -7430,7 +7445,10 @@ class GatewayRunner: from gateway.config import StreamingConfig _scfg = StreamingConfig() - if _scfg.enabled and _scfg.transport != "off": + _want_stream_deltas = _scfg.enabled and _scfg.transport != "off" + _want_interim_messages = interim_assistant_messages_enabled + _want_interim_consumer = _want_interim_messages + if _want_stream_deltas or _want_interim_consumer: try: from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig _adapter = self.adapters.get(source.platform) @@ -7446,11 +7464,33 @@ class GatewayRunner: config=_consumer_cfg, metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None, ) - _stream_delta_cb = _stream_consumer.on_delta + if _want_stream_deltas: + _stream_delta_cb = _stream_consumer.on_delta stream_consumer_holder[0] = _stream_consumer except Exception as _sc_err: logger.debug("Could not set up stream consumer: %s", _sc_err) + def _interim_assistant_cb(text: str, *, already_streamed: bool = False) -> None: + if _stream_consumer is not None: + if already_streamed: + _stream_consumer.on_segment_break() + else: + _stream_consumer.on_commentary(text) + return + if already_streamed or not _status_adapter or not str(text or "").strip(): + return + try: + asyncio.run_coroutine_threadsafe( + _status_adapter.send( + _status_chat_id, + text, + metadata=_status_thread_metadata, + ), + _loop_for_step, + ) + except Exception as _e: + logger.debug("interim_assistant_callback error: %s", _e) + turn_route = self._resolve_turn_agent_config(message, model, runtime_kwargs) # Check agent cache — reuse the AIAgent from the previous message @@ -7508,6 +7548,7 @@ class GatewayRunner: agent.tool_progress_callback = progress_callback if tool_progress_enabled else None agent.step_callback = _step_callback_sync if _hooks_ref.loaded_hooks else None agent.stream_delta_callback = 
_stream_delta_cb + agent.interim_assistant_callback = _interim_assistant_cb if _want_interim_messages else None agent.status_callback = _status_callback_sync agent.reasoning_config = reasoning_config agent.service_tier = self._service_tier @@ -7811,6 +7852,7 @@ class GatewayRunner: "output_tokens": _output_toks, "model": _resolved_model, "session_id": effective_session_id, + "response_previewed": result.get("response_previewed", False), } # Start progress message sender if enabled @@ -8133,12 +8175,36 @@ class GatewayRunner: # response before processing the queued follow-up. # Skip if streaming already delivered it. _sc = stream_consumer_holder[0] - _already_streamed = _sc and getattr(_sc, "already_sent", False) + if _sc and stream_task: + try: + await asyncio.wait_for(stream_task, timeout=5.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + stream_task.cancel() + try: + await stream_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.debug("Stream consumer wait before queued message failed: %s", e) + _response_previewed = bool(result.get("response_previewed")) + _already_streamed = bool( + _sc + and ( + getattr(_sc, "final_response_sent", False) + or ( + _response_previewed + and getattr(_sc, "already_sent", False) + ) + ) + ) first_response = result.get("final_response", "") if first_response and not _already_streamed: try: - await adapter.send(source.chat_id, first_response, - metadata={"thread_id": source.thread_id} if source.thread_id else None) + await adapter.send( + source.chat_id, + first_response, + metadata=_status_thread_metadata, + ) except Exception as e: logger.warning("Failed to send first response before queued message: %s", e) # else: interrupted — discard the interrupted response ("Operation @@ -8211,8 +8277,15 @@ class GatewayRunner: # message is new content the user hasn't seen, and it must reach # them even if streaming had sent earlier partial output. 
_sc = stream_consumer_holder[0] - if _sc and _sc.already_sent and isinstance(response, dict): - if not response.get("failed"): + if _sc and isinstance(response, dict) and not response.get("failed"): + _response_previewed = bool(response.get("response_previewed")) + if ( + getattr(_sc, "final_response_sent", False) + or ( + _response_previewed + and getattr(_sc, "already_sent", False) + ) + ): response["already_sent"] = True return response diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index de0a1453b..486d179de 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -32,6 +32,10 @@ _DONE = object() # new one so that subsequent text appears below tool progress messages. _NEW_SEGMENT = object() +# Queue marker for a completed assistant commentary message emitted between +# API/tool iterations (for example: "I'll inspect the repo first."). +_COMMENTARY = object() + @dataclass class StreamConsumerConfig: @@ -75,20 +79,43 @@ class GatewayStreamConsumer: self._accumulated = "" self._message_id: Optional[str] = None self._already_sent = False - self._edit_supported = True # Disabled on first edit failure (Signal/Email/HA) + self._edit_supported = True # Disabled when progressive edits are no longer usable self._last_edit_time = 0.0 self._last_sent_text = "" # Track last-sent text to skip redundant edits self._fallback_final_send = False self._fallback_prefix = "" self._flood_strikes = 0 # Consecutive flood-control edit failures self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff + self._final_response_sent = False @property def already_sent(self) -> bool: - """True if at least one message was sent/edited — signals the base - adapter to skip re-sending the final response.""" + """True if at least one message was sent or edited during the run.""" return self._already_sent + @property + def final_response_sent(self) -> bool: + """True when the stream consumer delivered the final assistant reply.""" + return 
self._final_response_sent + + def on_segment_break(self) -> None: + """Finalize the current stream segment and start a fresh message.""" + self._queue.put(_NEW_SEGMENT) + + def on_commentary(self, text: str) -> None: + """Queue a completed interim assistant commentary message.""" + if text: + self._queue.put((_COMMENTARY, text)) + + def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None: + if preserve_no_edit and self._message_id == "__no_edit__": + return + self._message_id = None + self._accumulated = "" + self._last_sent_text = "" + self._fallback_final_send = False + self._fallback_prefix = "" + def on_delta(self, text: str) -> None: """Thread-safe callback — called from the agent's worker thread. @@ -99,7 +126,7 @@ class GatewayStreamConsumer: if text: self._queue.put(text) elif text is None: - self._queue.put(_NEW_SEGMENT) + self.on_segment_break() def finish(self) -> None: """Signal that the stream is complete.""" @@ -116,6 +143,7 @@ class GatewayStreamConsumer: # Drain all available items from the queue got_done = False got_segment_break = False + commentary_text = None while True: try: item = self._queue.get_nowait() @@ -125,6 +153,9 @@ class GatewayStreamConsumer: if item is _NEW_SEGMENT: got_segment_break = True break + if isinstance(item, tuple) and len(item) == 2 and item[0] is _COMMENTARY: + commentary_text = item[1] + break self._accumulated += item except queue.Empty: break @@ -135,11 +166,13 @@ class GatewayStreamConsumer: should_edit = ( got_done or got_segment_break + or commentary_text is not None or (elapsed >= self._current_edit_interval and self._accumulated) or len(self._accumulated) >= self.cfg.buffer_threshold ) + current_update_visible = False if should_edit and self._accumulated: # Split overflow: if accumulated text exceeds the platform # limit, split into properly sized chunks. 
@@ -161,6 +194,7 @@ class GatewayStreamConsumer: self._last_sent_text = "" self._last_edit_time = time.monotonic() if got_done: + self._final_response_sent = self._already_sent return if got_segment_break: self._message_id = None @@ -192,10 +226,10 @@ class GatewayStreamConsumer: self._last_sent_text = "" display_text = self._accumulated - if not got_done and not got_segment_break: + if not got_done and not got_segment_break and commentary_text is None: display_text += self.cfg.cursor - await self._send_or_edit(display_text) + current_update_visible = await self._send_or_edit(display_text) self._last_edit_time = time.monotonic() if got_done: @@ -206,12 +240,20 @@ class GatewayStreamConsumer: if self._accumulated: if self._fallback_final_send: await self._send_fallback_final(self._accumulated) + elif current_update_visible: + self._final_response_sent = True elif self._message_id: - await self._send_or_edit(self._accumulated) + self._final_response_sent = await self._send_or_edit(self._accumulated) elif not self._already_sent: - await self._send_or_edit(self._accumulated) + self._final_response_sent = await self._send_or_edit(self._accumulated) return + if commentary_text is not None: + self._reset_segment_state() + await self._send_commentary(commentary_text) + self._last_edit_time = time.monotonic() + self._reset_segment_state() + # Tool boundary: reset message state so the next text chunk # creates a fresh message below any tool-progress messages. # @@ -220,17 +262,14 @@ class GatewayStreamConsumer: # github_comment delivery). Resetting to None would re-enter # the "first send" path on every tool boundary and post one # platform message per tool call — that is what caused 155 - # comments under a single PR. Instead, keep all state so the - # full continuation is delivered once via _send_fallback_final. + # comments under a single PR. Instead, preserve the sentinel + # so the full continuation is delivered once via + # _send_fallback_final. 
# (When editing fails mid-stream due to flood control the id is # a real string like "msg_1", not "__no_edit__", so that case # still resets and creates a fresh segment as intended.) - if got_segment_break and self._message_id != "__no_edit__": - self._message_id = None - self._accumulated = "" - self._last_sent_text = "" - self._fallback_final_send = False - self._fallback_prefix = "" + if got_segment_break: + self._reset_segment_state(preserve_no_edit=True) await asyncio.sleep(0.05) # Small yield to not busy-loop @@ -339,6 +378,7 @@ class GatewayStreamConsumer: if not continuation.strip(): # Nothing new to send — the visible partial already matches final text. self._already_sent = True + self._final_response_sent = True return raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) @@ -373,6 +413,7 @@ class GatewayStreamConsumer: # the base gateway final-send path so we don't resend the # full response and create another duplicate. self._already_sent = True + self._final_response_sent = True self._message_id = last_message_id self._last_sent_text = last_successful_chunk self._fallback_prefix = "" @@ -390,6 +431,7 @@ class GatewayStreamConsumer: self._message_id = last_message_id self._already_sent = True + self._final_response_sent = True self._last_sent_text = chunks[-1] self._fallback_prefix = "" @@ -420,6 +462,24 @@ class GatewayStreamConsumer: except Exception: pass # best-effort — don't let this block the fallback path + async def _send_commentary(self, text: str) -> bool: + """Send a completed interim assistant commentary message.""" + text = self._clean_for_display(text) + if not text.strip(): + return False + try: + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + metadata=self.metadata, + ) + if result.success: + self._already_sent = True + return True + except Exception as e: + logger.error("Commentary send error: %s", e) + return False + async def _send_or_edit(self, text: str) -> bool: """Send or edit the streaming 
message. @@ -501,23 +561,21 @@ class GatewayStreamConsumer: content=text, metadata=self.metadata, ) - if result.success and result.message_id: - self._message_id = result.message_id + if result.success: + if result.message_id: + self._message_id = result.message_id + else: + self._edit_supported = False self._already_sent = True self._last_sent_text = text + if not result.message_id: + self._fallback_prefix = self._visible_prefix() + self._fallback_final_send = True + # Sentinel prevents re-entering the first-send path on + # every delta/tool boundary when platforms accept a + # message but do not return an editable message id. + self._message_id = "__no_edit__" return True - elif result.success: - # Platform accepted the message but returned no message_id - # (e.g. Signal). Can't edit without an ID — switch to - # fallback mode: suppress intermediate deltas, send only - # the missing tail once the final response is ready. - self._already_sent = True - self._edit_supported = False - self._fallback_prefix = self._clean_for_display(text) - self._fallback_final_send = True - # Sentinel prevents re-entering this branch on every delta - self._message_id = "__no_edit__" - return True # platform accepted, just can't edit else: # Initial send failed — disable streaming for this session self._edit_supported = False diff --git a/hermes_cli/config.py b/hermes_cli/config.py index f551a195d..67d8c0c13 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -448,6 +448,7 @@ DEFAULT_CONFIG = { "inline_diffs": True, # Show inline diff previews for write actions (write_file, patch, skill_manage) "show_cost": False, # Show $ cost in the status bar (off by default) "skin": "default", + "interim_assistant_messages": True, # Gateway: show natural mid-turn assistant status messages "tool_progress_command": False, # Enable /verbose command in messaging gateway "tool_progress_overrides": {}, # Per-platform overrides: {"signal": "off", "telegram": "all"} "tool_preview_length": 0, # 
Max chars for tool call previews (0 = no limit, show full paths/commands) @@ -638,7 +639,7 @@ DEFAULT_CONFIG = { }, # Config schema version - bump this when adding new required fields - "_config_version": 14, + "_config_version": 15, } # ============================================================================= @@ -1865,6 +1866,20 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A if not quiet: print(f" ✓ Migrated legacy stt.model to provider-specific config") + # ── Version 14 → 15: add explicit gateway interim-message gate ── + if current_ver < 15: + config = read_raw_config() + display = config.get("display", {}) + if not isinstance(display, dict): + display = {} + if "interim_assistant_messages" not in display: + display["interim_assistant_messages"] = True + config["display"] = display + results["config_added"].append("display.interim_assistant_messages=true (default)") + save_config(config) + if not quiet: + print(" ✓ Added display.interim_assistant_messages=true") + if current_ver < latest_ver and not quiet: print(f"Config version: {current_ver} → {latest_ver}") diff --git a/run_agent.py b/run_agent.py index 0ff11d553..a53d11c7a 100644 --- a/run_agent.py +++ b/run_agent.py @@ -579,6 +579,7 @@ class AIAgent: clarify_callback: callable = None, step_callback: callable = None, stream_delta_callback: callable = None, + interim_assistant_callback: callable = None, tool_gen_callback: callable = None, status_callback: callable = None, max_tokens: int = None, @@ -728,6 +729,7 @@ class AIAgent: self.clarify_callback = clarify_callback self.step_callback = step_callback self.stream_delta_callback = stream_delta_callback + self.interim_assistant_callback = interim_assistant_callback self.status_callback = status_callback self.tool_gen_callback = tool_gen_callback @@ -831,6 +833,11 @@ class AIAgent: # Deferred paragraph break flag — set after tool iterations so a # single "\n\n" is prepended to the next real text delta. 
self._stream_needs_break = False + # Visible assistant text already delivered through live token callbacks + # during the current model response. Used to avoid re-sending the same + # commentary when the provider later returns it as a completed interim + # assistant message. + self._current_streamed_assistant_text = "" # Optional current-turn user-message override used when the API-facing # user message intentionally differs from the persisted transcript @@ -4730,6 +4737,49 @@ class AIAgent: # ── Unified streaming API call ───────────────────────────────────────── + def _reset_stream_delivery_tracking(self) -> None: + """Reset tracking for text delivered during the current model response.""" + self._current_streamed_assistant_text = "" + + def _record_streamed_assistant_text(self, text: str) -> None: + """Accumulate visible assistant text emitted through stream callbacks.""" + if isinstance(text, str) and text: + self._current_streamed_assistant_text = ( + getattr(self, "_current_streamed_assistant_text", "") + text + ) + + @staticmethod + def _normalize_interim_visible_text(text: str) -> str: + if not isinstance(text, str): + return "" + return re.sub(r"\s+", " ", text).strip() + + def _interim_content_was_streamed(self, content: str) -> bool: + visible_content = self._normalize_interim_visible_text( + self._strip_think_blocks(content or "") + ) + if not visible_content: + return False + streamed = self._normalize_interim_visible_text( + self._strip_think_blocks(getattr(self, "_current_streamed_assistant_text", "") or "") + ) + return bool(streamed) and streamed == visible_content + + def _emit_interim_assistant_message(self, assistant_msg: Dict[str, Any]) -> None: + """Surface a real mid-turn assistant commentary message to the UI layer.""" + cb = getattr(self, "interim_assistant_callback", None) + if cb is None or not isinstance(assistant_msg, dict): + return + content = assistant_msg.get("content") + visible = self._strip_think_blocks(content or "").strip() + 
if not visible or visible == "(empty)": + return + already_streamed = self._interim_content_was_streamed(visible) + try: + cb(visible, already_streamed=already_streamed) + except Exception: + logger.debug("interim_assistant_callback error", exc_info=True) + def _fire_stream_delta(self, text: str) -> None: """Fire all registered stream delta callbacks (display + TTS).""" # If a tool iteration set the break flag, prepend a single paragraph @@ -4739,12 +4789,16 @@ class AIAgent: if getattr(self, "_stream_needs_break", False) and text and text.strip(): self._stream_needs_break = False text = "\n\n" + text - for cb in (self.stream_delta_callback, self._stream_callback): - if cb is not None: - try: - cb(text) - except Exception: - pass + callbacks = [cb for cb in (self.stream_delta_callback, self._stream_callback) if cb is not None] + delivered = False + for cb in callbacks: + try: + cb(text) + delivered = True + except Exception: + pass + if delivered: + self._record_streamed_assistant_text(text) def _fire_reasoning_delta(self, text: str) -> None: """Fire reasoning callback if registered.""" @@ -4928,6 +4982,7 @@ class AIAgent: if self.stream_delta_callback: try: self.stream_delta_callback(delta.content) + self._record_streamed_assistant_text(delta.content) except Exception: pass @@ -8027,6 +8082,7 @@ class AIAgent: while retry_count < max_retries: try: + self._reset_stream_delivery_tracking() api_kwargs = self._build_api_kwargs(api_messages) if self.api_mode == "codex_responses": api_kwargs = self._preflight_codex_api_kwargs(api_kwargs, allow_stream=False) @@ -9441,6 +9497,7 @@ class AIAgent: ) if not duplicate_interim: messages.append(interim_msg) + self._emit_interim_assistant_message(interim_msg) if self._codex_incomplete_retries < 3: if not self.quiet_mode: @@ -9660,6 +9717,7 @@ class AIAgent: messages.pop() messages.append(assistant_msg) + self._emit_interim_assistant_message(assistant_msg) # Close any open streaming display (response box, reasoning # box) before 
tool execution begins. Intermediate turns may @@ -9938,6 +9996,7 @@ class AIAgent: codex_ack_continuations += 1 interim_msg = self._build_assistant_message(assistant_message, "incomplete") messages.append(interim_msg) + self._emit_interim_assistant_message(interim_msg) continue_msg = { "role": "user", diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index c28317d7e..6b1d46567 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -8,8 +8,8 @@ from types import SimpleNamespace import pytest -from gateway.config import Platform, PlatformConfig -from gateway.platforms.base import BasePlatformAdapter, SendResult +from gateway.config import Platform, PlatformConfig, StreamingConfig +from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType, SendResult from gateway.session import SessionSource @@ -104,6 +104,11 @@ def _make_runner(adapter): runner._session_db = None runner._running_agents = {} runner.hooks = SimpleNamespace(loaded_hooks=False) + runner.config = SimpleNamespace( + thread_sessions_per_user=False, + group_sessions_per_user=False, + stt_enabled=False, + ) return runner @@ -118,6 +123,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa fake_run_agent = types.ModuleType("run_agent") fake_run_agent.AIAgent = FakeAgent monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + import tools.terminal_tool # noqa: F401 - register terminal emoji for this fake-agent test adapter = ProgressCaptureAdapter() runner = _make_runner(adapter) @@ -144,7 +150,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa assert adapter.sent == [ { "chat_id": "-1001", - "content": '⚙️ terminal: "pwd"', + "content": '💻 terminal: "pwd"', "reply_to": None, "metadata": {"thread_id": "17585"}, } @@ -334,3 +340,238 @@ def test_all_mode_no_truncation_when_preview_fits(monkeypatch, tmp_path): content 
= adapter.sent[0]["content"] # With a 200-char cap, the 165-char command should NOT be truncated assert "..." not in content, f"Preview was truncated when it shouldn't be: {content}" + + +class CommentaryAgent: + def __init__(self, **kwargs): + self.tool_progress_callback = kwargs.get("tool_progress_callback") + self.interim_assistant_callback = kwargs.get("interim_assistant_callback") + self.stream_delta_callback = kwargs.get("stream_delta_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.interim_assistant_callback: + self.interim_assistant_callback("I'll inspect the repo first.", already_streamed=False) + time.sleep(0.1) + if self.stream_delta_callback: + self.stream_delta_callback("done") + return { + "final_response": "done", + "messages": [], + "api_calls": 1, + } + + +class PreviewedResponseAgent: + def __init__(self, **kwargs): + self.interim_assistant_callback = kwargs.get("interim_assistant_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.interim_assistant_callback: + self.interim_assistant_callback("You're welcome.", already_streamed=False) + return { + "final_response": "You're welcome.", + "response_previewed": True, + "messages": [], + "api_calls": 1, + } + + +class QueuedCommentaryAgent: + calls = 0 + + def __init__(self, **kwargs): + self.interim_assistant_callback = kwargs.get("interim_assistant_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + type(self).calls += 1 + if type(self).calls == 1 and self.interim_assistant_callback: + self.interim_assistant_callback("I'll inspect the repo first.", already_streamed=False) + return { + "final_response": f"final response {type(self).calls}", + "messages": [], + "api_calls": 1, + } + + +async def _run_with_agent( + monkeypatch, + tmp_path, + agent_cls, + *, + session_id, + pending_text=None, + 
config_data=None, +): + if config_data: + import yaml + + (tmp_path / "config.yaml").write_text(yaml.dump(config_data), encoding="utf-8") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = agent_cls + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + adapter = ProgressCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = importlib.import_module("gateway.run") + if config_data and "streaming" in config_data: + runner.config.streaming = StreamingConfig.from_dict(config_data["streaming"]) + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", + ) + session_key = "agent:main:telegram:group:-1001:17585" + if pending_text is not None: + adapter._pending_messages[session_key] = MessageEvent( + text=pending_text, + message_type=MessageType.TEXT, + source=source, + message_id="queued-1", + ) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id=session_id, + session_key=session_key, + ) + return adapter, result + + +@pytest.mark.asyncio +async def test_run_agent_surfaces_real_interim_commentary(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + assert result.get("already_sent") is not True + assert any(call["content"] == "I'll inspect the repo first." 
for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_surfaces_interim_commentary_by_default(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-default-on", + ) + + assert any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_suppresses_interim_commentary_when_disabled(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-disabled", + config_data={"display": {"interim_assistant_messages": False}}, + ) + + assert result.get("already_sent") is not True + assert not any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_tool_progress_does_not_control_interim_commentary(monkeypatch, tmp_path): + """tool_progress=all with interim_assistant_messages=false should not surface commentary.""" + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-tool-progress", + config_data={"display": {"tool_progress": "all", "interim_assistant_messages": False}}, + ) + + assert result.get("already_sent") is not True + assert not any(call["content"] == "I'll inspect the repo first." 
for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_streaming_does_not_enable_completed_interim_commentary( + monkeypatch, tmp_path +): + """Streaming alone with interim_assistant_messages=false should not surface commentary.""" + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-streaming", + config_data={ + "display": {"tool_progress": "off", "interim_assistant_messages": False}, + "streaming": {"enabled": True}, + }, + ) + + assert result.get("already_sent") is True + assert not any(call["content"] == "I'll inspect the repo first." for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_interim_commentary_works_with_tool_progress_off(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + CommentaryAgent, + session_id="sess-commentary-explicit-on", + config_data={ + "display": { + "tool_progress": "off", + "interim_assistant_messages": True, + }, + }, + ) + + assert result.get("already_sent") is not True + assert any(call["content"] == "I'll inspect the repo first." 
for call in adapter.sent) + + +@pytest.mark.asyncio +async def test_run_agent_previewed_final_marks_already_sent(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + PreviewedResponseAgent, + session_id="sess-previewed", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + assert result.get("already_sent") is True + assert [call["content"] for call in adapter.sent] == ["You're welcome."] + + +@pytest.mark.asyncio +async def test_run_agent_queued_message_does_not_treat_commentary_as_final(monkeypatch, tmp_path): + QueuedCommentaryAgent.calls = 0 + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + QueuedCommentaryAgent, + session_id="sess-queued-commentary", + pending_text="queued follow-up", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + sent_texts = [call["content"] for call in adapter.sent] + assert result["final_response"] == "final response 2" + assert "I'll inspect the repo first." 
in sent_texts + assert "final response 1" in sent_texts diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 5cebb20ee..8f7fb6dd5 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -505,3 +505,81 @@ class TestSegmentBreakOnToolBoundary: assert len(sent_texts) == 3 assert sent_texts[0].startswith(prefix) assert sum(len(t) for t in sent_texts[1:]) == len(tail) + + +class TestInterimCommentaryMessages: + @pytest.mark.asyncio + async def test_commentary_message_stays_separate_from_final_stream(self): + adapter = MagicMock() + adapter.send = AsyncMock(side_effect=[ + SimpleNamespace(success=True, message_id="msg_1"), + SimpleNamespace(success=True, message_id="msg_2"), + ]) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + consumer.on_commentary("I'll inspect the repository first.") + consumer.on_delta("Done.") + consumer.finish() + + await consumer.run() + + sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] + assert sent_texts == ["I'll inspect the repository first.", "Done."] + assert consumer.final_response_sent is True + + @pytest.mark.asyncio + async def test_failed_final_send_does_not_mark_final_response_sent(self): + adapter = MagicMock() + adapter.send = AsyncMock(return_value=SimpleNamespace(success=False, message_id=None)) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + consumer.on_delta("Done.") + consumer.finish() + + await consumer.run() + + assert consumer.final_response_sent is False + assert consumer.already_sent is False + + @pytest.mark.asyncio 
+ async def test_success_without_message_id_marks_visible_and_sends_only_tail(self): + adapter = MagicMock() + adapter.send = AsyncMock(side_effect=[ + SimpleNamespace(success=True, message_id=None), + SimpleNamespace(success=True, message_id=None), + ]) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉"), + ) + + consumer.on_delta("Hello") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + consumer.on_delta(" world") + await asyncio.sleep(0.08) + consumer.finish() + await task + + sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] + assert sent_texts == ["Hello ▉", "world"] + assert consumer.already_sent is True + assert consumer.final_response_sent is True diff --git a/tests/hermes_cli/test_config.py b/tests/hermes_cli/test_config.py index 1c245577e..2bb63767a 100644 --- a/tests/hermes_cli/test_config.py +++ b/tests/hermes_cli/test_config.py @@ -68,6 +68,7 @@ class TestLoadConfigDefaults: assert "max_turns" not in config assert "terminal" in config assert config["terminal"]["backend"] == "local" + assert config["display"]["interim_assistant_messages"] is True def test_legacy_root_level_max_turns_migrates_to_agent_config(self, tmp_path): with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}): @@ -421,3 +422,25 @@ class TestAnthropicTokenMigration: }): migrate_config(interactive=False, quiet=True) assert load_env().get("ANTHROPIC_TOKEN") == "current-token" + + +class TestInterimAssistantMessageConfig: + """Test the explicit gateway interim-message config gate.""" + + def test_default_config_enables_interim_assistant_messages(self): + assert DEFAULT_CONFIG["display"]["interim_assistant_messages"] is True + + def test_migrate_to_v15_adds_interim_assistant_message_gate(self, tmp_path): + config_path = tmp_path / 
"config.yaml" + config_path.write_text( + yaml.safe_dump({"_config_version": 14, "display": {"tool_progress": "off"}}), + encoding="utf-8", + ) + + with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}): + migrate_config(interactive=False, quiet=True) + raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) + + assert raw["_config_version"] == 15 + assert raw["display"]["tool_progress"] == "off" + assert raw["display"]["interim_assistant_messages"] is True diff --git a/tests/run_agent/test_run_agent_codex_responses.py b/tests/run_agent/test_run_agent_codex_responses.py index 17a70624d..533a85ac8 100644 --- a/tests/run_agent/test_run_agent_codex_responses.py +++ b/tests/run_agent/test_run_agent_codex_responses.py @@ -744,6 +744,44 @@ def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(mo assert "inspect the repository" in (assistant_message.content or "") +def test_interim_commentary_is_not_marked_already_streamed_without_callbacks(monkeypatch): + agent = _build_agent(monkeypatch) + observed = {} + + agent._fire_stream_delta("short version: yes") + agent.interim_assistant_callback = lambda text, *, already_streamed=False: observed.update( + {"text": text, "already_streamed": already_streamed} + ) + + agent._emit_interim_assistant_message({"role": "assistant", "content": "short version: yes"}) + + assert observed == { + "text": "short version: yes", + "already_streamed": False, + } + + +def test_interim_commentary_is_not_marked_already_streamed_when_stream_callback_fails(monkeypatch): + agent = _build_agent(monkeypatch) + observed = {} + + def failing_callback(_text): + raise RuntimeError("display failed") + + agent.stream_delta_callback = failing_callback + agent._fire_stream_delta("short version: yes") + agent.interim_assistant_callback = lambda text, *, already_streamed=False: observed.update( + {"text": text, "already_streamed": already_streamed} + ) + + agent._emit_interim_assistant_message({"role": "assistant", "content": 
"short version: yes"}) + + assert observed == { + "text": "short version: yes", + "already_streamed": False, + } + + def test_run_conversation_codex_continues_after_commentary_phase_message(monkeypatch): agent = _build_agent(monkeypatch) responses = [ diff --git a/tests/tools/test_browser_camofox_state.py b/tests/tools/test_browser_camofox_state.py index b1f128cce..33a939f09 100644 --- a/tests/tools/test_browser_camofox_state.py +++ b/tests/tools/test_browser_camofox_state.py @@ -59,8 +59,9 @@ class TestCamofoxConfigDefaults: browser_cfg = DEFAULT_CONFIG["browser"] assert browser_cfg["camofox"]["managed_persistence"] is False - def test_config_version_unchanged(self): + def test_config_version_matches_current_schema(self): from hermes_cli.config import DEFAULT_CONFIG - # managed_persistence is auto-merged by _deep_merge, no version bump needed - assert DEFAULT_CONFIG["_config_version"] == 13 + # The current schema version is tracked globally; unrelated default + # options may bump it after browser defaults are added. + assert DEFAULT_CONFIG["_config_version"] == 15 diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index 7b735bbde..84fa4853d 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -846,6 +846,7 @@ display: tool_progress: all # off | new | all | verbose tool_progress_command: false # Enable /verbose slash command in messaging gateway tool_progress_overrides: {} # Per-platform overrides (see below) + interim_assistant_messages: true # Gateway: send natural mid-turn assistant updates as separate messages skin: default # Built-in or custom CLI skin (see user-guide/features/skins) personality: "kawaii" # Legacy cosmetic field still surfaced in some summaries compact: false # Compact output mode (less whitespace) @@ -881,6 +882,8 @@ display: Platforms without an override fall back to the global `tool_progress` value. 
Valid platform keys: `telegram`, `discord`, `slack`, `signal`, `whatsapp`, `matrix`, `mattermost`, `email`, `sms`, `homeassistant`, `dingtalk`, `feishu`, `wecom`, `weixin`, `bluebubbles`. +`interim_assistant_messages` is gateway-only. When enabled, Hermes sends completed mid-turn assistant updates as separate chat messages. This is independent of `tool_progress` and does not require gateway streaming. + ## Privacy ```yaml @@ -971,6 +974,8 @@ streaming: When enabled, the bot sends a message on the first token, then progressively edits it as more tokens arrive. Platforms that don't support message editing (Signal, Email, Home Assistant) are auto-detected on the first attempt — streaming is gracefully disabled for that session with no flood of messages. +For separate natural mid-turn assistant updates without progressive token editing, set `display.interim_assistant_messages: true`. + **Overflow handling:** If the streamed text exceeds the platform's message length limit (~4096 chars), the current message is finalized and a new one starts automatically. :::note