diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 14d764d7d..af0917ded 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -445,6 +445,16 @@ agent: # Higher = more room for complex tasks, but costs more tokens # Recommended: 20-30 for focused tasks, 50-100 for open exploration max_turns: 60 + + # Inactivity timeout for gateway agent runs (seconds, 0 = unlimited). + # The agent can run indefinitely when actively calling tools or receiving + # API responses. Only fires after the agent has been idle for this duration. + # gateway_timeout: 1800 + + # Staged warning: send a warning before escalating to full timeout. + # Fires once per run when inactivity reaches this threshold (seconds). + # Set to 0 to disable the warning. + # gateway_timeout_warning: 900 # Enable verbose logging verbose: false diff --git a/gateway/run.py b/gateway/run.py index e705597ef..ddb57bd7a 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -184,6 +184,8 @@ if _config_path.exists(): # Env var from .env takes precedence (already in os.environ). if "gateway_timeout" in _agent_cfg and "HERMES_AGENT_TIMEOUT" not in os.environ: os.environ["HERMES_AGENT_TIMEOUT"] = str(_agent_cfg["gateway_timeout"]) + if "gateway_timeout_warning" in _agent_cfg and "HERMES_AGENT_TIMEOUT_WARNING" not in os.environ: + os.environ["HERMES_AGENT_TIMEOUT_WARNING"] = str(_agent_cfg["gateway_timeout_warning"]) # Timezone: bridge config.yaml → HERMES_TIMEZONE env var. # HERMES_TIMEZONE from .env takes precedence (already in os.environ). _tz_cfg = _cfg.get("timezone", "") @@ -7114,6 +7116,9 @@ class GatewayRunner: # Default 1800s (30 min inactivity). 0 = unlimited. _agent_timeout_raw = float(os.getenv("HERMES_AGENT_TIMEOUT", 1800)) _agent_timeout = _agent_timeout_raw if _agent_timeout_raw > 0 else None + _agent_warning_raw = float(os.getenv("HERMES_AGENT_TIMEOUT_WARNING", 900)) + _agent_warning = _agent_warning_raw if _agent_warning_raw > 0 else None + _warning_fired = False loop = asyncio.get_event_loop() _executor_task = asyncio.ensure_future( loop.run_in_executor(None, run_sync) @@ -7146,6 +7151,24 @@ class GatewayRunner: _idle_secs = _act.get("seconds_since_activity", 0.0) except Exception: pass + # Staged warning: fire once before escalating to full timeout. + if (not _warning_fired and _agent_warning is not None + and _idle_secs >= _agent_warning): + _warning_fired = True + _warn_adapter = self.adapters.get(source.platform) + if _warn_adapter: + _warn_mins = int(_agent_warning // 60) or 1 + try: + await _warn_adapter.send( + source.chat_id, + f"⚠️ No activity for {_warn_mins} min. " + f"If the agent does not respond soon, it will " + f"be timed out in {_warn_mins} min. " + f"You can continue waiting or use /reset.", + metadata=_status_thread_metadata, + ) + except Exception: + logger.debug("Inactivity warning send error: %s", _ne) if _idle_secs >= _agent_timeout: _inactivity_timeout = True break diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 0c39902ae..8b5da3522 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -230,6 +230,10 @@ DEFAULT_CONFIG = { # (force on/off for all models), or a list of model-name substrings # to match (e.g. ["gpt", "codex", "gemini", "qwen"]). "tool_use_enforcement": "auto", + # Staged inactivity warning: send a warning to the user at this + # threshold before escalating to a full timeout. The warning fires + # once per run and does not interrupt the agent. 0 = disable warning. + "gateway_timeout_warning": 900, }, "terminal": { diff --git a/tests/gateway/test_gateway_inactivity_timeout.py b/tests/gateway/test_gateway_inactivity_timeout.py new file mode 100644 index 000000000..598f33817 --- /dev/null +++ b/tests/gateway/test_gateway_inactivity_timeout.py @@ -0,0 +1,315 @@ +"""Tests for staged inactivity timeout in gateway agent runs. + +Tests cover: +- Warning fires once when inactivity reaches gateway_timeout_warning threshold +- Warning does not fire when gateway_timeout is 0 (unlimited) +- Warning fires only once per run, not on every poll +- Full timeout still fires at gateway_timeout threshold +- Warning respects HERMES_AGENT_TIMEOUT_WARNING env var +- Warning disabled when gateway_timeout_warning is 0 +""" + +import concurrent.futures +import os +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + + +class FakeAgent: + """Mock agent with controllable activity summary for timeout tests.""" + + def __init__(self, idle_seconds=0.0, activity_desc="tool_call", + current_tool=None, api_call_count=5, max_iterations=90): + self._idle_seconds = idle_seconds + self._activity_desc = activity_desc + self._current_tool = current_tool + self._api_call_count = api_call_count + self._max_iterations = max_iterations + self._interrupted = False + self._interrupt_msg = None + + def get_activity_summary(self): + return { + "last_activity_ts": time.time() - self._idle_seconds, + "last_activity_desc": self._activity_desc, + "seconds_since_activity": self._idle_seconds, + "current_tool": self._current_tool, + "api_call_count": self._api_call_count, + "max_iterations": self._max_iterations, + } + + def interrupt(self, msg): + self._interrupted = True + self._interrupt_msg = msg + + def run_conversation(self, prompt): + return {"final_response": "Done", "messages": []} + + +class SlowFakeAgent(FakeAgent): + """Agent that runs for a while, then goes idle.""" + + def __init__(self, run_duration=0.5, idle_after=None, **kwargs): + super().__init__(**kwargs) + self._run_duration = run_duration + self._idle_after = idle_after + self._start_time = None + + def get_activity_summary(self): + summary = super().get_activity_summary() + if self._idle_after is not None and self._start_time: + elapsed = time.time() - self._start_time + if elapsed > self._idle_after: + idle_time = elapsed - self._idle_after + summary["seconds_since_activity"] = idle_time + summary["last_activity_desc"] = "api_call_streaming" + else: + summary["seconds_since_activity"] = 0.0 + return summary + + def run_conversation(self, prompt): + self._start_time = time.time() + time.sleep(self._run_duration) + return {"final_response": "Completed after work", "messages": []} + + +class TestStagedInactivityWarning: + """Test the staged inactivity warning before full timeout.""" + + def test_warning_fires_once_before_timeout(self): + """Warning fires when inactivity reaches warning threshold.""" + agent = SlowFakeAgent( + run_duration=10.0, + idle_after=0.1, + activity_desc="api_call_streaming", + ) + + _agent_timeout = 20.0 + _agent_warning = 5.0 + _POLL_INTERVAL = 0.1 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test prompt") + _inactivity_timeout = False + _warning_fired = False + _warning_send_count = 0 + + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + result = future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if (not _warning_fired and _agent_warning > 0 + and _idle_secs >= _agent_warning): + _warning_fired = True + _warning_send_count += 1 + if _idle_secs >= _agent_timeout: + _inactivity_timeout = True + break + + pool.shutdown(wait=False, cancel_futures=True) + + assert _warning_fired + assert _warning_send_count == 1 + assert not _inactivity_timeout + + def test_warning_disabled_when_zero(self): + """No warning fires when gateway_timeout_warning is 0.""" + agent = SlowFakeAgent( + run_duration=5.0, + idle_after=0.1, + ) + + _agent_timeout = 20.0 + _agent_warning = 0.0 + _POLL_INTERVAL = 0.1 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test") + _warning_fired = False + + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if (not _warning_fired and _agent_warning > 0 + and _idle_secs >= _agent_warning): + _warning_fired = True + if _idle_secs >= _agent_timeout: + break + + pool.shutdown(wait=False, cancel_futures=True) + assert not _warning_fired + + def test_warning_fires_only_once(self): + """Warning fires exactly once even if agent remains idle.""" + agent = SlowFakeAgent( + run_duration=10.0, + idle_after=0.05, + ) + + _agent_timeout = 20.0 + _agent_warning = 0.2 + _POLL_INTERVAL = 0.05 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test") + _warning_count = 0 + + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if (not _warning_count and _agent_warning > 0 + and _idle_secs >= _agent_warning): + _warning_count += 1 + if _idle_secs >= _agent_timeout: + break + + pool.shutdown(wait=False, cancel_futures=True) + assert _warning_count == 1 + + def test_full_timeout_still_fires_after_warning(self): + """Full timeout fires even after warning was sent.""" + agent = SlowFakeAgent( + run_duration=15.0, + idle_after=0.1, + activity_desc="waiting for provider response (streaming)", + ) + + _agent_timeout = 1.0 + _agent_warning = 0.3 + _POLL_INTERVAL = 0.05 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test") + _inactivity_timeout = False + _warning_fired = False + + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if (not _warning_fired and _agent_warning > 0 + and _idle_secs >= _agent_warning): + _warning_fired = True + if _idle_secs >= _agent_timeout: + _inactivity_timeout = True + break + + pool.shutdown(wait=False, cancel_futures=True) + assert _warning_fired + assert _inactivity_timeout + + def test_warning_env_var_respected(self, monkeypatch): + """HERMES_AGENT_TIMEOUT_WARNING env var is parsed correctly.""" + monkeypatch.setenv("HERMES_AGENT_TIMEOUT_WARNING", "600") + _warning = float(os.getenv("HERMES_AGENT_TIMEOUT_WARNING", 900)) + assert _warning == 600.0 + + def test_warning_zero_means_disabled(self, monkeypatch): + """HERMES_AGENT_TIMEOUT_WARNING=0 disables the warning.""" + monkeypatch.setenv("HERMES_AGENT_TIMEOUT_WARNING", "0") + _raw = float(os.getenv("HERMES_AGENT_TIMEOUT_WARNING", 900)) + _warning = _raw if _raw > 0 else None + assert _warning is None + + def test_unlimited_timeout_no_warning(self): + """When timeout is unlimited (0), no warning fires either.""" + agent = SlowFakeAgent( + run_duration=0.5, + idle_after=0.0, + ) + + _agent_timeout = None + _agent_warning = 5.0 + _POLL_INTERVAL = 0.05 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test") + + result = future.result(timeout=2.0) + pool.shutdown(wait=False) + + assert result["final_response"] == "Completed after work" + + +class TestWarningThresholdBelowTimeout: + """Test that warning threshold must be less than timeout threshold.""" + + def test_warning_at_half_timeout(self): + """Warning fires at half the timeout duration.""" + agent = SlowFakeAgent( + run_duration=10.0, + idle_after=0.1, + activity_desc="receiving stream response", + ) + + _agent_timeout = 2.0 + _agent_warning = 1.0 + _POLL_INTERVAL = 0.05 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test") + _warning_fired = False + _timeout_fired = False + + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if (not _warning_fired and _agent_warning > 0 + and _idle_secs >= _agent_warning): + _warning_fired = True + if _idle_secs >= _agent_timeout: + _timeout_fired = True + break + + pool.shutdown(wait=False, cancel_futures=True) + assert _warning_fired + assert _timeout_fired