diff --git a/gateway/run.py b/gateway/run.py index de077ede87..c85ed27b88 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -476,6 +476,13 @@ class GatewayRunner: self._honcho_managers: Dict[str, Any] = {} self._honcho_configs: Dict[str, Any] = {} + # Rate-limit compression warning messages sent to users. + # Keyed by chat_id — value is the timestamp of the last warning sent. + # Prevents the warning from firing on every message when a session + # remains above the threshold after compression. + self._compression_warn_sent: Dict[str, float] = {} + self._compression_warn_cooldown: int = 3600 # seconds (1 hour) + # Ensure tirith security scanner is available (downloads if needed) try: from tools.tirith_security import ensure_installed @@ -2400,13 +2407,18 @@ class GatewayRunner: pass # Still too large after compression — warn user + # Rate-limited to once per cooldown period per + # chat to avoid spamming on every message. if _new_tokens >= _warn_token_threshold: logger.warning( "Session hygiene: still ~%s tokens after " "compression — suggesting /reset", f"{_new_tokens:,}", ) - if _hyg_adapter: + _now = time.time() + _last_warn = self._compression_warn_sent.get(source.chat_id, 0) + if _hyg_adapter and _now - _last_warn >= self._compression_warn_cooldown: + self._compression_warn_sent[source.chat_id] = _now try: await _hyg_adapter.send( source.chat_id, @@ -2428,7 +2440,10 @@ class GatewayRunner: if _approx_tokens >= _warn_token_threshold: _hyg_adapter = self.adapters.get(source.platform) _hyg_meta = {"thread_id": source.thread_id} if source.thread_id else None - if _hyg_adapter: + _now = time.time() + _last_warn = self._compression_warn_sent.get(source.chat_id, 0) + if _hyg_adapter and _now - _last_warn >= self._compression_warn_cooldown: + self._compression_warn_sent[source.chat_id] = _now try: await _hyg_adapter.send( source.chat_id, diff --git a/run_agent.py b/run_agent.py index 13eba7fe7c..794c9f67ab 100644 --- a/run_agent.py +++ b/run_agent.py @@ -5221,11 +5221,8 @@ class AIAgent: except Exception as e: logger.warning("Session DB compression split failed — new session will NOT be indexed: %s", e) - # Reset context pressure warning and token estimate — usage drops - # after compaction. Without this, the stale last_prompt_tokens from - # the previous API call causes the pressure calculation to stay at - # >1000% and spam warnings / re-trigger compression in a loop. - self._context_pressure_warned = False + # Update token estimate after compaction so pressure calculations + # use the post-compression count, not the stale pre-compression one. _compressed_est = ( estimate_tokens_rough(new_system_prompt) + estimate_messages_tokens_rough(compressed) @@ -5233,6 +5230,16 @@ class AIAgent: self.context_compressor.last_prompt_tokens = _compressed_est self.context_compressor.last_completion_tokens = 0 + # Only reset the pressure warning if compression actually brought + # us below the warning level (85% of threshold). When compression + # can't reduce enough (e.g. threshold is very low, or system prompt + # alone exceeds the warning level), keep the flag set to prevent + # spamming the user with repeated warnings every loop iteration. + if self.context_compressor.threshold_tokens > 0: + _post_progress = _compressed_est / self.context_compressor.threshold_tokens + if _post_progress < 0.85: + self._context_pressure_warned = False + return compressed, new_system_prompt def _execute_tool_calls(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None: diff --git a/tests/gateway/test_session_hygiene.py b/tests/gateway/test_session_hygiene.py index b8ff8f8a88..843c0d4167 100644 --- a/tests/gateway/test_session_hygiene.py +++ b/tests/gateway/test_session_hygiene.py @@ -212,6 +212,49 @@ class TestSessionHygieneWarnThreshold: assert post_compress_tokens < warn_threshold +class TestCompressionWarnRateLimit: + """Compression warning messages must be rate-limited per chat_id.""" + + def _make_runner(self): + from unittest.mock import MagicMock, patch + with patch("gateway.run.load_gateway_config"), \ + patch("gateway.run.SessionStore"), \ + patch("gateway.run.DeliveryRouter"): + from gateway.run import GatewayRunner + runner = GatewayRunner.__new__(GatewayRunner) + runner._compression_warn_sent = {} + runner._compression_warn_cooldown = 3600 + return runner + + def test_first_warn_is_sent(self): + runner = self._make_runner() + now = 1_000_000.0 + last = runner._compression_warn_sent.get("chat:1", 0) + assert now - last >= runner._compression_warn_cooldown + + def test_second_warn_suppressed_within_cooldown(self): + runner = self._make_runner() + now = 1_000_000.0 + runner._compression_warn_sent["chat:1"] = now - 60 # 1 minute ago + last = runner._compression_warn_sent.get("chat:1", 0) + assert now - last < runner._compression_warn_cooldown + + def test_warn_allowed_after_cooldown(self): + runner = self._make_runner() + now = 1_000_000.0 + runner._compression_warn_sent["chat:1"] = now - 3601 # just past cooldown + last = runner._compression_warn_sent.get("chat:1", 0) + assert now - last >= runner._compression_warn_cooldown + + def test_rate_limit_is_per_chat(self): + """Rate-limiting one chat must not suppress warnings for another.""" + runner = self._make_runner() + now = 1_000_000.0 + runner._compression_warn_sent["chat:1"] = now - 60 # suppressed + last_other = runner._compression_warn_sent.get("chat:2", 0) + assert now - last_other >= runner._compression_warn_cooldown + + class TestEstimatedTokenThreshold: """Verify that hygiene thresholds are always below the model's context limit — for both actual and estimated token counts.