diff --git a/gateway/run.py b/gateway/run.py index dba4df6fd..faeddbf35 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -900,159 +900,187 @@ class GatewayRunner: # every new message rehydrates an oversized transcript, causing # repeated truncation/context failures. Detect this early and # compress proactively — before the agent even starts. (#628) + # + # Thresholds are derived from the SAME compression config the + # agent uses (compression.threshold × model context length) so + # CLI and messaging platforms behave identically. # ----------------------------------------------------------------- if history and len(history) >= 4: - from agent.model_metadata import estimate_messages_tokens_rough + from agent.model_metadata import ( + estimate_messages_tokens_rough, + get_model_context_length, + ) - # Read thresholds from config.yaml → session_hygiene section - _hygiene_cfg = {} + # Read model + compression config from config.yaml — same + # source of truth the agent itself uses. + _hyg_model = "anthropic/claude-sonnet-4.6" + _hyg_threshold_pct = 0.85 + _hyg_compression_enabled = True try: _hyg_cfg_path = _hermes_home / "config.yaml" if _hyg_cfg_path.exists(): import yaml as _hyg_yaml with open(_hyg_cfg_path) as _hyg_f: _hyg_data = _hyg_yaml.safe_load(_hyg_f) or {} - _hygiene_cfg = _hyg_data.get("session_hygiene", {}) - if not isinstance(_hygiene_cfg, dict): - _hygiene_cfg = {} + + # Resolve model name (same logic as run_sync) + _model_cfg = _hyg_data.get("model", {}) + if isinstance(_model_cfg, str): + _hyg_model = _model_cfg + elif isinstance(_model_cfg, dict): + _hyg_model = _model_cfg.get("default", _hyg_model) + + # Read compression settings + _comp_cfg = _hyg_data.get("compression", {}) + if isinstance(_comp_cfg, dict): + _hyg_threshold_pct = float( + _comp_cfg.get("threshold", _hyg_threshold_pct) + ) + _hyg_compression_enabled = str( + _comp_cfg.get("enabled", True) + ).lower() in ("true", "1", "yes") except Exception: pass - _compress_token_threshold = int( - _hygiene_cfg.get("auto_compress_tokens", 100_000) - ) - _compress_msg_threshold = int( - _hygiene_cfg.get("auto_compress_messages", 200) - ) - _warn_token_threshold = int( - _hygiene_cfg.get("warn_tokens", 200_000) + # Also check env overrides (same as run_agent.py) + _hyg_threshold_pct = float( + os.getenv("CONTEXT_COMPRESSION_THRESHOLD", str(_hyg_threshold_pct)) ) + if os.getenv("CONTEXT_COMPRESSION_ENABLED", "").lower() in ("false", "0", "no"): + _hyg_compression_enabled = False - _msg_count = len(history) - _approx_tokens = estimate_messages_tokens_rough(history) - - _needs_compress = ( - _approx_tokens >= _compress_token_threshold - or _msg_count >= _compress_msg_threshold - ) - - if _needs_compress: - logger.info( - "Session hygiene: %s messages, ~%s tokens — auto-compressing " - "(thresholds: %s msgs / %s tokens)", - _msg_count, f"{_approx_tokens:,}", - _compress_msg_threshold, f"{_compress_token_threshold:,}", + if _hyg_compression_enabled: + _hyg_context_length = get_model_context_length(_hyg_model) + _compress_token_threshold = int( + _hyg_context_length * _hyg_threshold_pct ) + # Warn if still huge after compression (95% of context) + _warn_token_threshold = int(_hyg_context_length * 0.95) + + _msg_count = len(history) + _approx_tokens = estimate_messages_tokens_rough(history) + + _needs_compress = _approx_tokens >= _compress_token_threshold + + if _needs_compress: + logger.info( + "Session hygiene: %s messages, ~%s tokens — auto-compressing " + "(threshold: %s%% of %s = %s tokens)", + _msg_count, f"{_approx_tokens:,}", + int(_hyg_threshold_pct * 100), + f"{_hyg_context_length:,}", + f"{_compress_token_threshold:,}", + ) + + _hyg_adapter = self.adapters.get(source.platform) + if _hyg_adapter: + try: + await _hyg_adapter.send( + source.chat_id, + f"🗜️ Session is large ({_msg_count} messages, " + f"~{_approx_tokens:,} tokens). Auto-compressing..." + ) + except Exception: + pass - _hyg_adapter = self.adapters.get(source.platform) - if _hyg_adapter: try: - await _hyg_adapter.send( - source.chat_id, - f"🗜️ Session is large ({_msg_count} messages, " - f"~{_approx_tokens:,} tokens). Auto-compressing..." - ) - except Exception: - pass + from run_agent import AIAgent - try: - from run_agent import AIAgent + _hyg_runtime = _resolve_runtime_agent_kwargs() + if _hyg_runtime.get("api_key"): + _hyg_msgs = [ + {"role": m.get("role"), "content": m.get("content")} + for m in history + if m.get("role") in ("user", "assistant") + and m.get("content") + ] - _hyg_runtime = _resolve_runtime_agent_kwargs() - if _hyg_runtime.get("api_key"): - _hyg_msgs = [ - {"role": m.get("role"), "content": m.get("content")} - for m in history - if m.get("role") in ("user", "assistant") - and m.get("content") - ] - - if len(_hyg_msgs) >= 4: - _hyg_agent = AIAgent( - **_hyg_runtime, - max_iterations=4, - quiet_mode=True, - enabled_toolsets=["memory"], - session_id=session_entry.session_id, - ) - - loop = asyncio.get_event_loop() - _compressed, _ = await loop.run_in_executor( - None, - lambda: _hyg_agent._compress_context( - _hyg_msgs, "", - approx_tokens=_approx_tokens, - ), - ) - - self.session_store.rewrite_transcript( - session_entry.session_id, _compressed - ) - history = _compressed - _new_count = len(_compressed) - _new_tokens = estimate_messages_tokens_rough( - _compressed - ) - - logger.info( - "Session hygiene: compressed %s → %s msgs, " - "~%s → ~%s tokens", - _msg_count, _new_count, - f"{_approx_tokens:,}", f"{_new_tokens:,}", - ) - - if _hyg_adapter: - try: - await _hyg_adapter.send( - source.chat_id, - f"🗜️ Compressed: {_msg_count} → " - f"{_new_count} messages, " - f"~{_approx_tokens:,} → " - f"~{_new_tokens:,} tokens" - ) - except Exception: - pass - - # Still too large after compression — warn user - if _new_tokens >= _warn_token_threshold: - logger.warning( - "Session hygiene: still ~%s tokens after " - "compression — suggesting /reset", - f"{_new_tokens:,}", + if len(_hyg_msgs) >= 4: + _hyg_agent = AIAgent( + **_hyg_runtime, + max_iterations=4, + quiet_mode=True, + enabled_toolsets=["memory"], + session_id=session_entry.session_id, ) + + loop = asyncio.get_event_loop() + _compressed, _ = await loop.run_in_executor( + None, + lambda: _hyg_agent._compress_context( + _hyg_msgs, "", + approx_tokens=_approx_tokens, + ), + ) + + self.session_store.rewrite_transcript( + session_entry.session_id, _compressed + ) + history = _compressed + _new_count = len(_compressed) + _new_tokens = estimate_messages_tokens_rough( + _compressed + ) + + logger.info( + "Session hygiene: compressed %s → %s msgs, " + "~%s → ~%s tokens", + _msg_count, _new_count, + f"{_approx_tokens:,}", f"{_new_tokens:,}", + ) + if _hyg_adapter: try: await _hyg_adapter.send( source.chat_id, - "⚠️ Session is still very large " - "after compression " - f"(~{_new_tokens:,} tokens). " - "Consider using /reset to start " - "fresh if you experience issues." + f"🗜️ Compressed: {_msg_count} → " + f"{_new_count} messages, " + f"~{_approx_tokens:,} → " + f"~{_new_tokens:,} tokens" ) except Exception: pass - except Exception as e: - logger.warning( - "Session hygiene auto-compress failed: %s", e - ) - # Compression failed and session is dangerously large - if _approx_tokens >= _warn_token_threshold: - _hyg_adapter = self.adapters.get(source.platform) - if _hyg_adapter: - try: - await _hyg_adapter.send( - source.chat_id, - f"⚠️ Session is very large " - f"({_msg_count} messages, " - f"~{_approx_tokens:,} tokens) and " - "auto-compression failed. Consider " - "using /compress or /reset to avoid " - "issues." - ) - except Exception: - pass + # Still too large after compression — warn user + if _new_tokens >= _warn_token_threshold: + logger.warning( + "Session hygiene: still ~%s tokens after " + "compression — suggesting /reset", + f"{_new_tokens:,}", + ) + if _hyg_adapter: + try: + await _hyg_adapter.send( + source.chat_id, + "⚠️ Session is still very large " + "after compression " + f"(~{_new_tokens:,} tokens). " + "Consider using /reset to start " + "fresh if you experience issues." + ) + except Exception: + pass + + except Exception as e: + logger.warning( + "Session hygiene auto-compress failed: %s", e + ) + # Compression failed and session is dangerously large + if _approx_tokens >= _warn_token_threshold: + _hyg_adapter = self.adapters.get(source.platform) + if _hyg_adapter: + try: + await _hyg_adapter.send( + source.chat_id, + f"⚠️ Session is very large " + f"({_msg_count} messages, " + f"~{_approx_tokens:,} tokens) and " + "auto-compression failed. Consider " + "using /compress or /reset to avoid " + "issues." + ) + except Exception: + pass # First-message onboarding -- only on the very first interaction ever if not history and not self.session_store.has_any_sessions(): diff --git a/tests/gateway/test_session_hygiene.py b/tests/gateway/test_session_hygiene.py index b357d5861..9ac7b8029 100644 --- a/tests/gateway/test_session_hygiene.py +++ b/tests/gateway/test_session_hygiene.py @@ -2,6 +2,10 @@ Verifies that the gateway detects pathologically large transcripts and triggers auto-compression before running the agent. (#628) + +The hygiene system uses the SAME compression config as the agent: + compression.threshold × model context length +so CLI and messaging platforms behave identically. """ import pytest @@ -38,75 +42,113 @@ def _make_large_history_tokens(target_tokens: int) -> list: # --------------------------------------------------------------------------- -# Detection threshold tests +# Detection threshold tests (model-aware, unified with compression config) # --------------------------------------------------------------------------- class TestSessionHygieneThresholds: - """Test that the threshold logic correctly identifies large sessions.""" + """Test that the threshold logic correctly identifies large sessions. + + Thresholds are derived from model context length × compression threshold, + matching what the agent's ContextCompressor uses. + """ def test_small_session_below_thresholds(self): """A 10-message session should not trigger compression.""" history = _make_history(10) - msg_count = len(history) approx_tokens = estimate_messages_tokens_rough(history) - compress_token_threshold = 100_000 - compress_msg_threshold = 200 + # For a 200k-context model at 85% threshold = 170k + context_length = 200_000 + threshold_pct = 0.85 + compress_token_threshold = int(context_length * threshold_pct) - needs_compress = ( - approx_tokens >= compress_token_threshold - or msg_count >= compress_msg_threshold - ) + needs_compress = approx_tokens >= compress_token_threshold assert not needs_compress - def test_large_message_count_triggers(self): - """200+ messages should trigger compression even if tokens are low.""" - history = _make_history(250, content_size=10) - msg_count = len(history) - - compress_msg_threshold = 200 - needs_compress = msg_count >= compress_msg_threshold - assert needs_compress - def test_large_token_count_triggers(self): - """High token count should trigger compression even if message count is low.""" - # 50 messages with huge content to exceed 100K tokens - history = _make_history(50, content_size=10_000) + """High token count should trigger compression when exceeding model threshold.""" + # Build a history that exceeds 85% of a 200k model (170k tokens) + history = _make_large_history_tokens(180_000) approx_tokens = estimate_messages_tokens_rough(history) - compress_token_threshold = 100_000 + context_length = 200_000 + threshold_pct = 0.85 + compress_token_threshold = int(context_length * threshold_pct) + needs_compress = approx_tokens >= compress_token_threshold assert needs_compress - def test_under_both_thresholds_no_trigger(self): - """Session under both thresholds should not trigger.""" - history = _make_history(100, content_size=100) - msg_count = len(history) + def test_under_threshold_no_trigger(self): + """Session under threshold should not trigger, even with many messages.""" + # 250 short messages — lots of messages but well under token threshold + history = _make_history(250, content_size=10) approx_tokens = estimate_messages_tokens_rough(history) - compress_token_threshold = 100_000 - compress_msg_threshold = 200 + # 200k model at 85% = 170k token threshold + context_length = 200_000 + threshold_pct = 0.85 + compress_token_threshold = int(context_length * threshold_pct) - needs_compress = ( - approx_tokens >= compress_token_threshold - or msg_count >= compress_msg_threshold + needs_compress = approx_tokens >= compress_token_threshold + assert not needs_compress, ( + f"250 short messages (~{approx_tokens} tokens) should NOT trigger " + f"compression at {compress_token_threshold} token threshold" ) + + def test_message_count_alone_does_not_trigger(self): + """Message count alone should NOT trigger — only token count matters. + + The old system used an OR of token-count and message-count thresholds, + which caused premature compression in tool-heavy sessions with 200+ + messages but low total tokens. + """ + # 300 very short messages — old system would compress, new should not + history = _make_history(300, content_size=10) + approx_tokens = estimate_messages_tokens_rough(history) + + context_length = 200_000 + threshold_pct = 0.85 + compress_token_threshold = int(context_length * threshold_pct) + + # Token-based check only + needs_compress = approx_tokens >= compress_token_threshold assert not needs_compress - def test_custom_thresholds(self): - """Custom thresholds from config should be respected.""" - history = _make_history(60, content_size=100) - msg_count = len(history) + def test_threshold_scales_with_model(self): + """Different models should have different compression thresholds.""" + # 128k model at 85% = 108,800 tokens + small_model_threshold = int(128_000 * 0.85) + # 200k model at 85% = 170,000 tokens + large_model_threshold = int(200_000 * 0.85) + # 1M model at 85% = 850,000 tokens + huge_model_threshold = int(1_000_000 * 0.85) - # Custom lower threshold - compress_msg_threshold = 50 - needs_compress = msg_count >= compress_msg_threshold - assert needs_compress + # A session at ~120k tokens: + history = _make_large_history_tokens(120_000) + approx_tokens = estimate_messages_tokens_rough(history) - # Custom higher threshold - compress_msg_threshold = 100 - needs_compress = msg_count >= compress_msg_threshold - assert not needs_compress + # Should trigger for 128k model + assert approx_tokens >= small_model_threshold + # Should NOT trigger for 200k model + assert approx_tokens < large_model_threshold + # Should NOT trigger for 1M model + assert approx_tokens < huge_model_threshold + + def test_custom_threshold_percentage(self): + """Custom threshold percentage from config should be respected.""" + context_length = 200_000 + + # At 50% threshold = 100k + low_threshold = int(context_length * 0.50) + # At 90% threshold = 180k + high_threshold = int(context_length * 0.90) + + history = _make_large_history_tokens(150_000) + approx_tokens = estimate_messages_tokens_rough(history) + + # Should trigger at 50% but not at 90% + assert approx_tokens >= low_threshold + assert approx_tokens < high_threshold def test_minimum_message_guard(self): """Sessions with fewer than 4 messages should never trigger.""" @@ -117,18 +159,19 @@ class TestSessionHygieneThresholds: class TestSessionHygieneWarnThreshold: - """Test the post-compression warning threshold.""" + """Test the post-compression warning threshold (95% of context).""" def test_warn_when_still_large(self): - """If compressed result is still above warn_tokens, should warn.""" - # Simulate post-compression tokens - warn_threshold = 200_000 - post_compress_tokens = 250_000 + """If compressed result is still above 95% of context, should warn.""" + context_length = 200_000 + warn_threshold = int(context_length * 0.95) # 190k + post_compress_tokens = 195_000 assert post_compress_tokens >= warn_threshold def test_no_warn_when_under(self): - """If compressed result is under warn_tokens, no warning.""" - warn_threshold = 200_000 + """If compressed result is under 95% of context, no warning.""" + context_length = 200_000 + warn_threshold = int(context_length * 0.95) # 190k post_compress_tokens = 150_000 assert post_compress_tokens < warn_threshold @@ -150,10 +193,12 @@ class TestTokenEstimation: assert estimate_messages_tokens_rough(many) > estimate_messages_tokens_rough(few) def test_pathological_session_detected(self): - """The reported pathological case: 648 messages, ~299K tokens.""" - # Simulate a 648-message session averaging ~460 tokens per message + """The reported pathological case: 648 messages, ~299K tokens. + + With a 200k model at 85% threshold (170k), this should trigger. + """ history = _make_history(648, content_size=1800) tokens = estimate_messages_tokens_rough(history) - # Should be well above the 100K default threshold - assert tokens > 100_000 - assert len(history) > 200 + # Should be well above the 170K threshold for a 200k model + threshold = int(200_000 * 0.85) + assert tokens > threshold