diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 125bc1fb6ad..ee706816b9c 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -15,6 +15,7 @@ import re import socket as _socket import subprocess import sys +import time import uuid from abc import ABC, abstractmethod from urllib.parse import urlsplit @@ -40,6 +41,16 @@ def _platform_name(platform) -> str: return str(value or "").lower() +def _float_env(name: str, default: float) -> float: + raw = os.environ.get(name, "").strip() + if not raw: + return default + try: + return float(raw) + except (TypeError, ValueError): + return default + + def _thread_metadata_for_source(source, reply_to_message_id: str | None = None) -> dict | None: """Build platform-aware thread metadata for adapter sends. @@ -1103,6 +1114,14 @@ class MessageEvent: return args +@dataclass +class TextDebounceState: + event: MessageEvent + task: asyncio.Task | None + first_ts: float + last_ts: float + + _PLAINTEXT_GATEWAY_RESTART_PATTERNS: tuple[re.Pattern[str], ...] = ( re.compile(r"^(?:please\s+)?restart\s+(?:the\s+)?gateway[.!?\s]*$", re.IGNORECASE), re.compile(r"^(?:please\s+)?restart\s+(?:the\s+)?hermes\s+gateway[.!?\s]*$", re.IGNORECASE), @@ -1398,6 +1417,17 @@ class BasePlatformAdapter(ABC): self._active_sessions: Dict[str, asyncio.Event] = {} self._pending_messages: Dict[str, MessageEvent] = {} self._session_tasks: Dict[str, asyncio.Task] = {} + self._busy_text_mode: str = ( + os.environ.get("HERMES_GATEWAY_BUSY_TEXT_MODE", "queue").strip().lower() + or "queue" + ) + self._busy_text_debounce_seconds: float = _float_env( + "HERMES_GATEWAY_BUSY_TEXT_DEBOUNCE_SECONDS", 0.35 + ) + self._busy_text_hard_cap_seconds: float = _float_env( + "HERMES_GATEWAY_BUSY_TEXT_HARD_CAP_SECONDS", 1.0 + ) + self._text_debounce: dict[str, TextDebounceState] = {} # Background message-processing tasks spawned by handle_message(). # Gateway shutdown cancels these so an old gateway instance doesn't keep # working on a task after --replace or manual restarts. @@ -2725,6 +2755,161 @@ class BasePlatformAdapter(ABC): return f"{existing_text}\n\n{new_text}".strip() return existing_text + def _text_debounce_store(self) -> dict[str, TextDebounceState]: + store = getattr(self, "_text_debounce", None) + if store is None: + store = {} + self._text_debounce = store + return store + + def _is_queue_text_debounce_candidate(self, event: MessageEvent) -> bool: + """Return True for normal text eligible for queue-mode debounce.""" + result = ( + getattr(self, "_busy_text_mode", "queue") == "queue" + and event.message_type == MessageType.TEXT + and not getattr(event, "internal", False) + and not event.is_command() + and bool((event.text or "").strip()) + ) + if result: + logger.debug( + "[%s] Queue-text debounce candidate accepted: session=%s text=%.60s", + self.name, + getattr(event, "session_key", "?"), + (event.text or "")[:60], + ) + return result + + def _can_merge_text_debounce_events(self, existing: MessageEvent, event: MessageEvent) -> bool: + """Return True when two text debounce events came from the same sender.""" + + def _identity(candidate: MessageEvent) -> tuple[str, ...] | None: + source = getattr(candidate, "source", None) + if source is None: + return None + platform = _platform_name(getattr(source, "platform", None)) + sender = getattr(source, "user_id_alt", None) or getattr(source, "user_id", None) + if sender: + return (platform, str(sender)) + if getattr(source, "chat_type", None) in {"dm", "private"} and getattr(source, "chat_id", None): + return (platform, "dm", str(source.chat_id)) + return None + + existing_sender = _identity(existing) + incoming_sender = _identity(event) + return existing_sender is not None and existing_sender == incoming_sender + + def _text_debounce_delay(self, session_key: str) -> float: + """Return bounded busy-text debounce delay for ``session_key``.""" + state = self._text_debounce_store().get(session_key) + if state is None: + return 0.0 + now = time.monotonic() + window_deadline = state.last_ts + self._busy_text_debounce_seconds + hard_cap_deadline = state.first_ts + self._busy_text_hard_cap_seconds + return max(0.0, min(window_deadline, hard_cap_deadline) - now) + + async def _queue_text_debounce(self, session_key: str, event: MessageEvent) -> None: + """Buffer normal queue-mode busy text and schedule a bounded flush.""" + store = self._text_debounce_store() + state = store.get(session_key) + + if state is not None and not self._can_merge_text_debounce_events(state.event, event): + # Preserve sender attribution in shared sessions. The current + # buffer becomes the next pending turn; the new sender starts a + # fresh debounce burst when the pending slot allows it. + await self._flush_text_debounce_now(session_key) + state = store.get(session_key) + if state is not None and not self._can_merge_text_debounce_events(state.event, event): + existing_pending = self._pending_messages.get(session_key) + if existing_pending is not None and self._can_merge_text_debounce_events(existing_pending, event): + merge_pending_message_event( + self._pending_messages, + session_key, + event, + merge_text=True, + ) + return + + now = time.monotonic() + if state is None: + state = TextDebounceState( + event=event, + task=None, + first_ts=now, + last_ts=now, + ) + store[session_key] = state + else: + if event.text: + state.event.text = ( + f"{state.event.text}\n{event.text}" + if state.event.text + else event.text + ) + latest_message_id = getattr(event, "message_id", None) + latest_anchor = latest_message_id or getattr(event, "reply_to_message_id", None) + if latest_message_id is not None: + state.event.message_id = str(latest_message_id) + if latest_anchor is not None and hasattr(state.event, "reply_to_message_id"): + state.event.reply_to_message_id = str(latest_anchor) + state.last_ts = now + + if state.task is not None and not state.task.done(): + state.task.cancel() + + delay = self._text_debounce_delay(session_key) + state.task = asyncio.create_task(self._flush_text_debounce(session_key, delay)) + + async def _flush_text_debounce(self, session_key: str, delay: float) -> None: + """Timer task that flushes the debounced text buffer.""" + try: + await asyncio.sleep(delay) + await self._flush_text_debounce_now(session_key) + except asyncio.CancelledError: + return + finally: + current = asyncio.current_task() + state = self._text_debounce_store().get(session_key) + if state is not None and state.task is current: + state.task = None + + async def _flush_text_debounce_now(self, session_key: str) -> bool: + """Force-flush one debounced busy-text burst into the pending slot.""" + store = self._text_debounce_store() + state = store.get(session_key) + if state is None: + return False + + current = asyncio.current_task() + if state.task is not None and state.task is not current and not state.task.done(): + state.task.cancel() + state.task = None + + existing_pending = self._pending_messages.get(session_key) + if ( + existing_pending is not None + and not self._can_merge_text_debounce_events(existing_pending, state.event) + ): + return False + + state = store.pop(session_key, None) + if state is None: + return False + merge_pending_message_event( + self._pending_messages, + session_key, + state.event, + merge_text=True, + ) + return True + + def _discard_text_debounce(self, session_key: str) -> None: + """Cancel and drop pending text debounce state for control commands.""" + state = self._text_debounce_store().pop(session_key, None) + if state is not None and state.task is not None and not state.task.done(): + state.task.cancel() + # ------------------------------------------------------------------ # Session task + guard ownership helpers # ------------------------------------------------------------------ @@ -2794,6 +2979,7 @@ class BasePlatformAdapter(ABC): self._active_sessions.pop(session_key, None) self._pending_messages.pop(session_key, None) self._session_tasks.pop(session_key, None) + self._discard_text_debounce(session_key) return True def _start_session_processing( @@ -2875,6 +3061,7 @@ class BasePlatformAdapter(ABC): ) if discard_pending: self._pending_messages.pop(session_key, None) + self._discard_text_debounce(session_key) if release_guard: self._release_session_guard(session_key) @@ -2889,6 +3076,7 @@ class BasePlatformAdapter(ABC): command-scoped guard, then — if a follow-up message landed while the command was running — spawns a fresh processing task for it. """ + await self._flush_text_debounce_now(session_key) pending_event = self._pending_messages.pop(session_key, None) self._release_session_guard(session_key, guard=command_guard) if pending_event is None: @@ -3020,6 +3208,7 @@ class BasePlatformAdapter(ABC): # through the dedicated handoff path that serializes # cancellation + runner response + pending drain. if cmd in {"stop", "new", "reset"}: + self._discard_text_debounce(session_key) try: await self._dispatch_active_session_command(event, session_key, cmd) except Exception as e: @@ -3064,8 +3253,9 @@ class BasePlatformAdapter(ABC): # clarify-intercept can resolve it and unblock the agent. # # Without this bypass: the message gets queued in - # _pending_messages AND triggers an interrupt, killing the - # agent run mid-clarify and discarding the user's answer. + # _pending_messages as a follow-up turn instead of reaching the + # clarify resolver, leaving the agent blocked and discarding the + # user's answer. # Same shape as the /approve deadlock fix (PR #4926) — both # cases are "agent thread blocked on Event.wait, message must # reach the resolver before being treated as a new turn." @@ -3124,27 +3314,28 @@ class BasePlatformAdapter(ABC): merge_pending_message_event(self._pending_messages, session_key, event) return # Don't interrupt now - will run after current task completes - # Default behavior for non-photo follow-ups: interrupt the running agent. - # - # Use merge_text=True so rapid TEXT follow-ups (#4469) accumulate - # into the single pending slot instead of clobbering each other. - # Without merging, three rapid messages "A", "B", "C" land like: - # _pending_messages[k] = A (interrupts) - # _pending_messages[k] = B (replaces A before consumer reads) - # _pending_messages[k] = C (replaces B) - # ...and only "C" reaches the next turn. merge_pending_message_event - # already does the right thing for photo/media bursts; the - # ``merge_text=True`` flag extends that to plain TEXT events. - # Same shape as the Telegram bursty-grace path in gateway/run.py. - logger.debug("[%s] New message while session %s is active — triggering interrupt", self.name, session_key) - merge_pending_message_event( - self._pending_messages, - session_key, - event, - merge_text=True, - ) - # Signal the interrupt (the processing task checks this) - self._active_sessions[session_key].set() + if self._is_queue_text_debounce_candidate(event): + logger.debug( + "[%s] New text message while session %s is active — " + "debouncing follow-up (busy_text_mode=queue, window=%.2fs)", + self.name, + session_key, + self._busy_text_debounce_seconds, + ) + await self._queue_text_debounce(session_key, event) + else: + logger.debug( + "[%s] New message while session %s is active — queuing follow-up " + "(no interrupt, will cascade after current turn)", + self.name, + session_key, + ) + merge_pending_message_event( + self._pending_messages, + session_key, + event, + merge_text=event.message_type == MessageType.TEXT, + ) return # Don't process now - will be handled after current task finishes # Mark session as active BEFORE spawning background task to close @@ -3498,10 +3689,15 @@ class BasePlatformAdapter(ABC): ProcessingOutcome.SUCCESS if processing_ok else ProcessingOutcome.FAILURE, ) + # The active drain owns debounce state. If a queue-mode timer has + # not fired yet, force-flush into _pending_messages here and let + # this task hand off the follow-up. + await self._flush_text_debounce_now(session_key) + # Check if there's a pending message that was queued during our processing if session_key in self._pending_messages: pending_event = self._pending_messages.pop(session_key) - logger.debug("[%s] Processing queued message from interrupt", self.name) + logger.debug("[%s] Processing queued follow-up message", self.name) # Keep the _active_sessions entry live across the turn chain # and only CLEAR the interrupt Event — do NOT delete the entry. # If we deleted here, a concurrent inbound message arriving @@ -3510,7 +3706,7 @@ class BasePlatformAdapter(ABC): # with the recursive drain below. Two agents on one # session_key = duplicate responses, duplicate tool calls. # Clearing the Event keeps the guard live so follow-ups take - # the busy-handler path (queue + interrupt) as intended. + # the busy-handler path as intended. _active = self._active_sessions.get(session_key) if _active is not None: _active.clear() @@ -3603,6 +3799,9 @@ class BasePlatformAdapter(ABC): await self.stop_typing(event.source.chat_id) except Exception: pass + # Final drain/release boundary: force-flush any timer that missed + # the in-band drain before deciding whether the guard can clear. + await self._flush_text_debounce_now(session_key) # Late-arrival drain: a message may have arrived during the # cleanup awaits above (typing_task cancel, stop_typing). Such # messages passed the Level-1 guard (entry still live, Event @@ -3722,6 +3921,10 @@ class BasePlatformAdapter(ABC): self._session_tasks.clear() self._pending_messages.clear() self._active_sessions.clear() + for state in list(self._text_debounce_store().values()): + if state.task is not None and not state.task.done(): + state.task.cancel() + self._text_debounce_store().clear() def has_pending_interrupt(self, session_key: str) -> bool: """Check if there's a pending interrupt for a session.""" diff --git a/gateway/run.py b/gateway/run.py index 7e34d99138c..2e559d311f2 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -839,6 +839,8 @@ if _config_path.exists(): if _display_cfg and isinstance(_display_cfg, dict): if "busy_input_mode" in _display_cfg: os.environ["HERMES_GATEWAY_BUSY_INPUT_MODE"] = str(_display_cfg["busy_input_mode"]) + if "busy_text_mode" in _display_cfg: + os.environ["HERMES_GATEWAY_BUSY_TEXT_MODE"] = str(_display_cfg["busy_text_mode"]) if "busy_ack_enabled" in _display_cfg: os.environ["HERMES_GATEWAY_BUSY_ACK_ENABLED"] = str(_display_cfg["busy_ack_enabled"]) # Timezone: bridge config.yaml → HERMES_TIMEZONE env var. @@ -1554,6 +1556,7 @@ class GatewayRunner: # blow up on attribute access. _running_agents_ts: Dict[str, float] = {} _busy_input_mode: str = "interrupt" + _busy_text_mode: str = "interrupt" _restart_drain_timeout: float = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT _exit_code: Optional[int] = None _draining: bool = False @@ -1580,6 +1583,7 @@ class GatewayRunner: self._service_tier = self._load_service_tier() self._show_reasoning = self._load_show_reasoning() self._busy_input_mode = self._load_busy_input_mode() + self._busy_text_mode = self._load_busy_text_mode() self._restart_drain_timeout = self._load_restart_drain_timeout() self._provider_routing = self._load_provider_routing() self._fallback_model = self._load_fallback_model() @@ -2826,6 +2830,17 @@ class GatewayRunner: return "steer" return "interrupt" + @staticmethod + def _load_busy_text_mode() -> str: + """Load normal busy TEXT follow-up behavior from config/env.""" + mode = os.getenv("HERMES_GATEWAY_BUSY_TEXT_MODE", "").strip().lower() + if not mode: + cfg = _load_gateway_runtime_config() + mode = str(cfg_get(cfg, "display", "busy_text_mode", default="") or "").strip().lower() + if mode == "interrupt": + return "interrupt" + return "queue" + @staticmethod def _load_restart_drain_timeout() -> float: """Load graceful gateway restart/stop drain timeout in seconds.""" @@ -2973,11 +2988,19 @@ class GatewayRunner: running_agent = self._running_agents.get(session_key) + effective_mode = self._busy_input_mode + busy_text_mode = getattr(self, "_busy_text_mode", "queue") + if ( + event.message_type == MessageType.TEXT + and busy_text_mode == "queue" + and effective_mode != "steer" + ): + return False + # Steer mode: inject mid-run via running_agent.steer() instead of # queueing + interrupting. If the agent isn't running yet # (sentinel) or lacks steer(), or the payload is empty, fall back # to queue semantics so nothing is lost. - effective_mode = self._busy_input_mode steered = False if effective_mode == "steer": steer_text = (event.text or "").strip() @@ -3002,7 +3025,12 @@ class GatewayRunner: # successful steer — the text already landed inside the run and # must NOT also be replayed as a next-turn user message. if not steered: - merge_pending_message_event(adapter._pending_messages, session_key, event) + merge_pending_message_event( + adapter._pending_messages, + session_key, + event, + merge_text=event.message_type == MessageType.TEXT, + ) is_queue_mode = effective_mode == "queue" is_steer_mode = effective_mode == "steer" @@ -3934,6 +3962,7 @@ class GatewayRunner: adapter.set_fatal_error_handler(self._handle_adapter_fatal_error) adapter.set_session_store(self.session_store) adapter.set_busy_session_handler(self._handle_active_session_busy_message) + adapter._busy_text_mode = self._busy_text_mode # Try to connect logger.info("Connecting to %s...", platform.value) @@ -5546,6 +5575,7 @@ class GatewayRunner: adapter.set_fatal_error_handler(self._handle_adapter_fatal_error) adapter.set_session_store(self.session_store) adapter.set_busy_session_handler(self._handle_active_session_busy_message) + adapter._busy_text_mode = self._busy_text_mode success = await self._connect_adapter_with_timeout(adapter, platform) if success: diff --git a/tests/gateway/test_active_session_text_merge.py b/tests/gateway/test_active_session_text_merge.py index 087f8dbabd0..05e7a36fd6b 100644 --- a/tests/gateway/test_active_session_text_merge.py +++ b/tests/gateway/test_active_session_text_merge.py @@ -1,20 +1,10 @@ -"""Regression test for #4469. +"""Regression tests for active-session TEXT follow-up queueing. -When the agent is actively running (session present in -``adapter._active_sessions``) and the user fires off multiple TEXT -follow-ups in rapid succession, the previous behaviour was a single-slot -replacement at ``gateway/platforms/base.py``: - - self._pending_messages[session_key] = event - -So three rapid messages ``A``, ``B``, ``C`` arriving while the agent was -still working on the initial turn produced a pending slot containing only -``C``; ``A`` and ``B`` were silently dropped. - -The fix routes the follow-up through ``merge_pending_message_event(..., -merge_text=True)`` so TEXT events accumulate into the existing pending -event's text instead of clobbering it. Photo / media bursts continue to -merge through the same helper (they always did). +When the agent is actively running, rapid text follow-ups should survive as +one next-turn pending message instead of clobbering each other. In +``busy_text_mode=queue`` those active follow-ups first pass through a short +debounce so bursty multi-message thoughts are merged before the active drain +hands off the next turn. """ from __future__ import annotations @@ -22,7 +12,7 @@ from __future__ import annotations import asyncio import sys import types -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -44,16 +34,27 @@ from gateway.platforms.base import ( BasePlatformAdapter, MessageEvent, MessageType, + SendResult, ) from gateway.session import SessionSource, build_session_key -def _make_event(text: str, chat_id: str = "12345") -> MessageEvent: +def _make_event( + text: str, + chat_id: str = "12345", + *, + chat_type: str = "dm", + user_id: str = "u1", + user_name: str | None = None, + thread_id: str | None = None, +) -> MessageEvent: source = SessionSource( platform=Platform.TELEGRAM, chat_id=chat_id, - chat_type="dm", - user_id="u1", + chat_type=chat_type, + user_id=user_id, + user_name=user_name, + thread_id=thread_id, ) return MessageEvent( text=text, @@ -63,27 +64,26 @@ def _make_event(text: str, chat_id: str = "12345") -> MessageEvent: ) +class _DummyAdapter(BasePlatformAdapter): # type: ignore[misc] + async def connect(self): + pass + + async def disconnect(self): + pass + + async def get_chat_info(self, chat_id): + return None + + async def send(self, *args, **kwargs): + return SendResult(success=True, message_id="x") + + +def _make_initialized_adapter() -> BasePlatformAdapter: + return _DummyAdapter(PlatformConfig(enabled=True, token="***"), Platform.TELEGRAM) + + def _make_adapter() -> BasePlatformAdapter: - """Build a BasePlatformAdapter without running its heavy __init__. - - We only need the bits ``handle_message`` touches on the active-session - path: ``_active_sessions``, ``_pending_messages``, - ``_message_handler``, ``_busy_session_handler``, ``config``, ``platform``. - """ - - class _DummyAdapter(BasePlatformAdapter): # type: ignore[misc] - async def connect(self): - pass - - async def disconnect(self): - pass - - async def get_chat_info(self, chat_id): - return None - - async def send(self, *args, **kwargs): - return MagicMock(success=True, message_id="x", retryable=False) - + """Build a BasePlatformAdapter without running its heavy __init__.""" adapter = object.__new__(_DummyAdapter) adapter.config = PlatformConfig(enabled=True, token="***") adapter.platform = Platform.TELEGRAM @@ -100,6 +100,10 @@ def _make_adapter() -> BasePlatformAdapter: adapter._fatal_error_retryable = True adapter._fatal_error_handler = None adapter._running = True + adapter._busy_text_mode = "queue" + adapter._busy_text_debounce_seconds = 0.1 + adapter._busy_text_hard_cap_seconds = 1.0 + adapter._text_debounce = {} adapter._auto_tts_default = False adapter._auto_tts_enabled_chats = set() adapter._auto_tts_disabled_chats = set() @@ -107,39 +111,235 @@ def _make_adapter() -> BasePlatformAdapter: return adapter +def _debounced_event(adapter: BasePlatformAdapter, session_key: str) -> MessageEvent: + return adapter._text_debounce[session_key].event + + @pytest.mark.asyncio async def test_rapid_text_followups_accumulate_instead_of_replacing(): - """Three rapid TEXT follow-ups during an active session must all - survive in ``adapter._pending_messages[session_key].text``.""" + """Rapid TEXT follow-ups must all survive in the pending event.""" adapter = _make_adapter() + adapter._busy_text_mode = "" # direct-merge behavior, no debounce first = _make_event("part one") session_key = build_session_key(first.source) - - # Mark the session as active so subsequent messages take the - # "already running" branch in handle_message. adapter._active_sessions[session_key] = asyncio.Event() - second = _make_event("part two") - third = _make_event("part three") + await adapter.handle_message(_make_event("part two")) + await adapter.handle_message(_make_event("part three")) - await adapter.handle_message(second) - await adapter.handle_message(third) - - # Both rapid follow-ups must be preserved, not just the last one. pending = adapter._pending_messages[session_key] - assert pending.text == "part two\npart three", ( - f"expected accumulated text, got {pending.text!r}" + assert pending.text == "part two\npart three" + assert not adapter._active_sessions[session_key].is_set() + + +@pytest.mark.asyncio +async def test_debounce_buffers_rapid_text_then_flushes_to_pending(): + adapter = _make_adapter() + adapter._busy_text_debounce_seconds = 0.05 + + first = _make_event("part one") + session_key = build_session_key(first.source) + adapter._active_sessions[session_key] = asyncio.Event() + + await adapter.handle_message(_make_event("part two")) + assert session_key in adapter._text_debounce + assert _debounced_event(adapter, session_key).text == "part two" + assert session_key not in adapter._pending_messages + + await adapter.handle_message(_make_event("part three")) + assert _debounced_event(adapter, session_key).text == "part two\npart three" + + await asyncio.sleep(0.15) + + assert session_key not in adapter._text_debounce + assert adapter._pending_messages[session_key].text == "part two\npart three" + + +@pytest.mark.asyncio +async def test_debounce_resets_timer_on_new_arrival(): + adapter = _make_adapter() + adapter._busy_text_debounce_seconds = 0.1 + + first = _make_event("one") + session_key = build_session_key(first.source) + adapter._active_sessions[session_key] = asyncio.Event() + + await adapter.handle_message(first) + task1 = adapter._text_debounce[session_key].task + assert task1 is not None + assert not task1.done() + + await adapter.handle_message(_make_event("two")) + task2 = adapter._text_debounce[session_key].task + assert task2 is not None + assert task2 is not task1 + await asyncio.sleep(0) + assert task1.cancelled() or task1.done() + assert adapter._text_debounce[session_key].task is task2 + + await adapter.handle_message(_make_event("three")) + task3 = adapter._text_debounce[session_key].task + assert task3 is not None + assert task3 is not task2 + + await asyncio.sleep(0.2) + assert session_key not in adapter._text_debounce + assert adapter._pending_messages[session_key].text == "one\ntwo\nthree" + + +@pytest.mark.asyncio +async def test_active_drain_force_flushes_debounce_before_release(): + adapter = _make_adapter() + adapter._busy_text_debounce_seconds = 1.0 + processed: list[str] = [] + + async def _handler(event): + processed.append(event.text) + if event.text == "current": + await adapter.handle_message(_make_event("follow up")) + return None + + adapter._message_handler = _handler + current = _make_event("current") + session_key = build_session_key(current.source) + + task = asyncio.create_task(adapter._process_message_background(current, session_key)) + adapter._session_tasks[session_key] = task + await asyncio.wait_for(task, timeout=1.0) + + for _ in range(20): + if processed == ["current", "follow up"] and session_key not in adapter._active_sessions: + break + await asyncio.sleep(0.05) + + assert processed == ["current", "follow up"] + assert session_key not in adapter._text_debounce + assert session_key not in adapter._pending_messages + assert session_key not in adapter._active_sessions + + +@pytest.mark.asyncio +async def test_force_flush_cancels_timer_without_duplicate_processing(): + adapter = _make_adapter() + adapter._busy_text_debounce_seconds = 0.2 + + event = _make_event("queued once") + session_key = build_session_key(event.source) + adapter._active_sessions[session_key] = asyncio.Event() + + await adapter.handle_message(event) + timer_task = adapter._text_debounce[session_key].task + + flushed = await adapter._flush_text_debounce_now(session_key) + assert flushed is True + assert session_key not in adapter._text_debounce + assert adapter._pending_messages[session_key].text == "queued once" + + await asyncio.sleep(0.3) + assert timer_task is not None + assert timer_task.cancelled() or timer_task.done() + assert adapter._pending_messages[session_key].text == "queued once" + + +@pytest.mark.asyncio +async def test_text_debounce_does_not_merge_different_senders(): + adapter = _make_adapter() + adapter._busy_text_debounce_seconds = 1.0 + + first = _make_event( + "from alice", + chat_type="group", + user_id="alice", + user_name="Alice", + thread_id="topic-1", ) - # Interrupt event must be signalled exactly like before. - assert adapter._active_sessions[session_key].is_set() + second = _make_event( + "from bob", + chat_type="group", + user_id="bob", + user_name="Bob", + thread_id="topic-1", + ) + session_key = build_session_key(first.source) + assert session_key == build_session_key(second.source) + adapter._active_sessions[session_key] = asyncio.Event() + + await adapter.handle_message(first) + await adapter.handle_message(second) + + assert adapter._pending_messages[session_key].text == "from alice" + assert _debounced_event(adapter, session_key).text == "from bob" + + +@pytest.mark.asyncio +async def test_control_and_clarify_messages_bypass_text_debounce(): + adapter = _make_adapter() + started: list[str] = [] + + def _fake_start(event, session_key, *, interrupt_event=None): + started.append(event.text) + return True + + adapter._start_session_processing = _fake_start # type: ignore[method-assign] + + await adapter.handle_message(_make_event("/status")) + assert started == ["/status"] + assert adapter._text_debounce == {} + + answer = _make_event("clarify answer") + session_key = build_session_key(answer.source) + adapter._active_sessions[session_key] = asyncio.Event() + adapter._message_handler = AsyncMock(return_value=None) + + with patch("tools.clarify_gateway.get_pending_for_session", return_value=object()): + await adapter.handle_message(answer) + + adapter._message_handler.assert_awaited_once_with(answer) + assert session_key not in adapter._text_debounce + assert session_key not in adapter._pending_messages + + +@pytest.mark.asyncio +async def test_debounce_skipped_when_busy_text_mode_not_queue(): + adapter = _make_adapter() + adapter._busy_text_mode = "" + event = _make_event("direct merge") + session_key = build_session_key(event.source) + adapter._active_sessions[session_key] = asyncio.Event() + + await adapter.handle_message(event) + + assert adapter._pending_messages[session_key].text == "direct merge" + assert session_key not in adapter._text_debounce + + +def test_debounce_respects_env_var_override(monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_BUSY_TEXT_DEBOUNCE_SECONDS", "2.5") + adapter = _make_initialized_adapter() + assert adapter._busy_text_debounce_seconds == 2.5 + + +@pytest.mark.asyncio +async def test_debounce_cleanup_in_cancel_background_tasks(): + adapter = _make_adapter() + adapter._busy_text_debounce_seconds = 1.0 + + event = _make_event("cleanup test") + session_key = build_session_key(event.source) + adapter._active_sessions[session_key] = asyncio.Event() + await adapter.handle_message(event) + + assert session_key in adapter._text_debounce + + await adapter.cancel_background_tasks() + + assert session_key not in adapter._text_debounce @pytest.mark.asyncio async def test_single_followup_is_stored_as_is(): - """One TEXT follow-up still lands as the event object itself - (no spurious wrapping / mutation) — guards against the merge path - breaking the simple case.""" adapter = _make_adapter() + adapter._busy_text_mode = "" first = _make_event("only one") session_key = build_session_key(first.source) @@ -149,4 +349,29 @@ async def test_single_followup_is_stored_as_is(): pending = adapter._pending_messages[session_key] assert pending is first assert pending.text == "only one" - assert adapter._active_sessions[session_key].is_set() + assert not adapter._active_sessions[session_key].is_set() + + +def test_adapter_defaults_to_queue_mode(monkeypatch): + monkeypatch.delenv("HERMES_GATEWAY_BUSY_TEXT_MODE", raising=False) + adapter = _make_initialized_adapter() + assert adapter._busy_text_mode == "queue" + assert adapter._is_queue_text_debounce_candidate(_make_event("hello")) + + +def test_adapter_is_queue_text_debounce_candidate_by_default(): + adapter = _make_adapter() + assert adapter._is_queue_text_debounce_candidate(_make_event("hello world")) + + +def test_command_messages_bypass_debounce_even_in_queue_mode(): + adapter = _make_adapter() + assert not adapter._is_queue_text_debounce_candidate(_make_event("")) + assert not adapter._is_queue_text_debounce_candidate(_make_event("/stop")) + + +def test_busy_text_mode_respects_env_var_override(monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_BUSY_TEXT_MODE", "interrupt") + adapter = _make_initialized_adapter() + assert adapter._busy_text_mode == "interrupt" + assert not adapter._is_queue_text_debounce_candidate(_make_event("test")) diff --git a/tests/gateway/test_base_topic_sessions.py b/tests/gateway/test_base_topic_sessions.py index a55fcb1d8ff..dd2ef3a1262 100644 --- a/tests/gateway/test_base_topic_sessions.py +++ b/tests/gateway/test_base_topic_sessions.py @@ -15,6 +15,7 @@ from gateway.session import SessionSource, build_session_key class DummyTelegramAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM) + self._busy_text_mode = "" self.sent = [] self.typing = [] self.processing_hooks = [] diff --git a/tests/gateway/test_busy_session_ack.py b/tests/gateway/test_busy_session_ack.py index b16e5ebb5f2..f13e16961e4 100644 --- a/tests/gateway/test_busy_session_ack.py +++ b/tests/gateway/test_busy_session_ack.py @@ -65,6 +65,7 @@ def _make_runner(): runner._pending_messages = {} runner._busy_ack_ts = {} runner._draining = False + runner._busy_text_mode = "interrupt" runner.adapters = {} runner.config = MagicMock() runner.session_store = None @@ -84,6 +85,8 @@ def _make_adapter(platform_val="telegram"): adapter.config = MagicMock() adapter.config.extra = {} adapter.platform = MagicMock(value=platform_val) + adapter._text_debounce = {} + adapter._busy_text_debounce_seconds = 0.6 return adapter @@ -186,6 +189,32 @@ class TestBusySessionAck: assert "respond once the current task finishes" in content assert "Interrupting" not in content + @pytest.mark.asyncio + async def test_busy_text_mode_queue_delegates_to_adapter_handle_message(self): + """busy_text_mode=queue lets the adapter debounce text silently.""" + runner, sentinel = _make_runner() + runner._busy_input_mode = "interrupt" + runner._busy_text_mode = "queue" + adapter = _make_adapter() + + first = _make_event(text="part one") + second = _make_event(text="part two") + sk = build_session_key(first.source) + + agent = MagicMock() + runner._running_agents[sk] = agent + runner.adapters[first.source.platform] = adapter + runner.adapters[second.source.platform] = adapter + + result1 = await runner._handle_active_session_busy_message(first, sk) + result2 = await runner._handle_active_session_busy_message(second, sk) + + assert result1 is False + assert result2 is False + assert sk not in adapter._pending_messages + agent.interrupt.assert_not_called() + adapter._send_with_retry.assert_not_called() + @pytest.mark.asyncio async def test_steer_mode_calls_agent_steer_no_interrupt_no_queue(self): """busy_input_mode='steer' injects via agent.steer() and skips queueing.""" diff --git a/tests/gateway/test_command_bypass_active_session.py b/tests/gateway/test_command_bypass_active_session.py index aae68b6b53f..2c0a593dc55 100644 --- a/tests/gateway/test_command_bypass_active_session.py +++ b/tests/gateway/test_command_bypass_active_session.py @@ -47,6 +47,7 @@ def _make_adapter(): """Create a minimal adapter for testing the active-session guard.""" config = PlatformConfig(enabled=True, token="test-token") adapter = _StubAdapter(config, Platform.TELEGRAM) + adapter._busy_text_mode = "" adapter.sent_responses = [] async def _mock_handler(event): diff --git a/tests/gateway/test_config_env_bridge_authority.py b/tests/gateway/test_config_env_bridge_authority.py index 26c54f1c736..a82beb397b9 100644 --- a/tests/gateway/test_config_env_bridge_authority.py +++ b/tests/gateway/test_config_env_bridge_authority.py @@ -45,6 +45,7 @@ def _run_gateway_import(hermes_home: Path, initial_env: dict[str, str]) -> dict[ "HERMES_AGENT_TIMEOUT", "HERMES_AGENT_TIMEOUT_WARNING", "HERMES_GATEWAY_BUSY_INPUT_MODE", + "HERMES_GATEWAY_BUSY_TEXT_MODE", "HERMES_TIMEZONE", ): v = os.environ.get(k) @@ -143,6 +144,15 @@ def test_config_display_busy_input_mode_wins_over_stale_env(hermes_home: Path) - assert env.get("HERMES_GATEWAY_BUSY_INPUT_MODE") == "interrupt" +def test_config_display_busy_text_mode_wins_over_stale_env(hermes_home: Path) -> None: + _write_config(hermes_home, display_cfg={"busy_text_mode": "queue"}) + _write_env(hermes_home, {"HERMES_GATEWAY_BUSY_TEXT_MODE": "interrupt"}) + + env = _run_gateway_import(hermes_home, initial_env={}) + + assert env.get("HERMES_GATEWAY_BUSY_TEXT_MODE") == "queue" + + def test_config_timezone_wins_over_stale_env(hermes_home: Path) -> None: _write_config(hermes_home, timezone="America/Los_Angeles") _write_env(hermes_home, {"HERMES_TIMEZONE": "UTC"}) diff --git a/tests/gateway/test_interrupt_key_match.py b/tests/gateway/test_interrupt_key_match.py index 445a16f7a19..3a703c0261d 100644 --- a/tests/gateway/test_interrupt_key_match.py +++ b/tests/gateway/test_interrupt_key_match.py @@ -103,6 +103,7 @@ class TestInterruptKeyConsistency: async def test_handle_message_stores_under_session_key(self): """handle_message stores pending messages under session_key, not chat_id.""" adapter = StubAdapter() + adapter._busy_text_mode = "" adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None)) source = _source("-1001234", "group") @@ -120,8 +121,8 @@ class TestInterruptKeyConsistency: # NOT stored under chat_id assert source.chat_id not in adapter._pending_messages - # Interrupt event was set - assert adapter._active_sessions[session_key].is_set() + # Text follow-ups queue silently and do not interrupt the active turn. + assert adapter._active_sessions[session_key].is_set() is False @pytest.mark.asyncio async def test_photo_followup_is_queued_without_interrupt(self): diff --git a/tests/gateway/test_restart_drain.py b/tests/gateway/test_restart_drain.py index 9000e4d4820..c1578e3617a 100644 --- a/tests/gateway/test_restart_drain.py +++ b/tests/gateway/test_restart_drain.py @@ -116,6 +116,24 @@ def test_load_busy_input_mode_prefers_env_then_config_then_default(tmp_path, mon assert gateway_run.GatewayRunner._load_busy_input_mode() == "interrupt" +def test_load_busy_text_mode_defaults_to_queue_and_allows_interrupt(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.delenv("HERMES_GATEWAY_BUSY_TEXT_MODE", raising=False) + + assert gateway_run.GatewayRunner._load_busy_text_mode() == "queue" + + (tmp_path / "config.yaml").write_text( + "display:\n busy_text_mode: interrupt\n", encoding="utf-8" + ) + assert gateway_run.GatewayRunner._load_busy_text_mode() == "interrupt" + + monkeypatch.setenv("HERMES_GATEWAY_BUSY_TEXT_MODE", "queue") + assert gateway_run.GatewayRunner._load_busy_text_mode() == "queue" + + monkeypatch.setenv("HERMES_GATEWAY_BUSY_TEXT_MODE", "bogus") + assert gateway_run.GatewayRunner._load_busy_text_mode() == "queue" + + def test_load_restart_drain_timeout_prefers_env_then_config_then_default( tmp_path, monkeypatch, caplog ): diff --git a/tests/gateway/test_session_split_brain_11016.py b/tests/gateway/test_session_split_brain_11016.py index 1076a77c44c..0b2972ac173 100644 --- a/tests/gateway/test_session_split_brain_11016.py +++ b/tests/gateway/test_session_split_brain_11016.py @@ -53,6 +53,7 @@ class _StubAdapter(BasePlatformAdapter): def _make_adapter(): config = PlatformConfig(enabled=True, token="test-token") adapter = _StubAdapter(config, Platform.TELEGRAM) + adapter._busy_text_mode = "" adapter.sent_responses = [] async def _mock_send_retry(chat_id, content, **kwargs): @@ -396,4 +397,3 @@ class TestOldTaskCannotClobberNewerGuard: # default path) still work. adapter._release_session_guard(sk) assert sk not in adapter._active_sessions -