From c52353cf8a3e8aaa2ea710560c37b75d059efb21 Mon Sep 17 00:00:00 2001
From: Teknium <127238744+teknium1@users.noreply.github.com>
Date: Fri, 20 Mar 2026 08:37:36 -0700
Subject: [PATCH] feat: context pressure warnings for CLI and gateway (#2159)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat: context pressure warnings for CLI and gateway

User-facing notifications as context approaches the compaction threshold.
Warnings fire at 60% and 85% of the way to compaction — relative to
the configured compression threshold, not the raw context window.

CLI: Formatted line with a progress bar showing distance to compaction.
Cyan at 60% (approaching), bold yellow at 85% (imminent).

  ◐ context ▰▰▰▰▰▰▰▰▰▰▰▰▱▱▱▱▱▱▱▱ 60% to compaction 100k threshold (50%) · approaching compaction
  ⚠ context ▰▰▰▰▰▰▰▰▰▰▰▰▰▰▰▰▰▱▱▱ 85% to compaction 100k threshold (50%) · compaction imminent

Gateway: Plain-text notification sent to the user's chat via the new
status_callback mechanism (asyncio.run_coroutine_threadsafe bridge,
same pattern as step_callback).

Does NOT inject into the message stream. The LLM never sees these
warnings. Flags reset after each compaction cycle.

Files changed: - agent/display.py — format_context_pressure(), format_context_pressure_gateway() - run_agent.py — status_callback param, _context_50/70_warned flags, _emit_context_pressure(), flag reset in _compress_context() - gateway/run.py — _status_callback_sync bridge, wired to AIAgent - tests/test_context_pressure.py — 23 tests * Merge remote-tracking branch 'origin/main' into hermes/hermes-7ea545bf --------- Co-authored-by: Test --- agent/display.py | 92 ++++++++++++ gateway/run.py | 21 +++ run_agent.py | 68 +++++++++ tests/test_context_pressure.py | 249 +++++++++++++++++++++++++++++++++ 4 files changed, 430 insertions(+) create mode 100644 tests/test_context_pressure.py diff --git a/agent/display.py b/agent/display.py index c114db0bf..28878f6f3 100644 --- a/agent/display.py +++ b/agent/display.py @@ -612,3 +612,95 @@ def write_tty(text: str) -> None: except OSError: sys.stdout.write(text) sys.stdout.flush() + + +# ========================================================================= +# Context pressure display (CLI user-facing warnings) +# ========================================================================= + +# ANSI color codes for context pressure tiers +_CYAN = "\033[36m" +_YELLOW = "\033[33m" +_BOLD = "\033[1m" +_DIM_ANSI = "\033[2m" + +# Bar characters +_BAR_FILLED = "▰" +_BAR_EMPTY = "▱" +_BAR_WIDTH = 20 + + +def format_context_pressure( + compaction_progress: float, + threshold_tokens: int, + threshold_percent: float, + compression_enabled: bool = True, +) -> str: + """Build a formatted context pressure line for CLI display. + + The bar and percentage show progress toward the compaction threshold, + NOT the raw context window. 100% = compaction fires. + + Uses ANSI colors: + - cyan at ~60% to compaction = informational + - bold yellow at ~85% to compaction = warning + + Args: + compaction_progress: How close to compaction (0.0–1.0, 1.0 = fires). + threshold_tokens: Compaction threshold in tokens. 
+ threshold_percent: Compaction threshold as a fraction of context window. + compression_enabled: Whether auto-compression is active. + """ + pct_int = int(compaction_progress * 100) + filled = min(int(compaction_progress * _BAR_WIDTH), _BAR_WIDTH) + bar = _BAR_FILLED * filled + _BAR_EMPTY * (_BAR_WIDTH - filled) + + threshold_k = f"{threshold_tokens // 1000}k" if threshold_tokens >= 1000 else str(threshold_tokens) + threshold_pct_int = int(threshold_percent * 100) + + # Tier styling + if compaction_progress >= 0.85: + color = f"{_BOLD}{_YELLOW}" + icon = "⚠" + if compression_enabled: + hint = "compaction imminent" + else: + hint = "no auto-compaction" + else: + color = _CYAN + icon = "◐" + hint = "approaching compaction" + + return ( + f" {color}{icon} context {bar} {pct_int}% to compaction{_ANSI_RESET}" + f" {_DIM_ANSI}{threshold_k} threshold ({threshold_pct_int}%) · {hint}{_ANSI_RESET}" + ) + + +def format_context_pressure_gateway( + compaction_progress: float, + threshold_percent: float, + compression_enabled: bool = True, +) -> str: + """Build a plain-text context pressure notification for messaging platforms. + + No ANSI — just Unicode and plain text suitable for Telegram/Discord/etc. + The percentage shows progress toward the compaction threshold. + """ + pct_int = int(compaction_progress * 100) + filled = min(int(compaction_progress * _BAR_WIDTH), _BAR_WIDTH) + bar = _BAR_FILLED * filled + _BAR_EMPTY * (_BAR_WIDTH - filled) + + threshold_pct_int = int(threshold_percent * 100) + + if compaction_progress >= 0.85: + icon = "⚠️" + if compression_enabled: + hint = f"Context compaction is imminent (threshold: {threshold_pct_int}% of window)." + else: + hint = "Auto-compaction is disabled — context may be truncated." + else: + icon = "ℹ️" + hint = f"Compaction threshold is at {threshold_pct_int}% of context window." 
+ + return f"{icon} Context: {bar} {pct_int}% to compaction\n{hint}" diff --git a/gateway/run.py b/gateway/run.py index 56efa1582..c1acf39d5 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4539,6 +4539,26 @@ class GatewayRunner: except Exception as _e: logger.debug("agent:step hook error: %s", _e) + # Bridge sync status_callback → async adapter.send for context pressure + _status_adapter = self.adapters.get(source.platform) + _status_chat_id = source.chat_id + _status_thread_metadata = {"thread_id": source.thread_id} if source.thread_id else None + + def _status_callback_sync(event_type: str, message: str) -> None: + if not _status_adapter: + return + try: + asyncio.run_coroutine_threadsafe( + _status_adapter.send( + _status_chat_id, + message, + metadata=_status_thread_metadata, + ), + _loop_for_step, + ) + except Exception as _e: + logger.debug("status_callback error (%s): %s", event_type, _e) + def run_sync(): # Pass session_key to process registry via env var so background # processes can be mapped back to this gateway session @@ -4631,6 +4651,7 @@ class GatewayRunner: tool_progress_callback=progress_callback if tool_progress_enabled else None, step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None, stream_delta_callback=_stream_delta_cb, + status_callback=_status_callback_sync, platform=platform_key, honcho_session_key=session_key, honcho_manager=honcho_manager, diff --git a/run_agent.py b/run_agent.py index 60c36101f..78948a782 100644 --- a/run_agent.py +++ b/run_agent.py @@ -400,6 +400,7 @@ class AIAgent: clarify_callback: callable = None, step_callback: callable = None, stream_delta_callback: callable = None, + status_callback: callable = None, max_tokens: int = None, reasoning_config: Dict[str, Any] = None, prefill_messages: List[Dict[str, Any]] = None, @@ -522,6 +523,7 @@ class AIAgent: self.clarify_callback = clarify_callback self.step_callback = step_callback self.stream_delta_callback = stream_delta_callback + self.status_callback 
= status_callback self._last_reported_tool = None # Track for "new tool" mode # Tool execution state — allows _vprint during tool execution @@ -571,6 +573,12 @@ class AIAgent: self._budget_warning_threshold = 0.9 # 90% — urgent, respond now self._budget_pressure_enabled = True + # Context pressure warnings: notify the USER (not the LLM) as context + # fills up. Purely informational — displayed in CLI output and sent via + # status_callback for gateway platforms. Does NOT inject into messages. + self._context_50_warned = False + self._context_70_warned = False + # Persistent error log -- always writes WARNING+ to ~/.hermes/logs/errors.log # so tool failures, API errors, etc. are inspectable after the fact. # In gateway mode, each incoming message creates a new AIAgent instance, @@ -4385,6 +4393,10 @@ class AIAgent: except Exception as e: logger.debug("Session DB compression split failed: %s", e) + # Reset context pressure warnings — usage drops after compaction + self._context_50_warned = False + self._context_70_warned = False + return compressed, new_system_prompt def _execute_tool_calls(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None: @@ -4965,6 +4977,45 @@ class AIAgent: ) return None + def _emit_context_pressure(self, compaction_progress: float, compressor) -> None: + """Notify the user that context is approaching the compaction threshold. + + Args: + compaction_progress: How close to compaction (0.0–1.0, where 1.0 = fires). + compressor: The ContextCompressor instance (for threshold/context info). + + Purely user-facing — does NOT modify the message stream. + For CLI: prints a formatted line with a progress bar. + For gateway: fires status_callback so the platform can send a chat message. 
+ """ + from agent.display import format_context_pressure, format_context_pressure_gateway + + threshold_pct = compressor.threshold_tokens / compressor.context_length if compressor.context_length else 0.5 + + # CLI output — always shown (these are user-facing status notifications, + # not verbose debug output, so they bypass quiet_mode). + # Gateway users also get the callback below. + if self.platform in (None, "cli"): + line = format_context_pressure( + compaction_progress=compaction_progress, + threshold_tokens=compressor.threshold_tokens, + threshold_percent=threshold_pct, + compression_enabled=self.compression_enabled, + ) + self._safe_print(line) + + # Gateway / external consumers + if self.status_callback: + try: + msg = format_context_pressure_gateway( + compaction_progress=compaction_progress, + threshold_percent=threshold_pct, + compression_enabled=self.compression_enabled, + ) + self.status_callback("context_pressure", msg) + except Exception: + logger.debug("status_callback error in context pressure", exc_info=True) + def _handle_max_iterations(self, messages: list, api_call_count: int) -> str: """Request a summary when max iterations are reached. Returns the final response text.""" print(f"⚠️ Reached maximum iterations ({self.max_iterations}). Requesting summary...") @@ -6540,6 +6591,23 @@ class AIAgent: + _compressor.last_completion_tokens + _new_chars // 3 # conservative: JSON-heavy tool results ≈ 3 chars/token ) + + # ── Context pressure warnings (user-facing only) ────────── + # Notify the user (NOT the LLM) as context approaches the + # compaction threshold. Thresholds are relative to where + # compaction fires, not the raw context window. + # Does not inject into messages — just prints to CLI output + # and fires status_callback for gateway platforms. 
+ if _compressor.threshold_tokens > 0: + _compaction_progress = _estimated_next_prompt / _compressor.threshold_tokens + if _compaction_progress >= 0.85 and not self._context_70_warned: + self._context_70_warned = True + self._context_50_warned = True # skip first tier if we jumped past it + self._emit_context_pressure(_compaction_progress, _compressor) + elif _compaction_progress >= 0.60 and not self._context_50_warned: + self._context_50_warned = True + self._emit_context_pressure(_compaction_progress, _compressor) + if self.compression_enabled and _compressor.should_compress(_estimated_next_prompt): messages, active_system_prompt = self._compress_context( messages, system_message, diff --git a/tests/test_context_pressure.py b/tests/test_context_pressure.py new file mode 100644 index 000000000..3d6b19026 --- /dev/null +++ b/tests/test_context_pressure.py @@ -0,0 +1,249 @@ +"""Tests for context pressure warnings (user-facing, not injected into messages). + +Covers: +- Display formatting (CLI and gateway variants) +- Flag tracking and threshold logic on AIAgent +- Flag reset after compression +- status_callback invocation +""" + +import json +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + +from agent.display import format_context_pressure, format_context_pressure_gateway +from run_agent import AIAgent + + +# --------------------------------------------------------------------------- +# Display formatting tests +# --------------------------------------------------------------------------- + + +class TestFormatContextPressure: + """CLI context pressure display (agent/display.py). + + The bar shows progress toward the compaction threshold, not the + raw context window. 60% = 60% of the way to compaction. 
+ """ + + def test_60_percent_uses_info_icon(self): + line = format_context_pressure(0.60, 100_000, 0.50) + assert "◐" in line + assert "60% to compaction" in line + + def test_85_percent_uses_warning_icon(self): + line = format_context_pressure(0.85, 100_000, 0.50) + assert "⚠" in line + assert "85% to compaction" in line + + def test_bar_length_scales_with_progress(self): + line_60 = format_context_pressure(0.60, 100_000, 0.50) + line_85 = format_context_pressure(0.85, 100_000, 0.50) + assert line_85.count("▰") > line_60.count("▰") + + def test_shows_threshold_tokens(self): + line = format_context_pressure(0.60, 100_000, 0.50) + assert "100k" in line + + def test_small_threshold(self): + line = format_context_pressure(0.60, 500, 0.50) + assert "500" in line + + def test_shows_threshold_percent(self): + line = format_context_pressure(0.85, 100_000, 0.50) + assert "50%" in line # threshold percent shown + + def test_imminent_hint_at_85(self): + line = format_context_pressure(0.85, 100_000, 0.50) + assert "compaction imminent" in line + + def test_approaching_hint_below_85(self): + line = format_context_pressure(0.60, 100_000, 0.80) + assert "approaching compaction" in line + + def test_no_compaction_when_disabled(self): + line = format_context_pressure(0.85, 100_000, 0.50, compression_enabled=False) + assert "no auto-compaction" in line + + def test_returns_string(self): + result = format_context_pressure(0.65, 128_000, 0.50) + assert isinstance(result, str) + + def test_over_100_percent_capped(self): + """Progress > 1.0 should not break the bar.""" + line = format_context_pressure(1.05, 100_000, 0.50) + assert "▰" in line + assert line.count("▰") == 20 + + +class TestFormatContextPressureGateway: + """Gateway (plain text) context pressure display.""" + + def test_60_percent_informational(self): + msg = format_context_pressure_gateway(0.60, 0.50) + assert "60% to compaction" in msg + assert "50%" in msg # threshold shown + + def test_85_percent_warning(self): + msg 
= format_context_pressure_gateway(0.85, 0.50) + assert "85% to compaction" in msg + assert "imminent" in msg + + def test_no_compaction_warning(self): + msg = format_context_pressure_gateway(0.85, 0.50, compression_enabled=False) + assert "disabled" in msg + + def test_no_ansi_codes(self): + msg = format_context_pressure_gateway(0.85, 0.50) + assert "\033[" not in msg + + def test_has_progress_bar(self): + msg = format_context_pressure_gateway(0.85, 0.50) + assert "▰" in msg + + +# --------------------------------------------------------------------------- +# AIAgent context pressure flag tests +# --------------------------------------------------------------------------- + + +def _make_tool_defs(*names): + return [ + { + "type": "function", + "function": { + "name": n, + "description": f"{n} tool", + "parameters": {"type": "object", "properties": {}}, + }, + } + for n in names + ] + + +@pytest.fixture() +def agent(): + """Minimal AIAgent with mocked internals.""" + with ( + patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")), + patch("run_agent.check_toolset_requirements", return_value={}), + patch("run_agent.OpenAI"), + ): + a = AIAgent( + api_key="test-key-1234567890", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + a.client = MagicMock() + return a + + +class TestContextPressureFlags: + """Context pressure warning flag tracking on AIAgent.""" + + def test_flags_initialized_false(self, agent): + assert agent._context_50_warned is False + assert agent._context_70_warned is False + + def test_emit_calls_status_callback(self, agent): + """status_callback should be invoked with event type and message.""" + cb = MagicMock() + agent.status_callback = cb + + compressor = MagicMock() + compressor.context_length = 200_000 + compressor.threshold_tokens = 100_000 # 50% + + agent._emit_context_pressure(0.85, compressor) + + cb.assert_called_once() + args = cb.call_args[0] + assert args[0] == "context_pressure" + assert 
"85% to compaction" in args[1] + + def test_emit_no_callback_no_crash(self, agent): + """No status_callback set — should not crash.""" + agent.status_callback = None + + compressor = MagicMock() + compressor.context_length = 200_000 + compressor.threshold_tokens = 100_000 + + # Should not raise + agent._emit_context_pressure(0.60, compressor) + + def test_emit_prints_for_cli_platform(self, agent, capsys): + """CLI platform should always print context pressure, even in quiet_mode.""" + agent.quiet_mode = True + agent.platform = "cli" + agent.status_callback = None + + compressor = MagicMock() + compressor.context_length = 200_000 + compressor.threshold_tokens = 100_000 + + agent._emit_context_pressure(0.85, compressor) + captured = capsys.readouterr() + assert "▰" in captured.out + assert "to compaction" in captured.out + + def test_emit_skips_print_for_gateway_platform(self, agent, capsys): + """Gateway platforms get the callback, not CLI print.""" + agent.platform = "telegram" + agent.status_callback = None + + compressor = MagicMock() + compressor.context_length = 200_000 + compressor.threshold_tokens = 100_000 + + agent._emit_context_pressure(0.85, compressor) + captured = capsys.readouterr() + assert "▰" not in captured.out + + def test_flags_reset_on_compression(self, agent): + """After _compress_context, context pressure flags should reset.""" + agent._context_50_warned = True + agent._context_70_warned = True + agent.compression_enabled = True + + # Mock the compressor's compress method to return minimal valid output + agent.context_compressor = MagicMock() + agent.context_compressor.compress.return_value = [ + {"role": "user", "content": "Summary of conversation so far."} + ] + agent.context_compressor.context_length = 200_000 + agent.context_compressor.threshold_tokens = 100_000 + + # Mock _todo_store + agent._todo_store = MagicMock() + agent._todo_store.format_for_injection.return_value = None + + # Mock _build_system_prompt + agent._build_system_prompt = 
MagicMock(return_value="system prompt") + agent._cached_system_prompt = "old system prompt" + agent._session_db = None + + messages = [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "hi there"}, + ] + agent._compress_context(messages, "system prompt") + + assert agent._context_50_warned is False + assert agent._context_70_warned is False + + def test_emit_callback_error_handled(self, agent): + """If status_callback raises, it should be caught gracefully.""" + cb = MagicMock(side_effect=RuntimeError("callback boom")) + agent.status_callback = cb + + compressor = MagicMock() + compressor.context_length = 200_000 + compressor.threshold_tokens = 100_000 + + # Should not raise + agent._emit_context_pressure(0.85, compressor)