Enhance batch processing and tool validation

- Added support for tracking partial results and tool error counts in batch processing.
- Implemented filtering of corrupted entries during batch file combination based on valid tool names.
- Updated terminal tool to improve command execution and error handling, including retry logic for transient failures.
- Refactored model tools to use a simple terminal tool with no session persistence.
- Improved logging and error messages for invalid API responses and tool calls.
- Introduced chunked processing for large content in web tools to manage size limitations effectively.
This commit is contained in:
teknium 2026-01-10 05:56:26 +00:00
parent 21f9e2df40
commit 4071ba29da
8 changed files with 572 additions and 111 deletions

View file

@ -192,6 +192,7 @@ def _process_single_prompt(
"trajectory": trajectory,
"tool_stats": tool_stats,
"completed": result["completed"],
"partial": result.get("partial", False),
"api_calls": result["api_calls"],
"toolsets_used": selected_toolsets,
"metadata": {
@ -272,13 +273,23 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
# Save trajectory if successful
if result["success"] and result["trajectory"]:
# Create tool_error_counts mapping tool names to their failure counts
tool_stats = result.get("tool_stats", {})
tool_error_counts = {
tool_name: stats.get("failure", 0)
for tool_name, stats in tool_stats.items()
}
trajectory_entry = {
"prompt_index": prompt_index,
"conversations": result["trajectory"],
"metadata": result["metadata"],
"completed": result["completed"],
"partial": result.get("partial", False), # True if stopped due to invalid tool calls
"api_calls": result["api_calls"],
"toolsets_used": result["toolsets_used"]
"toolsets_used": result["toolsets_used"],
"tool_stats": tool_stats, # Full stats: {tool: {count, success, failure}}
"tool_error_counts": tool_error_counts # Simple: {tool: failure_count}
}
# Append to batch output file
@ -601,18 +612,44 @@ class BatchRunner:
stats["failure_rate"] = 0.0
# Combine all batch files into a single trajectories.jsonl file
# Also filter out corrupted entries (where model generated invalid tool names)
combined_file = self.output_dir / "trajectories.jsonl"
print(f"\n📦 Combining batch files into {combined_file.name}...")
VALID_TOOLS = {'web_search', 'web_extract', 'web_crawl', 'terminal', 'vision_analyze',
'image_generate', 'mixture_of_agents'}
total_entries = 0
filtered_entries = 0
with open(combined_file, 'w', encoding='utf-8') as outfile:
for batch_num in range(len(self.batches)):
batch_file = self.output_dir / f"batch_{batch_num}.jsonl"
if batch_file.exists():
with open(batch_file, 'r', encoding='utf-8') as infile:
for line in infile:
outfile.write(line)
total_entries += 1
try:
data = json.loads(line)
tool_stats = data.get('tool_stats', {})
# Check for invalid tool names (model hallucinations)
invalid_tools = [k for k in tool_stats.keys() if k not in VALID_TOOLS]
if invalid_tools:
filtered_entries += 1
invalid_preview = invalid_tools[0][:50] + "..." if len(invalid_tools[0]) > 50 else invalid_tools[0]
print(f" ⚠️ Filtering corrupted entry (batch {batch_num}): invalid tool '{invalid_preview}'")
continue
outfile.write(line)
except json.JSONDecodeError:
filtered_entries += 1
print(f" ⚠️ Filtering invalid JSON entry (batch {batch_num})")
print(f"✅ Combined {len(self.batches)} batch files into trajectories.jsonl")
if filtered_entries > 0:
print(f"⚠️ Filtered {filtered_entries} corrupted entries out of {total_entries} total")
print(f"✅ Combined {len(self.batches)} batch files into trajectories.jsonl ({total_entries - filtered_entries} entries)")
# Save final statistics
final_stats = {

View file

@ -8,7 +8,7 @@ for defining tools and executing function calls.
Currently supports:
- Web tools (search, extract, crawl) from web_tools.py
- Terminal tools (command execution with interactive sessions) from terminal_tool.py
- Terminal tools (simple command execution, no session persistence) from simple_terminal_tool.py
- Vision tools (image analysis) from vision_tools.py
- Mixture of Agents tools (collaborative multi-model reasoning) from mixture_of_agents_tool.py
- Image generation tools (text-to-image with upscaling) from image_generation_tool.py

View file

@ -43,7 +43,7 @@ else:
# Import our tool system
from model_tools import get_tool_definitions, handle_function_call, check_toolset_requirements
from tools.terminal_tool import cleanup_vm
from tools.simple_terminal_tool import cleanup_vm
class AIAgent:
@ -177,9 +177,11 @@ class AIAgent:
disabled_toolsets=disabled_toolsets
)
# Show tool configuration
# Show tool configuration and store valid tool names for validation
self.valid_tool_names = set()
if self.tools:
tool_names = [tool["function"]["name"] for tool in self.tools]
self.valid_tool_names = {tool["function"]["name"] for tool in self.tools}
tool_names = sorted(self.valid_tool_names)
print(f"🛠️ Loaded {len(self.tools)} tools: {', '.join(tool_names)}")
# Show filtering info if applied
@ -495,6 +497,49 @@ class AIAgent:
if self.verbose_logging:
logging.debug(f"API Response received - Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}")
# Validate response has valid choices before proceeding
if response is None or not hasattr(response, 'choices') or response.choices is None or len(response.choices) == 0:
# This is often rate limiting or provider returning malformed response
retry_count += 1
error_details = []
if response is None:
error_details.append("response is None")
elif not hasattr(response, 'choices'):
error_details.append("response has no 'choices' attribute")
elif response.choices is None:
error_details.append("response.choices is None")
else:
error_details.append("response.choices is empty")
# Check for error field in response (some providers include this)
error_msg = "Unknown"
if response and hasattr(response, 'error') and response.error:
error_msg = str(response.error)
elif response and hasattr(response, 'message') and response.message:
error_msg = str(response.message)
print(f"{self.log_prefix}⚠️ Invalid API response (attempt {retry_count}/{max_retries}): {', '.join(error_details)}")
print(f"{self.log_prefix} 📝 Provider message: {error_msg[:200]}")
print(f"{self.log_prefix} ⏱️ Response time: {api_duration:.2f}s (fast response often indicates rate limiting)")
if retry_count > max_retries:
print(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded for invalid responses. Giving up.")
logging.error(f"{self.log_prefix}Invalid API response after {max_retries} retries.")
return {
"messages": messages,
"completed": False,
"api_calls": api_call_count,
"error": f"Invalid API response (choices is None/empty). Likely rate limited by provider.",
"failed": True # Mark as failure for filtering
}
# Longer backoff for rate limiting (likely cause of None choices)
wait_time = min(5 * (2 ** (retry_count - 1)), 120) # 5s, 10s, 20s, 40s, 80s, 120s
print(f"{self.log_prefix}⏳ Retrying in {wait_time}s (extended backoff for possible rate limit)...")
logging.warning(f"Invalid API response (retry {retry_count}/{max_retries}): {', '.join(error_details)}")
time.sleep(wait_time)
continue # Retry the API call
break # Success, exit retry loop
except Exception as api_error:
@ -503,13 +548,32 @@ class AIAgent:
# Enhanced error logging
error_type = type(api_error).__name__
error_msg = str(api_error)
error_msg = str(api_error).lower()
print(f"{self.log_prefix}⚠️ API call failed (attempt {retry_count}/{max_retries}): {error_type}")
print(f"{self.log_prefix} ⏱️ Time elapsed before failure: {elapsed_time:.2f}s")
print(f"{self.log_prefix} 📝 Error: {error_msg[:200]}")
print(f"{self.log_prefix} 📝 Error: {str(api_error)[:200]}")
print(f"{self.log_prefix} 📊 Request context: {len(api_messages)} messages, ~{approx_tokens:,} tokens, {len(self.tools) if self.tools else 0} tools")
# Check for non-retryable errors (context length exceeded)
is_context_length_error = any(phrase in error_msg for phrase in [
'context length', 'maximum context', 'token limit',
'too many tokens', 'reduce the length', 'exceeds the limit'
])
if is_context_length_error:
print(f"{self.log_prefix}❌ Context length exceeded - this error cannot be resolved by retrying.")
print(f"{self.log_prefix} 💡 The conversation has accumulated too much content from tool responses.")
logging.error(f"{self.log_prefix}Context length exceeded: {approx_tokens:,} tokens. Cannot continue.")
# Return a partial result instead of crashing
return {
"messages": messages,
"completed": False,
"api_calls": api_call_count,
"error": f"Context length exceeded ({approx_tokens:,} tokens). Conversation terminated early.",
"partial": True
}
if retry_count > max_retries:
print(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.")
logging.error(f"{self.log_prefix}API call failed after {max_retries} retries. Last error: {api_error}")
@ -537,6 +601,43 @@ class AIAgent:
for tc in assistant_message.tool_calls:
logging.debug(f"Tool call: {tc.function.name} with args: {tc.function.arguments[:200]}...")
# Validate tool call names - detect model hallucinations
invalid_tool_calls = [
tc.function.name for tc in assistant_message.tool_calls
if tc.function.name not in self.valid_tool_names
]
if invalid_tool_calls:
# Track retries for invalid tool calls
if not hasattr(self, '_invalid_tool_retries'):
self._invalid_tool_retries = 0
self._invalid_tool_retries += 1
invalid_preview = invalid_tool_calls[0][:80] + "..." if len(invalid_tool_calls[0]) > 80 else invalid_tool_calls[0]
print(f"{self.log_prefix}⚠️ Invalid tool call detected: '{invalid_preview}'")
print(f"{self.log_prefix} Valid tools: {sorted(self.valid_tool_names)}")
if self._invalid_tool_retries < 3:
print(f"{self.log_prefix}🔄 Retrying API call ({self._invalid_tool_retries}/3)...")
# Don't add anything to messages, just retry the API call
continue
else:
print(f"{self.log_prefix}❌ Max retries (3) for invalid tool calls exceeded. Stopping as partial.")
# Return partial result - don't include the bad tool call in messages
self._invalid_tool_retries = 0 # Reset for next conversation
return {
"final_response": None,
"messages": messages, # Messages up to last valid point
"api_calls": api_call_count,
"completed": False,
"partial": True,
"error": f"Model generated invalid tool call: {invalid_preview}"
}
# Reset retry counter on successful tool call validation
if hasattr(self, '_invalid_tool_retries'):
self._invalid_tool_retries = 0
# Extract reasoning from response if available (for reasoning models like minimax, kimi, etc.)
reasoning_content = None
if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning:
@ -669,7 +770,8 @@ class AIAgent:
"final_response": final_response,
"messages": messages,
"api_calls": api_call_count,
"completed": completed
"completed": completed,
"partial": False # True only when stopped due to invalid tool calls
}
def chat(self, message: str) -> str:

26
run_datagen_glm4.7.sh Executable file
View file

@ -0,0 +1,26 @@
#!/bin/bash
# Launch a Hermes-agent data-generation batch run against GLM-4.7 (via
# OpenRouter), mirroring all output to a timestamped log file under logs/.
# Create logs directory if it doesn't exist
mkdir -p logs
# Generate log filename with timestamp
LOG_FILE="logs/glm4.7-thinking-sft1_$(date +%Y%m%d_%H%M%S).log"
echo "📝 Logging output to: $LOG_FILE"
# Run the batch generator; `2>&1 | tee` sends stdout+stderr both to the
# terminal and to $LOG_FILE.
# NOTE(review): flag meanings below are inferred from their names — confirm
# against batch_runner.py's argument parser before changing them.
python batch_runner.py \
--dataset_file="source-data/hermes-agent-agent-tasks-1/agent_tasks_sft_1.jsonl" \
--batch_size=25 \
--run_name="megascience_glm4.7-thinking-sft1" \
--distribution="science" \
--model="z-ai/glm-4.7" \
--base_url="https://openrouter.ai/api/v1" \
--providers_allowed="gmicloud,siliconflow,atlas-cloud,z-ai,novita" \
--num_workers=10 \
--max_turns=60 \
--ephemeral_system_prompt="You have access to a variety of tools to help you solve scientific, math, and technology problems presented to you. You can use them in sequence and build off of the results of prior tools you've used results. Always use the terminal or search tool if it can provide additional context, verify formulas, double check concepts and recent studies and understanding, doing all calculations, etc. You should only be confident in your own reasoning, knowledge, or calculations if you've exhaustively used all tools available to you to that can help you verify or validate your work. Always pip install any packages you need to use the python scripts you want to run. If you need to use a tool that isn't available, you can use the terminal tool to install or create it in many cases as well. Do not use the terminal tool to communicate with the user, as they cannot see your commands, only your final response after completing the task. Search for at least 3 sources, but not more than 12." \
2>&1 | tee "$LOG_FILE"
echo "✅ Log saved to: $LOG_FILE"
# --verbose \

View file

@ -6,7 +6,7 @@ This package contains all the specific tool implementations for the Hermes Agent
Each module provides specialized functionality for different capabilities:
- web_tools: Web search, content extraction, and crawling
- terminal_tool: Command execution on virtual machines
- simple_terminal_tool: Simple command execution on virtual machines (no session persistence)
- vision_tools: Image analysis and understanding
- mixture_of_agents_tool: Multi-model collaborative reasoning
- image_generation_tool: Text-to-image generation with upscaling
@ -23,10 +23,11 @@ from .web_tools import (
check_firecrawl_api_key
)
from .terminal_tool import (
terminal_tool,
check_hecate_requirements,
TERMINAL_TOOL_DESCRIPTION
from .simple_terminal_tool import (
simple_terminal_tool,
check_requirements as check_terminal_requirements,
cleanup_vm,
SIMPLE_TERMINAL_TOOL_DESCRIPTION
)
from .vision_tools import (
@ -50,10 +51,11 @@ __all__ = [
'web_extract_tool',
'web_crawl_tool',
'check_firecrawl_api_key',
# Terminal tools
'terminal_tool',
'check_hecate_requirements',
'TERMINAL_TOOL_DESCRIPTION',
# Terminal tools (simple - no session persistence)
'simple_terminal_tool',
'check_terminal_requirements',
'cleanup_vm',
'SIMPLE_TERMINAL_TOOL_DESCRIPTION',
# Vision tools
'vision_analyze_tool',
'check_vision_requirements',

View file

@ -100,6 +100,12 @@ def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300):
print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)")
else:
print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}")
# Always remove from tracking dicts to prevent infinite retry loops
if task_id in _active_instances:
del _active_instances[task_id]
if task_id in _last_activity:
del _last_activity[task_id]
def _cleanup_thread_worker():
@ -171,48 +177,36 @@ def cleanup_vm(task_id: str):
atexit.register(_stop_cleanup_thread)
def _execute_ssh_command(instance, command: str, timeout: Optional[int] = None) -> Dict[str, Any]:
def _execute_command(instance, command: str, timeout: Optional[int] = None) -> Dict[str, Any]:
"""
Execute a command via SSH on the VM instance.
Execute a command on the VM instance using instance.exec() for proper stderr capture.
Args:
instance: MorphVM instance
command: Command to execute
timeout: Optional timeout in seconds
timeout: Optional timeout in seconds (Note: exec() may not support timeout directly)
Returns:
dict with stdout, stderr, returncode
"""
ssh_context_manager = None
try:
# Use the instance's SSH context manager
ssh_context_manager = instance.ssh()
ssh_context = ssh_context_manager.__enter__()
# Execute the command
result = ssh_context.run(command, get_pty=False, timeout=timeout or 120)
# Close the SSH connection
if ssh_context_manager:
try:
ssh_context_manager.__exit__(None, None, None)
except:
pass
# Use instance.exec() which properly captures both stdout and stderr
# (unlike ssh.run() which doesn't capture stderr correctly)
result = instance.exec(command)
# Debug logging only for verbose mode or unusual cases
# Note: Non-zero exit codes are normal (model's command failed) - not a tool error
if result.exit_code != 0 and not result.stdout and not result.stderr:
# Only log if we got absolutely no output - might indicate an issue
print(f"⚠️ Command returned exit={result.exit_code} with no output")
return {
"stdout": result.stdout or "",
"stderr": result.stderr or "",
"returncode": result.returncode
"returncode": result.exit_code
}
except Exception as e:
# Close connection on error
if ssh_context_manager:
try:
ssh_context_manager.__exit__(None, None, None)
except:
pass
# Check if it's a timeout
error_str = str(e).lower()
if "timeout" in error_str:
@ -224,7 +218,7 @@ def _execute_ssh_command(instance, command: str, timeout: Optional[int] = None)
return {
"stdout": "",
"stderr": f"SSH execution failed: {str(e)}",
"stderr": f"Command execution failed: {str(e)}",
"returncode": -1
}
@ -312,7 +306,7 @@ def simple_terminal_tool(
if background:
# Run in background with nohup and redirect output
exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &"
result = _execute_ssh_command(instance, exec_command, timeout=10)
result = _execute_command(instance, exec_command, timeout=10)
# For background tasks, return immediately with info
if result["returncode"] == 0:
@ -322,24 +316,72 @@ def simple_terminal_tool(
"error": None
}, ensure_ascii=False)
else:
# Include stderr in output but don't set error (command failure, not tool failure)
bg_output = result["stdout"]
if result["stderr"]:
bg_output = f"{bg_output}\n{result['stderr']}" if bg_output else result["stderr"]
return json.dumps({
"output": result["stdout"],
"output": bg_output,
"exit_code": result["returncode"],
"error": result["stderr"]
"error": None # Only set for actual tool failures
}, ensure_ascii=False)
else:
# Run foreground command
result = _execute_ssh_command(instance, command, timeout=timeout)
# Run foreground command with retry logic for transient failures
max_retries = 3
retry_count = 0
result = None
while retry_count <= max_retries:
result = _execute_command(instance, command, timeout=timeout)
# Check if we should retry (only for transient errors, not normal results)
stdout = result.get("stdout", "")
stderr = result.get("stderr", "")
returncode = result.get("returncode", 0)
should_retry = False
retry_reason = ""
# NOTE: Empty output with exit_code=0 is NORMAL for many commands:
# - File writes: cat > file, echo > file
# - Directory ops: mkdir, cd
# - Silent installs: pip install --quiet
# So we do NOT retry on exit_code=0, even with empty output.
# Only retry on special error codes that suggest transient/infra issues
if not stdout and not stderr and returncode in [-1, 124]:
should_retry = True
retry_reason = f"transient error (code {returncode})"
if should_retry and retry_count < max_retries:
retry_count += 1
wait_time = 2 ** retry_count # Exponential backoff: 2s, 4s, 8s
print(f"⚠️ Terminal: {retry_reason}, retrying in {wait_time}s (attempt {retry_count}/{max_retries})")
time.sleep(wait_time)
continue
# Got a result (success or normal command failure) - exit retry loop
break
# Combine stdout and stderr for output
output = result["stdout"]
if result["stderr"] and result["returncode"] != 0:
output = f"{output}\n{result['stderr']}" if output else result["stderr"]
# Truncate output if too long (max 50,000 chars to avoid context explosion)
MAX_OUTPUT_CHARS = 50000
if len(output) > MAX_OUTPUT_CHARS:
truncated_notice = f"\n\n... [OUTPUT TRUNCATED - showing last {MAX_OUTPUT_CHARS} chars of {len(output)} total] ..."
output = truncated_notice + output[-MAX_OUTPUT_CHARS:]
# NOTE: error is only set for FUNCTIONAL tool failures (VM issues, timeouts, etc.)
# Non-zero exit codes from the model's commands are NOT tool failures -
# the model can self-correct. The exit_code field tells the model if the command succeeded.
# Retries that eventually succeed also don't count as failures.
return json.dumps({
"output": output.strip(),
"exit_code": result["returncode"],
"error": result["stderr"] if result["returncode"] != 0 else None
"error": None # Only set for actual tool failures, not command failures
}, ensure_ascii=False)
except Exception as e:

View file

@ -270,6 +270,7 @@ def terminal_tool(
except ImportError as import_error:
return json.dumps({
"output": "",
"stderr": "",
"screen": "",
"exit_code": -1,
"error": f"Terminal tool is disabled due to import error: {import_error}",
@ -287,6 +288,7 @@ def terminal_tool(
if not morph_api_key:
return json.dumps({
"output": "",
"stderr": "",
"screen": "",
"exit_code": -1,
"error": "MORPH_API_KEY environment variable not set",
@ -349,29 +351,85 @@ def terminal_tool(
# Generate unique tool block ID
tool_block_id = f"tool_{uuid.uuid4().hex[:8]}"
# Execute the tool with hecate
result = run_tool(
tool_call=tool_call,
instance=instance,
console=console,
tool_block_id=tool_block_id,
ctx=ctx
)
# Retry configuration for handling transient empty responses
max_retries = 3
retry_count = 0
while retry_count <= max_retries:
# Execute the tool with hecate
result = run_tool(
tool_call=tool_call,
instance=instance,
console=console,
tool_block_id=tool_block_id,
ctx=ctx
)
# Format the result with only essential fields for the LLM
# Map hecate's "stdout" to "output" for compatibility
formatted_result = {
"output": result.get("stdout", result.get("output", "")),
"screen": result.get("screen", ""),
"exit_code": result.get("returncode", result.get("exit_code", -1)),
"error": result.get("error")
}
# Format the result with only essential fields for the LLM
# Map hecate's "stdout" to "output" for compatibility
stdout = result.get("stdout", result.get("output", ""))
stderr = result.get("stderr", "")
exit_code = result.get("returncode", result.get("exit_code", -1))
error = result.get("error")
screen = result.get("screen", "")
# If there's no explicit error but there's stderr, include it in error field
# This helps capture why commands failed even without an explicit error message
if not error and stderr:
error = stderr
# If exit code is non-zero but no error info, note that
elif not error and exit_code and exit_code != 0 and not stdout:
error = f"Command exited with code {exit_code}"
# Check if we should retry:
# 1. Empty output with non-zero exit code (clear failure)
# 2. Completely empty response (may indicate timing/VM issue)
should_retry = False
retry_reason = ""
if not stdout and not stderr and not screen and not error and exit_code == 0:
# Completely empty response - might be a timing issue
should_retry = True
retry_reason = "completely empty response (possible timing issue)"
elif not stdout and not stderr and exit_code != 0 and exit_code != -1:
# Non-zero exit with no output at all - might be transient
should_retry = True
retry_reason = f"empty output with exit code {exit_code}"
if should_retry and retry_count < max_retries:
retry_count += 1
wait_time = 2 ** retry_count # Exponential backoff: 2s, 4s, 8s
print(f"⚠️ Terminal: {retry_reason}, retrying in {wait_time}s (attempt {retry_count}/{max_retries})")
time.sleep(wait_time)
continue
# Success or max retries reached - return the result
formatted_result = {
"output": stdout,
"stderr": stderr, # Now capturing stderr separately too
"screen": screen,
"exit_code": exit_code,
"error": error
}
if retry_count > 0:
formatted_result["retries"] = retry_count
return json.dumps(formatted_result, ensure_ascii=False)
return json.dumps(formatted_result, ensure_ascii=False)
# Should never reach here, but just in case
return json.dumps({
"output": "",
"stderr": "",
"screen": "",
"exit_code": -1,
"error": "Terminal tool: max retries exceeded"
}, ensure_ascii=False)
except Exception as e:
return json.dumps({
"output": "",
"stderr": "",
"screen": "",
"exit_code": -1,
"error": f"Failed to execute terminal command: {str(e)}",

View file

@ -139,6 +139,9 @@ async def process_content_with_llm(
to intelligently extract key information and create markdown summaries,
significantly reducing token usage while preserving all important information.
For very large content (>500k chars), uses chunked processing with synthesis.
For extremely large content (>2M chars), refuses to process entirely.
Args:
content (str): The raw content to process
url (str): The source URL (for context, optional)
@ -149,13 +152,25 @@ async def process_content_with_llm(
Returns:
Optional[str]: Processed markdown content, or None if content too short or processing fails
"""
# Size thresholds
MAX_CONTENT_SIZE = 2_000_000 # 2M chars - refuse entirely above this
CHUNK_THRESHOLD = 500_000 # 500k chars - use chunked processing above this
CHUNK_SIZE = 100_000 # 100k chars per chunk
MAX_OUTPUT_SIZE = 5000 # Hard cap on final output size
try:
# Skip processing if content is too short
if len(content) < min_length:
print(f"📏 Content too short ({len(content)} < {min_length} chars), skipping LLM processing")
return None
content_len = len(content)
print(f"🧠 Processing content with LLM ({len(content)} characters)")
# Refuse if content is absurdly large
if content_len > MAX_CONTENT_SIZE:
size_mb = content_len / 1_000_000
print(f"🚫 Content too large ({size_mb:.1f}MB > 2MB limit). Refusing to process.")
return f"[Content too large to process: {size_mb:.1f}MB. Try using web_crawl with specific extraction instructions, or search for a more focused source.]"
# Skip processing if content is too short
if content_len < min_length:
print(f"📏 Content too short ({content_len} < {min_length} chars), skipping LLM processing")
return None
# Create context information
context_info = []
@ -163,10 +178,83 @@ async def process_content_with_llm(
context_info.append(f"Title: {title}")
if url:
context_info.append(f"Source: {url}")
context_str = "\n".join(context_info) + "\n\n" if context_info else ""
# Simplified prompt for better quality markdown output
# Check if we need chunked processing
if content_len > CHUNK_THRESHOLD:
print(f"📦 Content large ({content_len:,} chars). Using chunked processing...")
return await _process_large_content_chunked(
content, context_str, model, CHUNK_SIZE, MAX_OUTPUT_SIZE
)
# Standard single-pass processing for normal content
print(f"🧠 Processing content with LLM ({content_len} characters)")
processed_content = await _call_summarizer_llm(content, context_str, model)
if processed_content:
# Enforce output cap
if len(processed_content) > MAX_OUTPUT_SIZE:
processed_content = processed_content[:MAX_OUTPUT_SIZE] + "\n\n[... summary truncated for context management ...]"
# Log compression metrics
processed_length = len(processed_content)
compression_ratio = processed_length / content_len if content_len > 0 else 1.0
print(f"✅ Content processed: {content_len}{processed_length} chars ({compression_ratio:.1%})")
return processed_content
except Exception as e:
print(f"❌ Error processing content with LLM: {str(e)}")
return f"[Failed to process content: {str(e)[:100]}. Content size: {len(content):,} chars]"
async def _call_summarizer_llm(
content: str,
context_str: str,
model: str,
max_tokens: int = 4000,
is_chunk: bool = False,
chunk_info: str = ""
) -> Optional[str]:
"""
Make a single LLM call to summarize content.
Args:
content: The content to summarize
context_str: Context information (title, URL)
model: Model to use
max_tokens: Maximum output tokens
is_chunk: Whether this is a chunk of a larger document
chunk_info: Information about chunk position (e.g., "Chunk 2/5")
Returns:
Summarized content or None on failure
"""
if is_chunk:
# Chunk-specific prompt - aware that this is partial content
system_prompt = """You are an expert content analyst processing a SECTION of a larger document. Your job is to extract and summarize the key information from THIS SECTION ONLY.
Important guidelines for chunk processing:
1. Do NOT write introductions or conclusions - this is a partial document
2. Focus on extracting ALL key facts, figures, data points, and insights from this section
3. Preserve important quotes, code snippets, and specific details verbatim
4. Use bullet points and structured formatting for easy synthesis later
5. Note any references to other sections (e.g., "as mentioned earlier", "see below") without trying to resolve them
Your output will be combined with summaries of other sections, so focus on thorough extraction rather than narrative flow."""
user_prompt = f"""Extract key information from this SECTION of a larger document:
{context_str}{chunk_info}
SECTION CONTENT:
{content}
Extract all important information from this section in a structured format. Focus on facts, data, insights, and key details. Do not add introductions or conclusions."""
else:
# Standard full-document prompt
system_prompt = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk.
Create a well-structured markdown summary that includes:
@ -183,49 +271,155 @@ Your goal is to preserve ALL important information while reducing length. Never
Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights."""
# Call the LLM asynchronously with retry logic for flaky API
max_retries = 6
retry_delay = 2 # Start with 2 seconds
last_error = None
# Call the LLM with retry logic
max_retries = 6
retry_delay = 2
last_error = None
for attempt in range(max_retries):
try:
response = await summarizer_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1, # Low temperature for consistent extraction
max_tokens=4000 # Generous limit for comprehensive processing
)
break # Success, exit retry loop
except Exception as api_error:
last_error = api_error
if attempt < max_retries - 1:
print(f"⚠️ LLM API call failed (attempt {attempt + 1}/{max_retries}): {str(api_error)[:100]}")
print(f" Retrying in {retry_delay}s...")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60) # Exponential backoff: 2s, 4s, 8s, 16s, 32s, 60s
else:
# All retries exhausted
raise last_error
for attempt in range(max_retries):
try:
response = await summarizer_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1,
max_tokens=max_tokens
)
return response.choices[0].message.content.strip()
except Exception as api_error:
last_error = api_error
if attempt < max_retries - 1:
print(f"⚠️ LLM API call failed (attempt {attempt + 1}/{max_retries}): {str(api_error)[:100]}")
print(f" Retrying in {retry_delay}s...")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60)
else:
raise last_error
return None
async def _process_large_content_chunked(
    content: str,
    context_str: str,
    model: str,
    chunk_size: int,
    max_output_size: int
) -> Optional[str]:
    """
    Process large content by chunking, summarizing each chunk in parallel,
    then synthesizing the chunk summaries into one final summary.

    Args:
        content: The large content to process
        context_str: Context information (title, URL) prepended to prompts
        chunk_size: Size of each chunk in characters
        model: Model to use for summarization
        max_output_size: Hard cap (characters) on the final output

    Returns:
        Synthesized summary string; on total failure a bracketed error
        notice is returned instead of None so the caller always gets text.
    """
    # Split content into fixed-size character chunks (last chunk may be short).
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]
    print(f"  📦 Split into {len(chunks)} chunks of ~{chunk_size:,} chars each")

    async def summarize_chunk(chunk_idx: int, chunk_content: str) -> tuple[int, Optional[str]]:
        """Summarize a single chunk; returns (index, summary-or-None)."""
        try:
            chunk_info = f"[Processing chunk {chunk_idx + 1} of {len(chunks)}]"
            summary = await _call_summarizer_llm(
                chunk_content,
                context_str,
                model,
                max_tokens=2000,
                is_chunk=True,
                chunk_info=chunk_info
            )
            if summary:
                print(f"  ✅ Chunk {chunk_idx + 1}/{len(chunks)} summarized: {len(chunk_content):,} → {len(summary):,} chars")
            return chunk_idx, summary
        except Exception as e:
            # A failed chunk is dropped rather than failing the whole document.
            print(f"  ⚠️ Chunk {chunk_idx + 1}/{len(chunks)} failed: {str(e)[:50]}")
            return chunk_idx, None

    # Run all chunk summarizations in parallel.
    tasks = [summarize_chunk(i, chunk) for i, chunk in enumerate(chunks)]
    results = await asyncio.gather(*tasks)

    # Collect successful summaries in original document order.
    summaries = []
    for chunk_idx, summary in sorted(results, key=lambda x: x[0]):
        if summary:
            summaries.append(f"## Section {chunk_idx + 1}\n{summary}")

    if not summaries:
        print(f"  ❌ All chunk summarizations failed")
        return "[Failed to process large content: all chunk summarizations failed]"

    print(f"  📊 Got {len(summaries)}/{len(chunks)} chunk summaries")

    # If only one chunk succeeded, skip synthesis and return it (capped).
    if len(summaries) == 1:
        result = summaries[0]
        if len(result) > max_output_size:
            result = result[:max_output_size] + "\n\n[... truncated ...]"
        return result

    # Synthesize the per-chunk summaries into a single final summary.
    print(f"  🔗 Synthesizing {len(summaries)} summaries...")
    combined_summaries = "\n\n---\n\n".join(summaries)
    synthesis_prompt = f"""You have been given summaries of different sections of a large document.

Synthesize these into ONE cohesive, comprehensive summary that:
1. Removes redundancy between sections
2. Preserves all key facts, figures, and actionable information
3. Is well-organized with clear structure
4. Is under {max_output_size} characters

{context_str}SECTION SUMMARIES:
{combined_summaries}

Create a single, unified markdown summary."""

    try:
        response = await summarizer_client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You synthesize multiple summaries into one cohesive, comprehensive summary. Be thorough but concise."},
                {"role": "user", "content": synthesis_prompt}
            ],
            temperature=0.1,
            max_tokens=4000
        )
        final_summary = response.choices[0].message.content.strip()

        # Enforce hard cap on final output size.
        if len(final_summary) > max_output_size:
            final_summary = final_summary[:max_output_size] + "\n\n[... summary truncated for context management ...]"

        # Log compression metrics (final size vs. raw input size).
        original_len = len(content)
        final_len = len(final_summary)
        compression = final_len / original_len if original_len > 0 else 1.0
        print(f"  ✅ Synthesis complete: {original_len:,} → {final_len:,} chars ({compression:.2%})")
        return final_summary
    except Exception as e:
        print(f"  ⚠️ Synthesis failed: {str(e)[:100]}")
        # Fall back to concatenated chunk summaries, truncated to the cap.
        fallback = "\n\n".join(summaries)
        if len(fallback) > max_output_size:
            fallback = fallback[:max_output_size] + "\n\n[... truncated due to synthesis failure ...]"
        return fallback
def clean_base64_images(text: str) -> str: