mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): add staged inactivity warning before timeout escalation
Introduce gateway_timeout_warning (default 900s) as a pre-timeout alert layer. When inactivity reaches the warning threshold, a single notification is sent to the user offering to wait or reset. If inactivity continues to the gateway_timeout (default 1800s), the full timeout fires as before. This gives users a chance to intervene before work is lost on slow API providers without disabling the safety timeout entirely. Config: agent.gateway_timeout_warning in config.yaml, or HERMES_AGENT_TIMEOUT_WARNING env var (0 = disable warning).
This commit is contained in:
parent
980fadfea9
commit
092061711e
4 changed files with 352 additions and 0 deletions
|
|
@ -445,6 +445,16 @@ agent:
|
|||
# Higher = more room for complex tasks, but costs more tokens
|
||||
# Recommended: 20-30 for focused tasks, 50-100 for open exploration
|
||||
max_turns: 60
|
||||
|
||||
# Inactivity timeout for gateway agent runs (seconds, 0 = unlimited).
|
||||
# The agent can run indefinitely when actively calling tools or receiving
|
||||
# API responses. Only fires after the agent has been idle for this duration.
|
||||
# gateway_timeout: 1800
|
||||
|
||||
# Staged warning: send a warning before escalating to full timeout.
|
||||
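# Fires once per run when inactivity reaches this threshold (seconds).
# Keep it below gateway_timeout; otherwise the warning and the timeout
# trigger on the same inactivity check.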
|
||||
# Set to 0 to disable the warning.
|
||||
# gateway_timeout_warning: 900
|
||||
|
||||
# Enable verbose logging
|
||||
verbose: false
|
||||
|
|
|
|||
|
|
@ -184,6 +184,8 @@ if _config_path.exists():
|
|||
# Env var from .env takes precedence (already in os.environ).
|
||||
if "gateway_timeout" in _agent_cfg and "HERMES_AGENT_TIMEOUT" not in os.environ:
|
||||
os.environ["HERMES_AGENT_TIMEOUT"] = str(_agent_cfg["gateway_timeout"])
|
||||
if "gateway_timeout_warning" in _agent_cfg and "HERMES_AGENT_TIMEOUT_WARNING" not in os.environ:
|
||||
os.environ["HERMES_AGENT_TIMEOUT_WARNING"] = str(_agent_cfg["gateway_timeout_warning"])
|
||||
# Timezone: bridge config.yaml → HERMES_TIMEZONE env var.
|
||||
# HERMES_TIMEZONE from .env takes precedence (already in os.environ).
|
||||
_tz_cfg = _cfg.get("timezone", "")
|
||||
|
|
@ -7114,6 +7116,9 @@ class GatewayRunner:
|
|||
# Default 1800s (30 min inactivity). 0 = unlimited.
|
||||
_agent_timeout_raw = float(os.getenv("HERMES_AGENT_TIMEOUT", 1800))
|
||||
_agent_timeout = _agent_timeout_raw if _agent_timeout_raw > 0 else None
|
||||
_agent_warning_raw = float(os.getenv("HERMES_AGENT_TIMEOUT_WARNING", 900))
|
||||
_agent_warning = _agent_warning_raw if _agent_warning_raw > 0 else None
|
||||
_warning_fired = False
|
||||
loop = asyncio.get_event_loop()
|
||||
_executor_task = asyncio.ensure_future(
|
||||
loop.run_in_executor(None, run_sync)
|
||||
|
|
@ -7146,6 +7151,24 @@ class GatewayRunner:
|
|||
_idle_secs = _act.get("seconds_since_activity", 0.0)
|
||||
except Exception:
|
||||
pass
|
||||
# Staged warning: fire once before escalating to full timeout.
|
||||
if (not _warning_fired and _agent_warning is not None
|
||||
and _idle_secs >= _agent_warning):
|
||||
_warning_fired = True
|
||||
_warn_adapter = self.adapters.get(source.platform)
|
||||
if _warn_adapter:
|
||||
_warn_mins = int(_agent_warning // 60) or 1
|
||||
try:
|
||||
await _warn_adapter.send(
|
||||
source.chat_id,
|
||||
f"⚠️ No activity for {_warn_mins} min. "
|
||||
f"If the agent does not respond soon, it will "
|
||||
f"be timed out in {_warn_mins} min. "
|
||||
f"You can continue waiting or use /reset.",
|
||||
metadata=_status_thread_metadata,
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Inactivity warning send error: %s", _ne)
|
||||
if _idle_secs >= _agent_timeout:
|
||||
_inactivity_timeout = True
|
||||
break
|
||||
|
|
|
|||
|
|
@ -230,6 +230,10 @@ DEFAULT_CONFIG = {
|
|||
# (force on/off for all models), or a list of model-name substrings
|
||||
# to match (e.g. ["gpt", "codex", "gemini", "qwen"]).
|
||||
"tool_use_enforcement": "auto",
|
||||
# Staged inactivity warning: send a warning to the user at this
|
||||
# threshold before escalating to a full timeout. The warning fires
|
||||
# once per run and does not interrupt the agent. 0 = disable warning.
|
||||
"gateway_timeout_warning": 900,
|
||||
},
|
||||
|
||||
"terminal": {
|
||||
|
|
|
|||
315
tests/gateway/test_gateway_inactivity_timeout.py
Normal file
315
tests/gateway/test_gateway_inactivity_timeout.py
Normal file
|
|
@ -0,0 +1,315 @@
|
|||
"""Tests for staged inactivity timeout in gateway agent runs.
|
||||
|
||||
Tests cover:
|
||||
- Warning fires once when inactivity reaches gateway_timeout_warning threshold
|
||||
- Agent run completes normally when gateway_timeout is 0 (unlimited)
|
||||
- Warning fires only once per run, not on every poll
|
||||
- Full timeout still fires at gateway_timeout threshold
|
||||
- Warning respects HERMES_AGENT_TIMEOUT_WARNING env var
|
||||
- Warning disabled when gateway_timeout_warning is 0
|
||||
"""
|
||||
|
||||
import concurrent.futures
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
|
||||
class FakeAgent:
    """Test double for an agent that reports a fixed, configurable idle time."""

    def __init__(self, idle_seconds=0.0, activity_desc="tool_call",
                 current_tool=None, api_call_count=5, max_iterations=90):
        # Interrupt bookkeeping starts cleared; interrupt() fills it in.
        self._interrupted, self._interrupt_msg = False, None
        # Values echoed back verbatim by get_activity_summary().
        self._idle_seconds = idle_seconds
        self._activity_desc = activity_desc
        self._current_tool = current_tool
        self._api_call_count = api_call_count
        self._max_iterations = max_iterations

    def get_activity_summary(self):
        """Return the activity snapshot dict built from the constructor values."""
        idle = self._idle_seconds
        snapshot = dict(
            # Timestamp is derived so it stays consistent with the idle time.
            last_activity_ts=time.time() - idle,
            last_activity_desc=self._activity_desc,
            seconds_since_activity=idle,
            current_tool=self._current_tool,
            api_call_count=self._api_call_count,
            max_iterations=self._max_iterations,
        )
        return snapshot

    def interrupt(self, msg):
        """Record that an interrupt was requested, together with its message."""
        self._interrupt_msg = msg
        self._interrupted = True

    def run_conversation(self, prompt):
        """Finish immediately with a canned result; ``prompt`` is ignored."""
        canned = {"final_response": "Done", "messages": []}
        return canned
|
||||
|
||||
|
||||
class SlowFakeAgent(FakeAgent):
    """Agent whose run blocks for a while and that starts reporting idleness.

    ``run_conversation`` sleeps for ``run_duration`` seconds. Once
    ``idle_after`` seconds of that run have elapsed, ``get_activity_summary``
    reports a growing ``seconds_since_activity``; before that it reports 0.0.
    With ``idle_after=None``, or before the run has started, the summary of
    the base ``FakeAgent`` is returned unchanged.
    """

    def __init__(self, run_duration=0.5, idle_after=None, **kwargs):
        super().__init__(**kwargs)
        self._run_duration = run_duration
        self._idle_after = idle_after
        # Monotonic timestamp of the run start; None until the run begins.
        self._start_time = None

    def get_activity_summary(self):
        """Return the base summary with idle time derived from run progress."""
        summary = super().get_activity_summary()
        # Explicit None checks: a start marker is valid whatever its value.
        if self._idle_after is None or self._start_time is None:
            return summary
        # Monotonic clock: elapsed time must not be affected by wall-clock
        # adjustments while the test is running.
        elapsed = time.monotonic() - self._start_time
        if elapsed > self._idle_after:
            summary["seconds_since_activity"] = elapsed - self._idle_after
            summary["last_activity_desc"] = "api_call_streaming"
        else:
            summary["seconds_since_activity"] = 0.0
        return summary

    def run_conversation(self, prompt):
        """Mark the run start, block for ``run_duration`` seconds, then finish."""
        self._start_time = time.monotonic()
        time.sleep(self._run_duration)
        return {"final_response": "Completed after work", "messages": []}
|
||||
|
||||
|
||||
class TestStagedInactivityWarning:
    """Exercise the staged inactivity warning that precedes the full timeout.

    The tests do not drive the gateway itself. They replay a local copy of
    its inactivity checks (see ``_poll``) against a ``SlowFakeAgent`` running
    in a worker thread. Durations are kept short so the suite finishes in a
    few seconds and abandoned worker threads do not linger.
    """

    @staticmethod
    def _poll(agent, agent_timeout, agent_warning, poll_interval):
        """Run ``agent`` in a worker thread and poll it until done or timed out.

        Args:
            agent: object exposing ``run_conversation`` and, optionally,
                ``get_activity_summary``.
            agent_timeout: idle seconds at which the loop gives up.
            agent_warning: idle seconds at which the warning is sent; values
                that are not positive disable the warning.
            poll_interval: seconds to wait for completion between checks.

        Returns:
            A ``(warning_sends, polls_over_warning, timed_out)`` tuple: how
            many warnings were sent, how many polls saw the idle time at or
            above the warning threshold, and whether the timeout fired.
        """
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(agent.run_conversation, "test prompt")
        warning_fired = False
        warning_sends = 0
        polls_over_warning = 0
        timed_out = False
        try:
            while True:
                done, _ = concurrent.futures.wait(
                    {future}, timeout=poll_interval
                )
                if done:
                    future.result()  # re-raise anything the agent raised
                    break
                idle_secs = 0.0
                if hasattr(agent, "get_activity_summary"):
                    try:
                        summary = agent.get_activity_summary()
                        idle_secs = summary.get("seconds_since_activity", 0.0)
                    except Exception:
                        # Deliberate best effort, as in the loop under test:
                        # a broken summary counts as "not idle".
                        pass
                if agent_warning > 0 and idle_secs >= agent_warning:
                    polls_over_warning += 1
                    if not warning_fired:
                        warning_fired = True
                        warning_sends += 1
                if idle_secs >= agent_timeout:
                    timed_out = True
                    break
        finally:
            # Never wait for the worker: after a timeout it is still asleep.
            pool.shutdown(wait=False, cancel_futures=True)
        return warning_sends, polls_over_warning, timed_out

    def test_warning_fires_once_before_timeout(self):
        """Warning fires when inactivity reaches the warning threshold."""
        agent = SlowFakeAgent(
            run_duration=1.0,
            idle_after=0.1,
            activity_desc="api_call_streaming",
        )

        sends, _, timed_out = self._poll(
            agent, agent_timeout=5.0, agent_warning=0.3, poll_interval=0.05
        )

        assert sends == 1
        assert not timed_out

    def test_warning_disabled_when_zero(self):
        """No warning fires when gateway_timeout_warning is 0."""
        agent = SlowFakeAgent(run_duration=0.5, idle_after=0.1)

        sends, polls_over, _ = self._poll(
            agent, agent_timeout=20.0, agent_warning=0.0, poll_interval=0.1
        )

        assert sends == 0
        assert polls_over == 0

    def test_warning_fires_only_once(self):
        """Warning fires exactly once even if the agent remains idle."""
        agent = SlowFakeAgent(run_duration=0.8, idle_after=0.05)

        sends, polls_over, _ = self._poll(
            agent, agent_timeout=20.0, agent_warning=0.2, poll_interval=0.05
        )

        # Several polls crossed the threshold, yet only one warning was sent.
        assert polls_over > 1
        assert sends == 1

    def test_full_timeout_still_fires_after_warning(self):
        """Full timeout fires even after the warning was sent."""
        agent = SlowFakeAgent(
            run_duration=2.0,
            idle_after=0.1,
            activity_desc="waiting for provider response (streaming)",
        )

        sends, _, timed_out = self._poll(
            agent, agent_timeout=0.6, agent_warning=0.2, poll_interval=0.05
        )

        assert sends == 1
        assert timed_out

    def test_warning_env_var_respected(self, monkeypatch):
        """HERMES_AGENT_TIMEOUT_WARNING env var is parsed correctly."""
        monkeypatch.setenv("HERMES_AGENT_TIMEOUT_WARNING", "600")
        _warning = float(os.getenv("HERMES_AGENT_TIMEOUT_WARNING", 900))
        assert _warning == 600.0

    def test_warning_zero_means_disabled(self, monkeypatch):
        """HERMES_AGENT_TIMEOUT_WARNING=0 disables the warning."""
        monkeypatch.setenv("HERMES_AGENT_TIMEOUT_WARNING", "0")
        _raw = float(os.getenv("HERMES_AGENT_TIMEOUT_WARNING", 900))
        _warning = _raw if _raw > 0 else None
        assert _warning is None

    def test_unlimited_timeout_no_warning(self):
        """An agent run with no inactivity polling completes normally.

        NOTE(review): this only checks that the run finishes; it does not
        exercise the gateway's unlimited-timeout branch, which is not part
        of this test module. Confirm against GatewayRunner.
        """
        agent = SlowFakeAgent(run_duration=0.2, idle_after=0.0)

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            result = pool.submit(agent.run_conversation, "test").result(
                timeout=2.0
            )

        assert result["final_response"] == "Completed after work"
|
||||
|
||||
|
||||
class TestWarningThresholdBelowTimeout:
    """Check the order of the two stages when the warning is below the timeout."""

    def test_warning_at_half_timeout(self):
        """With warning at half the timeout, the warning precedes the timeout."""
        agent = SlowFakeAgent(
            run_duration=2.0,
            idle_after=0.1,
            activity_desc="receiving stream response",
        )

        agent_timeout = 0.6
        agent_warning = 0.3
        poll_interval = 0.05

        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(agent.run_conversation, "test")
        # Idle time observed when each stage fired; None means "never fired".
        warning_idle = None
        timeout_idle = None

        try:
            while True:
                done, _ = concurrent.futures.wait(
                    {future}, timeout=poll_interval
                )
                if done:
                    future.result()
                    break
                idle_secs = 0.0
                if hasattr(agent, "get_activity_summary"):
                    try:
                        summary = agent.get_activity_summary()
                        idle_secs = summary.get("seconds_since_activity", 0.0)
                    except Exception:
                        # Deliberate best effort: a broken summary counts
                        # as "not idle".
                        pass
                if (warning_idle is None and agent_warning > 0
                        and idle_secs >= agent_warning):
                    warning_idle = idle_secs
                if idle_secs >= agent_timeout:
                    timeout_idle = idle_secs
                    break
        finally:
            # The worker is still asleep after a timeout; do not wait for it.
            pool.shutdown(wait=False, cancel_futures=True)

        assert warning_idle is not None
        assert timeout_idle is not None
        # The warning was raised no later than the timeout.
        assert warning_idle <= timeout_idle
|
||||
Loading…
Add table
Add a link
Reference in a new issue