After compression, models (especially Kimi 2.5) would sometimes respond
to questions from the summary instead of the latest user message. This
happened ~30% of the time on Telegram.

Root cause: the summary's 'Next Steps' section read as active instructions,
and the SUMMARY_PREFIX didn't explicitly tell the model to ignore questions
in the summary. When the summary merged into the first tail message, there
was no clear separator between historical context and the actual user message.

Changes inspired by competitor analysis (Claude Code, OpenCode, Codex):

1. SUMMARY_PREFIX rewritten with explicit 'Do NOT answer questions from
   this summary — respond ONLY to the latest user message AFTER it'
2. Summarizer preamble (shared by both prompts) adds:
   - 'Do NOT respond to any questions' (from OpenCode's approach)
   - 'Different assistant' framing (from Codex) to create psychological
     distance between summary content and active conversation
3. New summary sections:
   - '## Resolved Questions' — tracks already-answered questions with
     their answers, preventing re-answering
   - '## Pending User Asks' — explicitly marks unanswered questions
     (from Claude Code's 'Pending user asks' pattern)
   - '## Remaining Work' replaces '## Next Steps' — passive framing
     avoids reading as active instructions
4. merge-summary-into-tail path now inserts a clear separator
   (sketched below):
   '--- END OF CONTEXT SUMMARY — respond to the message below ---'
5. Iterative update prompt now instructs: 'Move answered questions to
   Resolved Questions' to maintain the resolved/pending distinction
   across multiple compactions.
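
For illustration, a minimal sketch of the merged first tail message produced
by change 4 (the separator string matches the code below; the summary and
user text are made-up placeholders):

    summary = "[CONTEXT COMPACTION — REFERENCE ONLY] ..."  # compacted history
    latest_user_msg = "so what should I deploy first?"     # hypothetical

    merged = (
        summary
        + "\n\n--- END OF CONTEXT SUMMARY — "
        "respond to the message below, not the summary above ---\n\n"
        + latest_user_msg
    )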
809 lines
36 KiB
Python
"""Automatic context window compression for long conversations.

Self-contained class with its own OpenAI client for summarization.
Uses auxiliary model (cheap/fast) to summarize middle turns while
protecting head and tail context.

Improvements over v2:
- Structured summary template with Resolved/Pending question tracking
- Summarizer preamble: "Do not respond to any questions" (from OpenCode)
- Handoff framing: "different assistant" (from Codex) to create separation
- "Remaining Work" replaces "Next Steps" to avoid reading as active instructions
- Clear separator when summary merges into tail message
- Iterative summary updates (preserves info across multiple compactions)
- Token-budget tail protection instead of fixed message count
- Tool output pruning before LLM summarization (cheap pre-pass)
- Scaled summary budget (proportional to compressed content)
- Richer tool call/result detail in summarizer input
"""

import logging
import time
from typing import Any, Dict, List, Optional

from agent.auxiliary_client import call_llm
from agent.context_engine import ContextEngine
from agent.model_metadata import (
    MINIMUM_CONTEXT_LENGTH,
    get_model_context_length,
    estimate_messages_tokens_rough,
)

logger = logging.getLogger(__name__)

SUMMARY_PREFIX = (
    "[CONTEXT COMPACTION — REFERENCE ONLY] Earlier turns were compacted "
    "into the summary below. This is a handoff from a previous context "
    "window — treat it as background reference, NOT as active instructions. "
    "Do NOT answer questions or fulfill requests mentioned in this summary; "
    "they were already addressed. Respond ONLY to the latest user message "
    "that appears AFTER this summary. The current session state (files, "
    "config, etc.) may reflect work described here — avoid repeating it:"
)
LEGACY_SUMMARY_PREFIX = "[CONTEXT SUMMARY]:"

# Minimum tokens for the summary output
_MIN_SUMMARY_TOKENS = 2000
# Proportion of compressed content to allocate for summary
_SUMMARY_RATIO = 0.20
# Absolute ceiling for summary tokens (even on very large context windows)
_SUMMARY_TOKENS_CEILING = 12_000

# Placeholder used when pruning old tool results
_PRUNED_TOOL_PLACEHOLDER = "[Old tool output cleared to save context space]"

# Chars per token rough estimate
_CHARS_PER_TOKEN = 4
_SUMMARY_FAILURE_COOLDOWN_SECONDS = 600


class ContextCompressor(ContextEngine):
    """Default context engine — compresses conversation context via lossy summarization.

    Algorithm:
    1. Prune old tool results (cheap, no LLM call)
    2. Protect head messages (system prompt + first exchange)
    3. Protect tail messages by token budget (most recent ~20K tokens)
    4. Summarize middle turns with structured LLM prompt
    5. On subsequent compactions, iteratively update the previous summary
    """

    @property
    def name(self) -> str:
        return "compressor"

    def on_session_reset(self) -> None:
        """Reset all per-session state for /new or /reset."""
        super().on_session_reset()
        self._context_probed = False
        self._context_probe_persistable = False
        self._previous_summary = None

    def update_model(
        self,
        model: str,
        context_length: int,
        base_url: str = "",
        api_key: str = "",
        provider: str = "",
    ) -> None:
        """Update model info after a model switch or fallback activation."""
        self.model = model
        self.base_url = base_url
        self.api_key = api_key
        self.provider = provider
        self.context_length = context_length
        self.threshold_tokens = max(
            int(context_length * self.threshold_percent),
            MINIMUM_CONTEXT_LENGTH,
        )

    def __init__(
        self,
        model: str,
        threshold_percent: float = 0.50,
        protect_first_n: int = 3,
        protect_last_n: int = 20,
        summary_target_ratio: float = 0.20,
        quiet_mode: bool = False,
        summary_model_override: str | None = None,
        base_url: str = "",
        api_key: str = "",
        config_context_length: int | None = None,
        provider: str = "",
    ):
        self.model = model
        self.base_url = base_url
        self.api_key = api_key
        self.provider = provider
        self.threshold_percent = threshold_percent
        self.protect_first_n = protect_first_n
        self.protect_last_n = protect_last_n
        self.summary_target_ratio = max(0.10, min(summary_target_ratio, 0.80))
        self.quiet_mode = quiet_mode

        self.context_length = get_model_context_length(
            model, base_url=base_url, api_key=api_key,
            config_context_length=config_context_length,
            provider=provider,
        )
        # Floor: never compress below MINIMUM_CONTEXT_LENGTH tokens even if
        # the percentage would suggest a lower value. This prevents premature
        # compression on large-context models at 50% while keeping the % sane
        # for models right at the minimum.
        self.threshold_tokens = max(
            int(self.context_length * threshold_percent),
            MINIMUM_CONTEXT_LENGTH,
        )
        self.compression_count = 0

        # Derive token budgets: ratio is relative to the threshold, not total context
        target_tokens = int(self.threshold_tokens * self.summary_target_ratio)
        self.tail_token_budget = target_tokens
        self.max_summary_tokens = min(
            int(self.context_length * 0.05), _SUMMARY_TOKENS_CEILING,
        )
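        # Worked example of these budgets (hypothetical 128K-context model with
        # the defaults above; assumes MINIMUM_CONTEXT_LENGTH is below 64_000):
        #   threshold_tokens   = int(128_000 * 0.50)              = 64_000
        #   tail_token_budget  = int(64_000 * 0.20)               = 12_800
        #   max_summary_tokens = min(int(128_000 * 0.05), 12_000) =  6_400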

        if not quiet_mode:
            logger.info(
                "Context compressor initialized: model=%s context_length=%d "
                "threshold=%d (%.0f%%) target_ratio=%.0f%% tail_budget=%d "
                "provider=%s base_url=%s",
                model, self.context_length, self.threshold_tokens,
                threshold_percent * 100, self.summary_target_ratio * 100,
                self.tail_token_budget,
                provider or "none", base_url or "none",
            )
        self._context_probed = False  # True after a step-down from context error

        self.last_prompt_tokens = 0
        self.last_completion_tokens = 0

        self.summary_model = summary_model_override or ""

        # Stores the previous compaction summary for iterative updates
        self._previous_summary: Optional[str] = None
        self._summary_failure_cooldown_until: float = 0.0

    def update_from_response(self, usage: Dict[str, Any]):
        """Update tracked token usage from API response."""
        self.last_prompt_tokens = usage.get("prompt_tokens", 0)
        self.last_completion_tokens = usage.get("completion_tokens", 0)

    def should_compress(self, prompt_tokens: int | None = None) -> bool:
        """Check if context exceeds the compression threshold."""
        tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
        return tokens >= self.threshold_tokens

    # ------------------------------------------------------------------
    # Tool output pruning (cheap pre-pass, no LLM call)
    # ------------------------------------------------------------------

    def _prune_old_tool_results(
        self, messages: List[Dict[str, Any]], protect_tail_count: int,
        protect_tail_tokens: int | None = None,
    ) -> tuple[List[Dict[str, Any]], int]:
        """Replace old tool result contents with a short placeholder.

        Walks backward from the end, protecting the most recent messages that
        fall within ``protect_tail_tokens`` (when provided) OR the last
        ``protect_tail_count`` messages (backward-compatible default).
        When both are given, the token budget takes priority and the message
        count acts as a hard minimum floor.

        Returns (pruned_messages, pruned_count).
        """
        if not messages:
            return messages, 0

        result = [m.copy() for m in messages]
        pruned = 0

        # Determine the prune boundary
        if protect_tail_tokens is not None and protect_tail_tokens > 0:
            # Token-budget approach: walk backward accumulating tokens
            accumulated = 0
            boundary = len(result)
            min_protect = min(protect_tail_count, len(result) - 1)
            for i in range(len(result) - 1, -1, -1):
                msg = result[i]
                content_len = len(msg.get("content") or "")
                msg_tokens = content_len // _CHARS_PER_TOKEN + 10
                for tc in msg.get("tool_calls") or []:
                    if isinstance(tc, dict):
                        args = tc.get("function", {}).get("arguments", "")
                        msg_tokens += len(args) // _CHARS_PER_TOKEN
                if accumulated + msg_tokens > protect_tail_tokens and (len(result) - i) >= min_protect:
                    boundary = i
                    break
                accumulated += msg_tokens
                boundary = i
            # Token boundary wins, but never prune into the protected floor
            # (min_protect acts as the hard minimum the docstring promises).
            prune_boundary = min(boundary, len(result) - min_protect)
        else:
            prune_boundary = len(result) - protect_tail_count

        for i in range(prune_boundary):
            msg = result[i]
            if msg.get("role") != "tool":
                continue
            content = msg.get("content", "")
            if not content or content == _PRUNED_TOOL_PLACEHOLDER:
                continue
            # Only prune if the content is substantial (>200 chars)
            if len(content) > 200:
                result[i] = {**msg, "content": _PRUNED_TOOL_PLACEHOLDER}
                pruned += 1

        return result, pruned

    # ------------------------------------------------------------------
    # Summarization
    # ------------------------------------------------------------------

    def _compute_summary_budget(self, turns_to_summarize: List[Dict[str, Any]]) -> int:
        """Scale summary token budget with the amount of content being compressed.

        The maximum scales with the model's context window (5% of context,
        capped at ``_SUMMARY_TOKENS_CEILING``) so large-context models get
        richer summaries instead of being hard-capped at 8K tokens.
        """
        content_tokens = estimate_messages_tokens_rough(turns_to_summarize)
        budget = int(content_tokens * _SUMMARY_RATIO)
        return max(_MIN_SUMMARY_TOKENS, min(budget, self.max_summary_tokens))

    # Truncation limits for the summarizer input. These bound how much of
    # each message the summary model sees — the budget is the *summary*
    # model's context window, not the main model's.
    _CONTENT_MAX = 6000  # total chars per message body
    _CONTENT_HEAD = 4000  # chars kept from the start
    _CONTENT_TAIL = 1500  # chars kept from the end
    _TOOL_ARGS_MAX = 1500  # tool call argument chars
    _TOOL_ARGS_HEAD = 1200  # kept from the start of tool args

    def _serialize_for_summary(self, turns: List[Dict[str, Any]]) -> str:
        """Serialize conversation turns into labeled text for the summarizer.

        Includes tool call arguments and result content (up to
        ``_CONTENT_MAX`` chars per message) so the summarizer can preserve
        specific details like file paths, commands, and outputs.
        """
        parts = []
        for msg in turns:
            role = msg.get("role", "unknown")
            content = msg.get("content") or ""

            # Tool results: keep enough content for the summarizer
            if role == "tool":
                tool_id = msg.get("tool_call_id", "")
                if len(content) > self._CONTENT_MAX:
                    content = content[:self._CONTENT_HEAD] + "\n...[truncated]...\n" + content[-self._CONTENT_TAIL:]
                parts.append(f"[TOOL RESULT {tool_id}]: {content}")
                continue

            # Assistant messages: include tool call names AND arguments
            if role == "assistant":
                if len(content) > self._CONTENT_MAX:
                    content = content[:self._CONTENT_HEAD] + "\n...[truncated]...\n" + content[-self._CONTENT_TAIL:]
                tool_calls = msg.get("tool_calls", [])
                if tool_calls:
                    tc_parts = []
                    for tc in tool_calls:
                        if isinstance(tc, dict):
                            fn = tc.get("function", {})
                            name = fn.get("name", "?")
                            args = fn.get("arguments", "")
                            # Truncate long arguments but keep enough for context
                            if len(args) > self._TOOL_ARGS_MAX:
                                args = args[:self._TOOL_ARGS_HEAD] + "..."
                            tc_parts.append(f" {name}({args})")
                        else:
                            fn = getattr(tc, "function", None)
                            name = getattr(fn, "name", "?") if fn else "?"
                            tc_parts.append(f" {name}(...)")
                    content += "\n[Tool calls:\n" + "\n".join(tc_parts) + "\n]"
                parts.append(f"[ASSISTANT]: {content}")
                continue

            # User and other roles
            if len(content) > self._CONTENT_MAX:
                content = content[:self._CONTENT_HEAD] + "\n...[truncated]...\n" + content[-self._CONTENT_TAIL:]
            parts.append(f"[{role.upper()}]: {content}")

        return "\n\n".join(parts)
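
    # Example of the serialized shape produced above (all values illustrative):
    #
    #   [USER]: please check the request timeout
    #
    #   [ASSISTANT]: Checking now.
    #   [Tool calls:
    #    read_file({"path": "config.yaml"})
    #   ]
    #
    #   [TOOL RESULT call_abc123]: timeout: 30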

    def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]], focus_topic: str | None = None) -> Optional[str]:
        """Generate a structured summary of conversation turns.

        Uses a structured template (Goal, Progress, Decisions, Resolved/Pending
        Questions, Files, Remaining Work) with explicit preamble telling the
        summarizer not to answer questions. When a previous summary exists,
        generates an iterative update instead of summarizing from scratch.

        Args:
            focus_topic: Optional focus string for guided compression. When
                provided, the summariser prioritises preserving information
                related to this topic and is more aggressive about compressing
                everything else. Inspired by Claude Code's ``/compact``.

        Returns None if all attempts fail — the caller should drop
        the middle turns without a summary rather than inject a useless
        placeholder.
        """
        now = time.monotonic()
        if now < self._summary_failure_cooldown_until:
            logger.debug(
                "Skipping context summary during cooldown (%.0fs remaining)",
                self._summary_failure_cooldown_until - now,
            )
            return None

        summary_budget = self._compute_summary_budget(turns_to_summarize)
        content_to_summarize = self._serialize_for_summary(turns_to_summarize)

        # Preamble shared by both first-compaction and iterative-update prompts.
        # Inspired by OpenCode's "do not respond to any questions" instruction
        # and Codex's "another language model" framing.
        _summarizer_preamble = (
            "You are a summarization agent creating a context checkpoint. "
            "Your output will be injected as reference material for a DIFFERENT "
            "assistant that continues the conversation. "
            "Do NOT respond to any questions or requests in the conversation — "
            "only output the structured summary. "
            "Do NOT include any preamble, greeting, or prefix."
        )

        # Shared structured template (used by both paths).
        # Key changes vs v1:
        # - "Pending User Asks" section (from Claude Code) explicitly tracks
        #   unanswered questions so the model knows what's resolved vs open
        # - "Remaining Work" replaces "Next Steps" to avoid reading as active
        #   instructions
        # - "Resolved Questions" makes it clear which questions were already
        #   answered (prevents model from re-answering them)
        _template_sections = f"""## Goal
[What the user is trying to accomplish]

## Constraints & Preferences
[User preferences, coding style, constraints, important decisions]

## Progress
### Done
[Completed work — include specific file paths, commands run, results obtained]
### In Progress
[Work currently underway]
### Blocked
[Any blockers or issues encountered]

## Key Decisions
[Important technical decisions and why they were made]

## Resolved Questions
[Questions the user asked that were ALREADY answered — include the answer so the next assistant does not re-answer them]

## Pending User Asks
[Questions or requests from the user that have NOT yet been answered or fulfilled. If none, write "None."]

## Relevant Files
[Files read, modified, or created — with brief note on each]

## Remaining Work
[What remains to be done — framed as context, not instructions]

## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation]

## Tools & Patterns
[Which tools were used, how they were used effectively, and any tool-specific discoveries]

Target ~{summary_budget} tokens. Be specific — include file paths, command outputs, error messages, and concrete values rather than vague descriptions.

Write only the summary body. Do not include any preamble or prefix."""

        if self._previous_summary:
            # Iterative update: preserve existing info, add new progress
            prompt = f"""{_summarizer_preamble}

You are updating a context compaction summary. A previous compaction produced the summary below. New conversation turns have occurred since then and need to be incorporated.

PREVIOUS SUMMARY:
{self._previous_summary}

NEW TURNS TO INCORPORATE:
{content_to_summarize}

Update the summary using this exact structure. PRESERVE all existing information that is still relevant. ADD new progress. Move items from "In Progress" to "Done" when completed. Move answered questions to "Resolved Questions". Remove information only if it is clearly obsolete.

{_template_sections}"""
        else:
            # First compaction: summarize from scratch
            prompt = f"""{_summarizer_preamble}

Create a structured handoff summary for a different assistant that will continue this conversation after earlier turns are compacted. The next assistant should be able to understand what happened without re-reading the original turns.

TURNS TO SUMMARIZE:
{content_to_summarize}

Use this exact structure:

{_template_sections}"""

        # Inject focus topic guidance when the user provides one via /compress <focus>.
        # This goes at the end of the prompt so it takes precedence.
        if focus_topic:
            prompt += f"""

FOCUS TOPIC: "{focus_topic}"
The user has requested that this compaction PRIORITISE preserving all information related to the focus topic above. For content related to "{focus_topic}", include full detail — exact values, file paths, command outputs, error messages, and decisions. For content NOT related to the focus topic, summarise more aggressively (brief one-liners or omit if truly irrelevant). The focus topic sections should receive roughly 60-70% of the summary token budget."""

        try:
            call_kwargs = {
                "task": "compression",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": summary_budget * 2,
                # timeout resolved from auxiliary.compression.timeout config by call_llm
            }
            if self.summary_model:
                call_kwargs["model"] = self.summary_model
            response = call_llm(**call_kwargs)
            content = response.choices[0].message.content
            # Handle cases where content is not a string (e.g., dict from llama.cpp)
            if not isinstance(content, str):
                content = str(content) if content else ""
            summary = content.strip()
            # Store for iterative updates on next compaction
            self._previous_summary = summary
            self._summary_failure_cooldown_until = 0.0
            return self._with_summary_prefix(summary)
        except RuntimeError:
            self._summary_failure_cooldown_until = time.monotonic() + _SUMMARY_FAILURE_COOLDOWN_SECONDS
            logger.warning(
                "Context compression: no provider available for summary. "
                "Middle turns will be dropped without a summary for %d seconds.",
                _SUMMARY_FAILURE_COOLDOWN_SECONDS,
            )
            return None
        except Exception as e:
            self._summary_failure_cooldown_until = time.monotonic() + _SUMMARY_FAILURE_COOLDOWN_SECONDS
            logger.warning(
                "Failed to generate context summary: %s. "
                "Further summary attempts paused for %d seconds.",
                e,
                _SUMMARY_FAILURE_COOLDOWN_SECONDS,
            )
            return None

    @staticmethod
    def _with_summary_prefix(summary: str) -> str:
        """Normalize summary text to the current compaction handoff format."""
        text = (summary or "").strip()
        for prefix in (LEGACY_SUMMARY_PREFIX, SUMMARY_PREFIX):
            if text.startswith(prefix):
                text = text[len(prefix):].lstrip()
                break
        return f"{SUMMARY_PREFIX}\n{text}" if text else SUMMARY_PREFIX

    # ------------------------------------------------------------------
    # Tool-call / tool-result pair integrity helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _get_tool_call_id(tc) -> str:
        """Extract the call ID from a tool_call entry (dict or SimpleNamespace)."""
        if isinstance(tc, dict):
            return tc.get("id", "")
        return getattr(tc, "id", "") or ""

    def _sanitize_tool_pairs(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Fix orphaned tool_call / tool_result pairs after compression.

        Two failure modes:
        1. A tool *result* references a call_id whose assistant tool_call was
           removed (summarized/truncated). The API rejects this with
           "No tool call found for function call output with call_id ...".
        2. An assistant message has tool_calls whose results were dropped.
           The API rejects this because every tool_call must be followed by
           a tool result with the matching call_id.

        This method removes orphaned results and inserts stub results for
        orphaned calls so the message list is always well-formed.
        """
        surviving_call_ids: set = set()
        for msg in messages:
            if msg.get("role") == "assistant":
                for tc in msg.get("tool_calls") or []:
                    cid = self._get_tool_call_id(tc)
                    if cid:
                        surviving_call_ids.add(cid)

        result_call_ids: set = set()
        for msg in messages:
            if msg.get("role") == "tool":
                cid = msg.get("tool_call_id")
                if cid:
                    result_call_ids.add(cid)

        # 1. Remove tool results whose call_id has no matching assistant tool_call
        orphaned_results = result_call_ids - surviving_call_ids
        if orphaned_results:
            messages = [
                m for m in messages
                if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results)
            ]
            if not self.quiet_mode:
                logger.info("Compression sanitizer: removed %d orphaned tool result(s)", len(orphaned_results))

        # 2. Add stub results for assistant tool_calls whose results were dropped
        missing_results = surviving_call_ids - result_call_ids
        if missing_results:
            patched: List[Dict[str, Any]] = []
            for msg in messages:
                patched.append(msg)
                if msg.get("role") == "assistant":
                    for tc in msg.get("tool_calls") or []:
                        cid = self._get_tool_call_id(tc)
                        if cid in missing_results:
                            patched.append({
                                "role": "tool",
                                "content": "[Result from earlier conversation — see context summary above]",
                                "tool_call_id": cid,
                            })
            messages = patched
            if not self.quiet_mode:
                logger.info("Compression sanitizer: added %d stub tool result(s)", len(missing_results))

        return messages

    def _align_boundary_forward(self, messages: List[Dict[str, Any]], idx: int) -> int:
        """Push a compress-start boundary forward past any orphan tool results.

        If ``messages[idx]`` is a tool result, slide forward until we hit a
        non-tool message so we don't start the summarised region mid-group.
        """
        while idx < len(messages) and messages[idx].get("role") == "tool":
            idx += 1
        return idx

    def _align_boundary_backward(self, messages: List[Dict[str, Any]], idx: int) -> int:
        """Pull a compress-end boundary backward to avoid splitting a
        tool_call / result group.

        If the boundary falls in the middle of a tool-result group (i.e.
        there are consecutive tool messages before ``idx``), walk backward
        past all of them to find the parent assistant message. If found,
        move the boundary before the assistant so the entire
        assistant + tool_results group is included in the summarised region
        rather than being split (which causes silent data loss when
        ``_sanitize_tool_pairs`` removes the orphaned tail results).
        """
        if idx <= 0 or idx >= len(messages):
            return idx
        # Walk backward past consecutive tool results
        check = idx - 1
        while check >= 0 and messages[check].get("role") == "tool":
            check -= 1
        # If we landed on the parent assistant with tool_calls, pull the
        # boundary before it so the whole group gets summarised together.
        if check >= 0 and messages[check].get("role") == "assistant" and messages[check].get("tool_calls"):
            idx = check
        return idx

    # ------------------------------------------------------------------
    # Tail protection by token budget
    # ------------------------------------------------------------------

    def _find_tail_cut_by_tokens(
        self, messages: List[Dict[str, Any]], head_end: int,
        token_budget: int | None = None,
    ) -> int:
        """Walk backward from the end of messages, accumulating tokens until
        the budget is reached. Returns the index where the tail starts.

        ``token_budget`` defaults to ``self.tail_token_budget``, which is
        derived from ``summary_target_ratio * threshold_tokens``, so it
        scales automatically with the model's context window.

        Token budget is the primary criterion. A hard minimum of 3 messages
        is always protected, but the budget may be exceeded by up to 1.5x
        to avoid cutting inside an oversized message (tool output, file
        read, etc.). If even the minimum 3 messages exceed 1.5x the budget,
        the cut is placed right after the head so compression still runs.

        Never cuts inside a tool_call/result group.
        """
        if token_budget is None:
            token_budget = self.tail_token_budget
        n = len(messages)
        # Hard minimum: always keep at least 3 messages in the tail
        min_tail = min(3, n - head_end - 1) if n - head_end > 1 else 0
        soft_ceiling = int(token_budget * 1.5)
        accumulated = 0
        cut_idx = n  # start from beyond the end

        for i in range(n - 1, head_end - 1, -1):
            msg = messages[i]
            content = msg.get("content") or ""
            msg_tokens = len(content) // _CHARS_PER_TOKEN + 10  # +10 for role/metadata
            # Include tool call arguments in estimate
            for tc in msg.get("tool_calls") or []:
                if isinstance(tc, dict):
                    args = tc.get("function", {}).get("arguments", "")
                    msg_tokens += len(args) // _CHARS_PER_TOKEN
            # Stop once we exceed the soft ceiling (unless we haven't hit min_tail yet)
            if accumulated + msg_tokens > soft_ceiling and (n - i) >= min_tail:
                break
            accumulated += msg_tokens
            cut_idx = i

        # Ensure we protect at least min_tail messages
        fallback_cut = n - min_tail
        if cut_idx > fallback_cut:
            cut_idx = fallback_cut

        # If the token budget would protect everything (small conversations),
        # force a cut after the head so compression can still remove middle turns.
        if cut_idx <= head_end:
            cut_idx = max(fallback_cut, head_end + 1)

        # Align to avoid splitting tool groups
        cut_idx = self._align_boundary_backward(messages, cut_idx)

        return max(cut_idx, head_end + 1)

    # ------------------------------------------------------------------
    # Main compression entry point
    # ------------------------------------------------------------------

    def compress(self, messages: List[Dict[str, Any]], current_tokens: int | None = None, focus_topic: str | None = None) -> List[Dict[str, Any]]:
        """Compress conversation messages by summarizing middle turns.

        Algorithm:
        1. Prune old tool results (cheap pre-pass, no LLM call)
        2. Protect head messages (system prompt + first exchange)
        3. Find tail boundary by token budget (~20K tokens of recent context)
        4. Summarize middle turns with structured LLM prompt
        5. On re-compression, iteratively update the previous summary

        After compression, orphaned tool_call / tool_result pairs are cleaned
        up so the API never receives mismatched IDs.

        Args:
            focus_topic: Optional focus string for guided compression. When
                provided, the summariser will prioritise preserving information
                related to this topic and be more aggressive about compressing
                everything else. Inspired by Claude Code's ``/compact``.
        """
        n_messages = len(messages)
        # Only need head + 3 tail messages minimum (token budget decides the real tail size)
        _min_for_compress = self.protect_first_n + 3 + 1
        if n_messages <= _min_for_compress:
            if not self.quiet_mode:
                logger.warning(
                    "Cannot compress: only %d messages (need > %d)",
                    n_messages, _min_for_compress,
                )
            return messages

        display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)

        # Phase 1: Prune old tool results (cheap, no LLM call)
        messages, pruned_count = self._prune_old_tool_results(
            messages, protect_tail_count=self.protect_last_n,
            protect_tail_tokens=self.tail_token_budget,
        )
        if pruned_count and not self.quiet_mode:
            logger.info("Pre-compression: pruned %d old tool result(s)", pruned_count)

        # Phase 2: Determine boundaries
        compress_start = self.protect_first_n
        compress_start = self._align_boundary_forward(messages, compress_start)

        # Use token-budget tail protection instead of fixed message count
        compress_end = self._find_tail_cut_by_tokens(messages, compress_start)

        if compress_start >= compress_end:
            return messages

        turns_to_summarize = messages[compress_start:compress_end]

        if not self.quiet_mode:
            logger.info(
                "Context compression triggered (%d tokens >= %d threshold)",
                display_tokens,
                self.threshold_tokens,
            )
            logger.info(
                "Model context limit: %d tokens (%.0f%% = %d)",
                self.context_length,
                self.threshold_percent * 100,
                self.threshold_tokens,
            )
            tail_msgs = n_messages - compress_end
            logger.info(
                "Summarizing turns %d-%d (%d turns), protecting %d head + %d tail messages",
                compress_start + 1,
                compress_end,
                len(turns_to_summarize),
                compress_start,
                tail_msgs,
            )

        # Phase 3: Generate structured summary
        summary = self._generate_summary(turns_to_summarize, focus_topic=focus_topic)

        # Phase 4: Assemble compressed message list
        compressed = []
        for i in range(compress_start):
            msg = messages[i].copy()
            if i == 0 and msg.get("role") == "system" and self.compression_count == 0:
                msg["content"] = (
                    (msg.get("content") or "")
                    + "\n\n[Note: Some earlier conversation turns have been compacted into a handoff summary to preserve context space. The current session state may still reflect earlier work, so build on that summary and state rather than re-doing work.]"
                )
            compressed.append(msg)

        # If LLM summary failed, insert a static fallback so the model
        # knows context was lost rather than silently dropping everything.
        if not summary:
            if not self.quiet_mode:
                logger.warning("Summary generation failed — inserting static fallback context marker")
            n_dropped = compress_end - compress_start
            summary = (
                f"{SUMMARY_PREFIX}\n"
                f"Summary generation was unavailable. {n_dropped} conversation turns were "
                f"removed to free context space but could not be summarized. The removed "
                f"turns contained earlier work in this session. Continue based on the "
                f"recent messages below and the current state of any files or resources."
            )

        _merge_summary_into_tail = False
        last_head_role = messages[compress_start - 1].get("role", "user") if compress_start > 0 else "user"
        first_tail_role = messages[compress_end].get("role", "user") if compress_end < n_messages else "user"
        # Pick a role that avoids consecutive same-role with both neighbors.
        # Priority: avoid colliding with head (already committed), then tail.
        if last_head_role in ("assistant", "tool"):
            summary_role = "user"
        else:
            summary_role = "assistant"
        # If the chosen role collides with the tail AND flipping wouldn't
        # collide with the head, flip it.
        if summary_role == first_tail_role:
            flipped = "assistant" if summary_role == "user" else "user"
            if flipped != last_head_role:
                summary_role = flipped
            else:
                # Both roles would create consecutive same-role messages
                # (e.g. head=assistant, tail=user — neither role works).
                # Merge the summary into the first tail message instead
                # of inserting a standalone message that breaks alternation.
                _merge_summary_into_tail = True
        if not _merge_summary_into_tail:
            compressed.append({"role": summary_role, "content": summary})

        for i in range(compress_end, n_messages):
            msg = messages[i].copy()
            if _merge_summary_into_tail and i == compress_end:
                original = msg.get("content") or ""
                msg["content"] = (
                    summary
                    + "\n\n--- END OF CONTEXT SUMMARY — "
                    "respond to the message below, not the summary above ---\n\n"
                    + original
                )
                _merge_summary_into_tail = False
            compressed.append(msg)

        self.compression_count += 1

        compressed = self._sanitize_tool_pairs(compressed)

        if not self.quiet_mode:
            new_estimate = estimate_messages_tokens_rough(compressed)
            saved_estimate = display_tokens - new_estimate
            logger.info(
                "Compressed: %d -> %d messages (~%d tokens saved)",
                n_messages,
                len(compressed),
                saved_estimate,
            )
            logger.info("Compression #%d complete", self.compression_count)

        return compressed
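
A minimal usage sketch (hypothetical wiring: the module path, model name,
usage numbers, and message list are placeholders; only the ContextCompressor
API above is real):

    from agent.context_compressor import ContextCompressor  # assumed module path

    compressor = ContextCompressor(model="example/model-128k", quiet_mode=True)
    messages = [{"role": "system", "content": "..."}]  # running chat history

    # After each model response, record token usage; compact once over threshold.
    compressor.update_from_response({"prompt_tokens": 70_000, "completion_tokens": 512})
    if compressor.should_compress():
        messages = compressor.compress(messages)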