diff --git a/agent/context_compressor.py b/agent/context_compressor.py index 586d22626..d81de9479 100644 --- a/agent/context_compressor.py +++ b/agent/context_compressor.py @@ -1,8 +1,16 @@ """Automatic context window compression for long conversations. Self-contained class with its own OpenAI client for summarization. -Uses Gemini Flash (cheap/fast) to summarize middle turns while +Uses auxiliary model (cheap/fast) to summarize middle turns while protecting head and tail context. + +Improvements over v1: + - Structured summary template (Goal, Progress, Decisions, Files, Next Steps) + - Iterative summary updates (preserves info across multiple compactions) + - Token-budget tail protection instead of fixed message count + - Tool output pruning before LLM summarization (cheap pre-pass) + - Scaled summary budget (proportional to compressed content) + - Richer tool call/result detail in summarizer input """ import logging @@ -27,12 +35,31 @@ SUMMARY_PREFIX = ( ) LEGACY_SUMMARY_PREFIX = "[CONTEXT SUMMARY]:" +# Minimum / maximum tokens for the summary output +_MIN_SUMMARY_TOKENS = 2000 +_MAX_SUMMARY_TOKENS = 8000 +# Proportion of compressed content to allocate for summary +_SUMMARY_RATIO = 0.20 + +# Token budget for tail protection (keep most-recent context) +_DEFAULT_TAIL_TOKEN_BUDGET = 20_000 + +# Placeholder used when pruning old tool results +_PRUNED_TOOL_PLACEHOLDER = "[Old tool output cleared to save context space]" + +# Chars per token rough estimate +_CHARS_PER_TOKEN = 4 + class ContextCompressor: """Compresses conversation context when approaching the model's context limit. - Algorithm: protect first N + last N turns, summarize everything in between. - Token tracking uses actual counts from API responses for accuracy. + Algorithm: + 1. Prune old tool results (cheap, no LLM call) + 2. Protect head messages (system prompt + first exchange) + 3. Protect tail messages by token budget (most recent ~20K tokens) + 4. Summarize middle turns with structured LLM prompt + 5. On subsequent compactions, iteratively update the previous summary """ def __init__( @@ -74,6 +101,9 @@ class ContextCompressor: self.summary_model = summary_model_override or "" + # Stores the previous compaction summary for iterative updates + self._previous_summary: Optional[str] = None + def update_from_response(self, usage: Dict[str, Any]): """Update tracked token usage from API response.""" self.last_prompt_tokens = usage.get("prompt_tokens", 0) @@ -100,53 +130,204 @@ class ContextCompressor: "compression_count": self.compression_count, } - def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]: - """Generate a concise summary of conversation turns. + # ------------------------------------------------------------------ + # Tool output pruning (cheap pre-pass, no LLM call) + # ------------------------------------------------------------------ - Tries the auxiliary model first, then falls back to the user's main - model. Returns None if all attempts fail — the caller should drop + def _prune_old_tool_results( + self, messages: List[Dict[str, Any]], protect_tail_count: int, + ) -> tuple[List[Dict[str, Any]], int]: + """Replace old tool result contents with a short placeholder. + + Walks backward from the end, protecting the most recent + ``protect_tail_count`` messages. Older tool results get their + content replaced with a placeholder string. + + Returns (pruned_messages, pruned_count). + """ + if not messages: + return messages, 0 + + result = [m.copy() for m in messages] + pruned = 0 + prune_boundary = len(result) - protect_tail_count + + for i in range(prune_boundary): + msg = result[i] + if msg.get("role") != "tool": + continue + content = msg.get("content", "") + if not content or content == _PRUNED_TOOL_PLACEHOLDER: + continue + # Only prune if the content is substantial (>200 chars) + if len(content) > 200: + result[i] = {**msg, "content": _PRUNED_TOOL_PLACEHOLDER} + pruned += 1 + + return result, pruned + + # ------------------------------------------------------------------ + # Summarization + # ------------------------------------------------------------------ + + def _compute_summary_budget(self, turns_to_summarize: List[Dict[str, Any]]) -> int: + """Scale summary token budget with the amount of content being compressed.""" + content_tokens = estimate_messages_tokens_rough(turns_to_summarize) + budget = int(content_tokens * _SUMMARY_RATIO) + return max(_MIN_SUMMARY_TOKENS, min(budget, _MAX_SUMMARY_TOKENS)) + + def _serialize_for_summary(self, turns: List[Dict[str, Any]]) -> str: + """Serialize conversation turns into labeled text for the summarizer. + + Includes tool call arguments and result content (up to 3000 chars + per message) so the summarizer can preserve specific details like + file paths, commands, and outputs. + """ + parts = [] + for msg in turns: + role = msg.get("role", "unknown") + content = msg.get("content") or "" + + # Tool results: keep more content than before (3000 chars) + if role == "tool": + tool_id = msg.get("tool_call_id", "") + if len(content) > 3000: + content = content[:2000] + "\n...[truncated]...\n" + content[-800:] + parts.append(f"[TOOL RESULT {tool_id}]: {content}") + continue + + # Assistant messages: include tool call names AND arguments + if role == "assistant": + if len(content) > 3000: + content = content[:2000] + "\n...[truncated]...\n" + content[-800:] + tool_calls = msg.get("tool_calls", []) + if tool_calls: + tc_parts = [] + for tc in tool_calls: + if isinstance(tc, dict): + fn = tc.get("function", {}) + name = fn.get("name", "?") + args = fn.get("arguments", "") + # Truncate long arguments but keep enough for context + if len(args) > 500: + args = args[:400] + "..." + tc_parts.append(f" {name}({args})") + else: + fn = getattr(tc, "function", None) + name = getattr(fn, "name", "?") if fn else "?" + tc_parts.append(f" {name}(...)") + content += "\n[Tool calls:\n" + "\n".join(tc_parts) + "\n]" + parts.append(f"[ASSISTANT]: {content}") + continue + + # User and other roles + if len(content) > 3000: + content = content[:2000] + "\n...[truncated]...\n" + content[-800:] + parts.append(f"[{role.upper()}]: {content}") + + return "\n\n".join(parts) + + def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]: + """Generate a structured summary of conversation turns. + + Uses a structured template (Goal, Progress, Decisions, Files, Next Steps) + inspired by Pi-mono and OpenCode. When a previous summary exists, + generates an iterative update instead of summarizing from scratch. + + Returns None if all attempts fail — the caller should drop the middle turns without a summary rather than inject a useless placeholder. """ - parts = [] - for msg in turns_to_summarize: - role = msg.get("role", "unknown") - content = msg.get("content") or "" - if len(content) > 2000: - content = content[:1000] + "\n...[truncated]...\n" + content[-500:] - tool_calls = msg.get("tool_calls", []) - if tool_calls: - tool_names = [tc.get("function", {}).get("name", "?") for tc in tool_calls if isinstance(tc, dict)] - content += f"\n[Tool calls: {', '.join(tool_names)}]" - parts.append(f"[{role.upper()}]: {content}") + summary_budget = self._compute_summary_budget(turns_to_summarize) + content_to_summarize = self._serialize_for_summary(turns_to_summarize) - content_to_summarize = "\n\n".join(parts) - prompt = f"""Create a concise handoff summary for a later assistant that will continue this conversation after earlier turns are compacted. + if self._previous_summary: + # Iterative update: preserve existing info, add new progress + prompt = f"""You are updating a context compaction summary. A previous compaction produced the summary below. New conversation turns have occurred since then and need to be incorporated. -Describe: -1. What actions were taken (tool calls, searches, file operations) -2. Key information or results obtained -3. Important decisions, constraints, or user preferences -4. Relevant data, file names, outputs, or next steps needed to continue +PREVIOUS SUMMARY: +{self._previous_summary} -Keep it factual, concise, and focused on helping the next assistant resume without repeating work. Target ~{self.summary_target_tokens} tokens. +NEW TURNS TO INCORPORATE: +{content_to_summarize} + +Update the summary using this exact structure. PRESERVE all existing information that is still relevant. ADD new progress. Move items from "In Progress" to "Done" when completed. Remove information only if it is clearly obsolete. + +## Goal +[What the user is trying to accomplish — preserve from previous summary, update if goal evolved] + +## Constraints & Preferences +[User preferences, coding style, constraints, important decisions — accumulate across compactions] + +## Progress +### Done +[Completed work — include specific file paths, commands run, results obtained] +### In Progress +[Work currently underway] +### Blocked +[Any blockers or issues encountered] + +## Key Decisions +[Important technical decisions and why they were made] + +## Relevant Files +[Files read, modified, or created — with brief note on each. Accumulate across compactions.] + +## Next Steps +[What needs to happen next to continue the work] + +## Critical Context +[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation] + +Target ~{summary_budget} tokens. Be specific — include file paths, command outputs, error messages, and concrete values rather than vague descriptions. + +Write only the summary body. Do not include any preamble or prefix.""" + else: + # First compaction: summarize from scratch + prompt = f"""Create a structured handoff summary for a later assistant that will continue this conversation after earlier turns are compacted. ---- TURNS TO SUMMARIZE: {content_to_summarize} ---- -Write only the summary body. Do not include any preamble or prefix; the system will add the handoff wrapper.""" +Use this exact structure: + +## Goal +[What the user is trying to accomplish] + +## Constraints & Preferences +[User preferences, coding style, constraints, important decisions] + +## Progress +### Done +[Completed work — include specific file paths, commands run, results obtained] +### In Progress +[Work currently underway] +### Blocked +[Any blockers or issues encountered] + +## Key Decisions +[Important technical decisions and why they were made] + +## Relevant Files +[Files read, modified, or created — with brief note on each] + +## Next Steps +[What needs to happen next to continue the work] + +## Critical Context +[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation] + +Target ~{summary_budget} tokens. Be specific — include file paths, command outputs, error messages, and concrete values rather than vague descriptions. The goal is to prevent the next assistant from repeating work or losing important details. + +Write only the summary body. Do not include any preamble or prefix.""" - # Use the centralized LLM router — handles provider resolution, - # auth, and fallback internally. try: call_kwargs = { "task": "compression", "messages": [{"role": "user", "content": prompt}], "temperature": 0.3, - "max_tokens": self.summary_target_tokens * 2, - "timeout": 30.0, + "max_tokens": summary_budget * 2, + "timeout": 45.0, } if self.summary_model: call_kwargs["model"] = self.summary_model @@ -156,6 +337,8 @@ Write only the summary body. Do not include any preamble or prefix; the system w if not isinstance(content, str): content = str(content) if content else "" summary = content.strip() + # Store for iterative updates on next compaction + self._previous_summary = summary return self._with_summary_prefix(summary) except RuntimeError: logging.warning("Context compression: no provider available for " @@ -280,10 +463,69 @@ Write only the summary body. Do not include any preamble or prefix; the system w idx = check return idx + # ------------------------------------------------------------------ + # Tail protection by token budget + # ------------------------------------------------------------------ + + def _find_tail_cut_by_tokens( + self, messages: List[Dict[str, Any]], head_end: int, + token_budget: int = _DEFAULT_TAIL_TOKEN_BUDGET, + ) -> int: + """Walk backward from the end of messages, accumulating tokens until + the budget is reached. Returns the index where the tail starts. + + Never cuts inside a tool_call/result group. Falls back to the old + ``protect_last_n`` if the budget would protect fewer messages. + """ + n = len(messages) + min_tail = self.protect_last_n + accumulated = 0 + cut_idx = n # start from beyond the end + + for i in range(n - 1, head_end - 1, -1): + msg = messages[i] + content = msg.get("content") or "" + msg_tokens = len(content) // _CHARS_PER_TOKEN + 10 # +10 for role/metadata + # Include tool call arguments in estimate + for tc in msg.get("tool_calls") or []: + if isinstance(tc, dict): + args = tc.get("function", {}).get("arguments", "") + msg_tokens += len(args) // _CHARS_PER_TOKEN + if accumulated + msg_tokens > token_budget and (n - i) >= min_tail: + break + accumulated += msg_tokens + cut_idx = i + + # Ensure we protect at least protect_last_n messages + fallback_cut = n - min_tail + if cut_idx > fallback_cut: + cut_idx = fallback_cut + + # If the token budget would protect everything (small conversations), + # fall back to the fixed protect_last_n approach so compression can + # still remove middle turns. + if cut_idx <= head_end: + cut_idx = fallback_cut + + # Align to avoid splitting tool groups + cut_idx = self._align_boundary_backward(messages, cut_idx) + + return max(cut_idx, head_end + 1) + + # ------------------------------------------------------------------ + # Main compression entry point + # ------------------------------------------------------------------ + def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]: """Compress conversation messages by summarizing middle turns. - Keeps first N + last N turns, summarizes everything in between. + Algorithm: + 1. Prune old tool results (cheap pre-pass, no LLM call) + 2. Protect head messages (system prompt + first exchange) + 3. Find tail boundary by token budget (~20K tokens of recent context) + 4. Summarize middle turns with structured LLM prompt + 5. On re-compression, iteratively update the previous summary + After compression, orphaned tool_call / tool_result pairs are cleaned up so the API never receives mismatched IDs. """ @@ -297,19 +539,26 @@ Write only the summary body. Do not include any preamble or prefix; the system w ) return messages - compress_start = self.protect_first_n - compress_end = n_messages - self.protect_last_n - if compress_start >= compress_end: - return messages + display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages) - # Adjust boundaries to avoid splitting tool_call/result groups. + # Phase 1: Prune old tool results (cheap, no LLM call) + messages, pruned_count = self._prune_old_tool_results( + messages, protect_tail_count=self.protect_last_n * 3, + ) + if pruned_count and not self.quiet_mode: + logger.info("Pre-compression: pruned %d old tool result(s)", pruned_count) + + # Phase 2: Determine boundaries + compress_start = self.protect_first_n compress_start = self._align_boundary_forward(messages, compress_start) - compress_end = self._align_boundary_backward(messages, compress_end) + + # Use token-budget tail protection instead of fixed message count + compress_end = self._find_tail_cut_by_tokens(messages, compress_start) + if compress_start >= compress_end: return messages turns_to_summarize = messages[compress_start:compress_end] - display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages) if not self.quiet_mode: logger.info( @@ -323,15 +572,20 @@ Write only the summary body. Do not include any preamble or prefix; the system w self.threshold_percent * 100, self.threshold_tokens, ) + tail_msgs = n_messages - compress_end logger.info( - "Summarizing turns %d-%d (%d turns)", + "Summarizing turns %d-%d (%d turns), protecting %d head + %d tail messages", compress_start + 1, compress_end, len(turns_to_summarize), + compress_start, + tail_msgs, ) + # Phase 3: Generate structured summary summary = self._generate_summary(turns_to_summarize) + # Phase 4: Assemble compressed message list compressed = [] for i in range(compress_start): msg = messages[i].copy()