Add mini-swe-agent runner and trajectory compressor

- Introduced mini_swe_runner.py for executing tasks using mini-swe-agent environments (local, Docker, Modal) and outputting trajectories in Hermes format.
- Implemented trajectory_compressor.py to post-process agent trajectories, compressing them within a target token budget while preserving essential content.
- Added trajectory_compression.yaml configuration file for customizable compression settings.
- Created sample_and_compress.py script to download, sample, and compress trajectories from HuggingFace datasets.
- Enhanced logging and error handling across new modules for improved usability and debugging.
This commit is contained in:
teknium 2026-01-23 00:52:46 +00:00
parent 6eb76c7c1a
commit 47555602d7
4 changed files with 2455 additions and 0 deletions

View file

@ -0,0 +1,97 @@
# Trajectory Compression Configuration
#
# Post-processes completed agent trajectories to fit within a target token budget.
# Compression preserves head/tail turns and summarizes middle content only as needed.

# Tokenizer settings for accurate token counting
tokenizer:
  # HuggingFace tokenizer name
  name: "moonshotai/Kimi-K2-Thinking"
  # Trust remote code (required for some tokenizers)
  trust_remote_code: true

# Compression targets and behavior
compression:
  # Target maximum tokens for compressed trajectory
  target_max_tokens: 29000
  # Target size for summary (in tokens)
  # This is factored into calculations when determining what to compress
  summary_target_tokens: 750

# Protected turns that should NEVER be compressed
protected_turns:
  # Always protect the first system message (tool definitions)
  first_system: true
  # Always protect the first human message (original request)
  first_human: true
  # Always protect the first gpt message (initial response/tool_call)
  first_gpt: true
  # Always protect the first tool response (result of first action)
  first_tool: true
  # Always protect the last 2 complete turn pairs (gpt+tool or gpt only)
  # This ensures the model's final actions and conclusions are preserved
  last_n_turns: 4

# LLM settings for generating summaries (OpenRouter only)
summarization:
  # Model to use for summarization (should be fast and cheap)
  # Using OpenRouter model path format
  model: "google/gemini-3-flash-preview"
  # OpenRouter API settings
  base_url: "https://openrouter.ai/api/v1"
  # Environment variable containing OpenRouter API key
  api_key_env: "OPENROUTER_API_KEY"
  # Temperature for summarization (lower = more deterministic)
  temperature: 0.3
  # Max retries for API failures
  max_retries: 3
  # Delay between retries (seconds)
  retry_delay: 2

# Output settings
output:
  # Add notice to system message about potential summarization
  add_summary_notice: true
  # Text to append to system message
  summary_notice_text: "\n\nSome of the conversation may be summarized to preserve context."
  # Output directory suffix (appended to input directory name)
  output_suffix: "_compressed"

# Processing settings
processing:
  # Number of parallel workers for batch processing
  num_workers: 4
  # Maximum concurrent API calls for summarization (async parallelism)
  max_concurrent_requests: 50
  # Skip trajectories that are already under target length
  skip_under_target: true
  # If true, save trajectories even if compression can't get under target
  # (will compress as much as possible)
  save_over_limit: true

# Metrics to track
metrics:
  # Log detailed compression statistics
  enabled: true
  # Save per-trajectory metrics in output
  per_trajectory: false
  # Metrics file name (saved in output directory)
  output_file: "compression_metrics.json"

704
mini_swe_runner.py Normal file
View file

@ -0,0 +1,704 @@
#!/usr/bin/env python3
"""
Mini-SWE-Agent Runner with Hermes Trajectory Format
This module provides a runner that uses mini-swe-agent's execution environments
(local, docker, modal) but outputs trajectories in the Hermes-Agent format
compatible with batch_runner.py and trajectory_compressor.py.
Features:
- Uses mini-swe-agent's Docker, Modal, or Local environments for command execution
- Outputs trajectories in Hermes format (from/value pairs with <tool_call>/<tool_response> XML)
- Compatible with the trajectory compression pipeline
- Supports batch processing from JSONL prompt files
Usage:
# Run a single task with local environment
python mini_swe_runner.py --task "Create a hello world Python script" --env local
# Run with Docker
python mini_swe_runner.py --task "List files in /tmp" --env docker --image python:3.11-slim
# Run with Modal (cloud)
python mini_swe_runner.py --task "Install numpy and test it" --env modal --image python:3.11-slim
# Batch mode from JSONL file
python mini_swe_runner.py --prompts_file prompts.jsonl --output_file trajectories.jsonl --env docker
"""
import json
import logging
import os
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Literal
import fire
from dotenv import load_dotenv
# Load environment variables
# (API keys and endpoints are read via os.getenv later, e.g. OPENROUTER_API_KEY)
load_dotenv()

# Add mini-swe-agent to path if not installed
# A vendored checkout at ./mini-swe-agent/src takes import precedence when present.
mini_swe_path = Path(__file__).parent / "mini-swe-agent" / "src"
if mini_swe_path.exists():
    sys.path.insert(0, str(mini_swe_path))
# ============================================================================
# Terminal Tool Definition (matches Hermes-Agent format)
# ============================================================================
# OpenAI-style tool schema for the single "terminal" tool exposed to the model.
# The description text is shown to the model verbatim; the completion marker
# MINI_SWE_AGENT_FINAL_OUTPUT is what run_task() scans command output for.
TERMINAL_TOOL_DEFINITION = {
    "type": "function",
    "function": {
        "name": "terminal",
        "description": """Execute bash commands in a sandboxed environment.
**Environment:**
- Isolated execution environment (local, Docker, or Modal cloud)
- Filesystem persists between tool calls within the same task
- Internet access available
**Command Execution:**
- Provide the command to execute via the 'command' parameter
- Optional 'timeout' parameter in seconds (default: 60)
**Examples:**
- Run command: `{"command": "ls -la"}`
- With timeout: `{"command": "long_task.sh", "timeout": 300}`
**Best Practices:**
- Use non-interactive commands (avoid vim, nano, interactive python)
- Pipe to cat if output might be large
- Install tools with apt-get or pip as needed
**Completion:**
- When task is complete, output: echo "MINI_SWE_AGENT_FINAL_OUTPUT" followed by your result
""",
        "parameters": {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The bash command to execute"
                },
                "timeout": {
                    "type": "integer",
                    "description": "Command timeout in seconds (default: 60)"
                }
            },
            "required": ["command"]
        }
    }
}
# ============================================================================
# Environment Factory
# ============================================================================
def create_environment(
    env_type: str = "local",
    image: str = "python:3.11-slim",
    cwd: str = "/tmp",
    timeout: int = 60,
    **kwargs
):
    """
    Build an execution environment from mini-swe-agent.

    Imports are done lazily per backend so that only the selected
    environment's dependencies need to be installed.

    Args:
        env_type: One of "local", "docker", "modal"
        image: Docker/Modal image name (ignored for local)
        cwd: Working directory
        timeout: Default command timeout
        **kwargs: Additional environment-specific options (not used for local)

    Returns:
        Environment instance with execute() method

    Raises:
        ValueError: if env_type is not one of the supported backends.
    """
    if env_type == "modal":
        from minisweagent.environments.extra.swerex_modal import SwerexModalEnvironment
        return SwerexModalEnvironment(image=image, cwd=cwd, timeout=timeout, **kwargs)
    if env_type == "docker":
        from minisweagent.environments.docker import DockerEnvironment
        return DockerEnvironment(image=image, cwd=cwd, timeout=timeout, **kwargs)
    if env_type == "local":
        from minisweagent.environments.local import LocalEnvironment
        # Local execution has no image concept and takes no extra kwargs.
        return LocalEnvironment(cwd=cwd, timeout=timeout)
    raise ValueError(f"Unknown environment type: {env_type}. Use 'local', 'docker', or 'modal'")
# ============================================================================
# Mini-SWE Runner with Hermes Trajectory Format
# ============================================================================
class MiniSWERunner:
    """
    Agent runner that uses mini-swe-agent environments but outputs
    trajectories in Hermes-Agent format.

    The runner drives an OpenAI-compatible chat-completions loop: the model
    is offered a single "terminal" tool, each tool call is executed in the
    configured environment, and the full exchange is converted to the
    Hermes "from"/"value" trajectory format at the end of the task.
    """
    def __init__(
        self,
        model: str = "claude-sonnet-4-20250514",
        base_url: str = None,
        api_key: str = None,
        env_type: str = "local",
        image: str = "python:3.11-slim",
        cwd: str = "/tmp",
        max_iterations: int = 15,
        command_timeout: int = 60,
        verbose: bool = False,
    ):
        """
        Initialize the Mini-SWE Runner.

        Args:
            model: Model name for OpenAI-compatible API
            base_url: API base URL (optional, uses env vars if not provided)
            api_key: API key (optional, uses env vars if not provided)
            env_type: Environment type - "local", "docker", or "modal"
            image: Docker/Modal image (ignored for local)
            cwd: Working directory for commands
            max_iterations: Maximum tool-calling iterations
            command_timeout: Default timeout for commands
            verbose: Enable verbose logging
        """
        self.model = model
        self.max_iterations = max_iterations
        self.command_timeout = command_timeout
        self.verbose = verbose
        self.env_type = env_type
        self.image = image
        self.cwd = cwd
        # Setup logging
        # NOTE(review): basicConfig in a constructor mutates global logging
        # state for the whole process — acceptable for a CLI script.
        logging.basicConfig(
            level=logging.DEBUG if verbose else logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)
        # Initialize OpenAI client (imported lazily so the module can be
        # inspected without the openai package installed)
        from openai import OpenAI
        client_kwargs = {}
        if base_url:
            client_kwargs["base_url"] = base_url
        # Handle API key with fallbacks:
        # explicit arg > OPENROUTER_API_KEY > ANTHROPIC_API_KEY > OPENAI_API_KEY
        if api_key:
            client_kwargs["api_key"] = api_key
        else:
            client_kwargs["api_key"] = os.getenv(
                "OPENROUTER_API_KEY",
                os.getenv("ANTHROPIC_API_KEY", os.getenv("OPENAI_API_KEY", ""))
            )
        self.client = OpenAI(**client_kwargs)
        # Environment will be created per-task (see run_task/_create_env)
        self.env = None
        # Tool definition offered to the model on every API call
        self.tools = [TERMINAL_TOOL_DEFINITION]
        print(f"🤖 Mini-SWE Runner initialized")
        print(f" Model: {self.model}")
        print(f" Environment: {self.env_type}")
        if self.env_type != "local":
            print(f" Image: {self.image}")
        print(f" Max iterations: {self.max_iterations}")
    def _create_env(self):
        """Create the execution environment from the stored settings."""
        print(f"🔧 Creating {self.env_type} environment...")
        self.env = create_environment(
            env_type=self.env_type,
            image=self.image,
            cwd=self.cwd,
            timeout=self.command_timeout
        )
        print(f"✅ Environment ready")
    def _cleanup_env(self):
        """Tear down the execution environment, tolerating either
        cleanup() or stop() as the backend's teardown method."""
        if self.env is not None:
            if hasattr(self.env, 'cleanup'):
                self.env.cleanup()
            elif hasattr(self.env, 'stop'):
                self.env.stop()
            self.env = None
    def _execute_command(self, command: str, timeout: int = None) -> Dict[str, Any]:
        """
        Execute a command in the environment, lazily creating it if needed.

        Args:
            command: Bash command to execute
            timeout: Optional timeout override

        Returns:
            Dict with 'output', 'exit_code' and 'error' keys; execution
            failures are reported as exit_code=-1 with the error string
            rather than raised.
        """
        if self.env is None:
            self._create_env()
        try:
            result = self.env.execute(command, timeout=timeout or self.command_timeout)
            return {
                "output": result.get("output", ""),
                "exit_code": result.get("returncode", 0),
                "error": None
            }
        except Exception as e:
            return {
                "output": "",
                "exit_code": -1,
                "error": str(e)
            }
    def _format_tools_for_system_message(self) -> str:
        """Serialize self.tools as the JSON list embedded in the
        Hermes system message's <tools> section."""
        formatted_tools = []
        for tool in self.tools:
            func = tool["function"]
            formatted_tools.append({
                "name": func["name"],
                "description": func.get("description", ""),
                "parameters": func.get("parameters", {}),
                # "required" is emitted as None to match the Hermes schema shape
                "required": None
            })
        return json.dumps(formatted_tools, ensure_ascii=False)
    def _convert_to_hermes_format(
        self,
        messages: List[Dict[str, Any]],
        user_query: str,
        completed: bool
    ) -> List[Dict[str, Any]]:
        """
        Convert internal message format to Hermes trajectory format.

        This produces the exact format used by batch_runner.py.

        NOTE(review): the `completed` parameter is currently unused here;
        completion status is reported separately by run_task().
        """
        trajectory = []
        # System message with tool definitions
        system_msg = (
            "You are a function calling AI model. You are provided with function signatures within <tools> </tools> XML tags. "
            "You may call one or more functions to assist with the user query. If available tools are not relevant in assisting "
            "with user query, just respond in natural conversational language. Don't make assumptions about what values to plug "
            "into functions. After calling & executing the functions, you will be provided with function results within "
            "<tool_response> </tool_response> XML tags. Here are the available tools:\n"
            f"<tools>\n{self._format_tools_for_system_message()}\n</tools>\n"
            "For each function call return a JSON object, with the following pydantic model json schema for each:\n"
            "{'title': 'FunctionCall', 'type': 'object', 'properties': {'name': {'title': 'Name', 'type': 'string'}, "
            "'arguments': {'title': 'Arguments', 'type': 'object'}}, 'required': ['name', 'arguments']}\n"
            "Each function call should be enclosed within <tool_call> </tool_call> XML tags.\n"
            "Example:\n<tool_call>\n{'name': <function-name>,'arguments': <args-dict>}\n</tool_call>"
        )
        trajectory.append({"from": "system", "value": system_msg})
        trajectory.append({"from": "human", "value": user_query})
        # Process messages (skip first user message as we already added it)
        i = 1
        while i < len(messages):
            msg = messages[i]
            if msg["role"] == "assistant":
                if "tool_calls" in msg and msg["tool_calls"]:
                    # Assistant message with tool calls
                    content = ""
                    # Add reasoning if present
                    if msg.get("reasoning"):
                        content = f"<think>{msg['reasoning']}</think>"
                    if msg.get("content"):
                        content += msg["content"] + "\n"
                    # Add tool calls in XML format
                    for tool_call in msg["tool_calls"]:
                        try:
                            # Arguments may arrive as a JSON string (OpenAI API)
                            # or as an already-parsed dict.
                            arguments = json.loads(tool_call["function"]["arguments"]) \
                                if isinstance(tool_call["function"]["arguments"], str) \
                                else tool_call["function"]["arguments"]
                        except json.JSONDecodeError:
                            arguments = {}
                        tool_call_json = {
                            "name": tool_call["function"]["name"],
                            "arguments": arguments
                        }
                        content += f"<tool_call>\n{json.dumps(tool_call_json, ensure_ascii=False)}\n</tool_call>\n"
                    trajectory.append({"from": "gpt", "value": content.rstrip()})
                    # Collect subsequent tool responses into one "tool" turn
                    tool_responses = []
                    j = i + 1
                    while j < len(messages) and messages[j]["role"] == "tool":
                        tool_msg = messages[j]
                        tool_content = tool_msg["content"]
                        # Try to parse as JSON so the response embeds structured data
                        try:
                            if tool_content.strip().startswith(("{", "[")):
                                tool_content = json.loads(tool_content)
                        except (json.JSONDecodeError, AttributeError):
                            pass
                        tool_response = f"<tool_response>\n"
                        tool_response += json.dumps({
                            "tool_call_id": tool_msg.get("tool_call_id", ""),
                            # Pair each response with its tool call by position;
                            # fall back to "unknown" if counts don't line up.
                            "name": msg["tool_calls"][len(tool_responses)]["function"]["name"] \
                                if len(tool_responses) < len(msg["tool_calls"]) else "unknown",
                            "content": tool_content
                        }, ensure_ascii=False)
                        tool_response += "\n</tool_response>"
                        tool_responses.append(tool_response)
                        j += 1
                    if tool_responses:
                        trajectory.append({"from": "tool", "value": "\n".join(tool_responses)})
                    # Skip past the consumed tool messages (i += 1 below lands on j)
                    i = j - 1
                else:
                    # Regular assistant message (no tool calls)
                    content = ""
                    if msg.get("reasoning"):
                        content = f"<think>{msg['reasoning']}</think>"
                    content += msg.get("content") or ""
                    trajectory.append({"from": "gpt", "value": content})
            elif msg["role"] == "user":
                trajectory.append({"from": "human", "value": msg["content"]})
            i += 1
        return trajectory
    def run_task(self, task: str) -> Dict[str, Any]:
        """
        Run a single task and return the result with trajectory.

        Args:
            task: The task/prompt to execute

        Returns:
            Dict with 'conversations' (Hermes trajectory), 'completed',
            'api_calls' and 'metadata' keys.
        """
        print(f"\n{'='*60}")
        print(f"📝 Task: {task[:80]}{'...' if len(task) > 80 else ''}")
        print(f"{'='*60}")
        # Initialize environment
        self._create_env()
        # Message history
        messages = [{"role": "user", "content": task}]
        # System prompt for the LLM (ephemeral - not saved to trajectory)
        system_prompt = """You are an AI agent that can execute bash commands to complete tasks.
When you need to run commands, use the 'terminal' tool with your bash command.
**Important:**
- When you have completed the task successfully, run: echo "MINI_SWE_AGENT_FINAL_OUTPUT" followed by a summary
- Be concise and efficient in your approach
- Install any needed tools with apt-get or pip
- Avoid interactive commands (no vim, nano, less, etc.)
Complete the user's task step by step."""
        api_call_count = 0
        completed = False
        # NOTE(review): final_response is assigned below but never returned;
        # kept for parity with the original implementation.
        final_response = None
        try:
            while api_call_count < self.max_iterations:
                api_call_count += 1
                print(f"\n🔄 API call #{api_call_count}/{self.max_iterations}")
                # Prepare API messages (system prompt is re-prepended each call)
                api_messages = [{"role": "system", "content": system_prompt}] + messages
                # Make API call
                try:
                    response = self.client.chat.completions.create(
                        model=self.model,
                        messages=api_messages,
                        tools=self.tools,
                        timeout=300.0
                    )
                except Exception as e:
                    # Abort the loop on API failure; partial trajectory is still returned.
                    self.logger.error(f"API call failed: {e}")
                    break
                assistant_message = response.choices[0].message
                # Log assistant response
                if assistant_message.content:
                    print(f"🤖 Assistant: {assistant_message.content[:100]}...")
                # Check for tool calls
                if assistant_message.tool_calls:
                    print(f"🔧 Tool calls: {len(assistant_message.tool_calls)}")
                    # Add assistant message with tool calls
                    messages.append({
                        "role": "assistant",
                        "content": assistant_message.content,
                        "tool_calls": [
                            {
                                "id": tc.id,
                                "type": tc.type,
                                "function": {
                                    "name": tc.function.name,
                                    "arguments": tc.function.arguments
                                }
                            }
                            for tc in assistant_message.tool_calls
                        ]
                    })
                    # Execute each tool call
                    for tc in assistant_message.tool_calls:
                        try:
                            args = json.loads(tc.function.arguments)
                        except json.JSONDecodeError:
                            args = {}
                        command = args.get("command", "echo 'No command provided'")
                        timeout = args.get("timeout", self.command_timeout)
                        print(f" 📞 terminal: {command[:60]}...")
                        # Execute command
                        result = self._execute_command(command, timeout)
                        # Format result
                        result_json = json.dumps({
                            "content": {
                                "output": result["output"],
                                "exit_code": result["exit_code"],
                                "error": result["error"]
                            }
                        }, ensure_ascii=False)
                        # Check for task completion signal
                        if "MINI_SWE_AGENT_FINAL_OUTPUT" in result["output"]:
                            print(f" ✅ Task completion signal detected!")
                            completed = True
                        # Add tool response
                        messages.append({
                            "role": "tool",
                            "content": result_json,
                            "tool_call_id": tc.id
                        })
                        print(f" ✅ exit_code={result['exit_code']}, output={len(result['output'])} chars")
                    # If task completed, we can stop
                    if completed:
                        final_response = assistant_message.content
                        break
                else:
                    # No tool calls - final response
                    final_response = assistant_message.content or ""
                    messages.append({
                        "role": "assistant",
                        "content": final_response
                    })
                    completed = True
                    print(f"🎉 Agent finished (no more tool calls)")
                    break
            if api_call_count >= self.max_iterations:
                print(f"⚠️ Reached max iterations ({self.max_iterations})")
        finally:
            # Cleanup environment even when the loop raised or broke early
            self._cleanup_env()
        # Convert to Hermes trajectory format
        trajectory = self._convert_to_hermes_format(messages, task, completed)
        return {
            "conversations": trajectory,
            "completed": completed,
            "api_calls": api_call_count,
            "metadata": {
                "model": self.model,
                "env_type": self.env_type,
                "timestamp": datetime.now().isoformat()
            }
        }
    def run_batch(
        self,
        prompts: List[str],
        output_file: str
    ) -> List[Dict[str, Any]]:
        """
        Run multiple tasks and save trajectories to a JSONL file.

        Each result is written and flushed immediately so a crash mid-batch
        preserves the completed trajectories; per-task failures are recorded
        as error entries rather than aborting the batch.

        Args:
            prompts: List of task prompts
            output_file: Output JSONL file path

        Returns:
            List of results
        """
        results = []
        print(f"\n📦 Running batch of {len(prompts)} tasks")
        print(f"📁 Output: {output_file}")
        with open(output_file, 'w', encoding='utf-8') as f:
            for i, prompt in enumerate(prompts, 1):
                print(f"\n{'='*60}")
                print(f"📋 Task {i}/{len(prompts)}")
                print(f"{'='*60}")
                try:
                    result = self.run_task(prompt)
                    results.append(result)
                    # Write to file immediately
                    f.write(json.dumps(result, ensure_ascii=False) + "\n")
                    f.flush()
                    print(f"✅ Task {i} completed (api_calls={result['api_calls']})")
                except Exception as e:
                    self.logger.error(f"Error on task {i}: {e}")
                    error_result = {
                        "conversations": [],
                        "completed": False,
                        "api_calls": 0,
                        "error": str(e),
                        "metadata": {"timestamp": datetime.now().isoformat()}
                    }
                    results.append(error_result)
                    f.write(json.dumps(error_result, ensure_ascii=False) + "\n")
                    f.flush()
        print(f"\n✅ Batch complete! {len(results)} trajectories saved to {output_file}")
        return results
# ============================================================================
# CLI Interface
# ============================================================================
def main(
    task: str = None,
    prompts_file: str = None,
    output_file: str = "mini-swe-agent-test1.jsonl",
    model: str = "claude-sonnet-4-20250514",
    base_url: str = None,
    api_key: str = None,
    env: str = "local",
    image: str = "python:3.11-slim",
    cwd: str = "/tmp",
    max_iterations: int = 15,
    timeout: int = 60,
    verbose: bool = False,
):
    """
    Run mini-swe-agent tasks with Hermes trajectory format output.

    Args:
        task: Single task to run (use this OR prompts_file)
        prompts_file: JSONL file with prompts (each line: {"prompt": "..."})
        output_file: Output JSONL file for trajectories
        model: Model name (default: claude-sonnet-4-20250514)
        base_url: API base URL (optional)
        api_key: API key (optional, uses env vars)
        env: Environment type - "local", "docker", or "modal"
        image: Docker/Modal image (default: python:3.11-slim)
        cwd: Working directory (default: /tmp)
        max_iterations: Maximum tool-calling iterations (default: 15)
        timeout: Command timeout in seconds (default: 60)
        verbose: Enable verbose logging

    Examples:
        # Single task with local environment
        python mini_swe_runner.py --task "Create hello.py that prints Hello World"
        # Single task with Docker
        python mini_swe_runner.py --task "List files" --env docker
        # Batch from file
        python mini_swe_runner.py --prompts_file tasks.jsonl --output_file results.jsonl
    """
    print("🚀 Mini-SWE Runner with Hermes Trajectory Format")
    print("=" * 60)
    # Initialize runner
    runner = MiniSWERunner(
        model=model,
        base_url=base_url,
        api_key=api_key,
        env_type=env,
        image=image,
        cwd=cwd,
        max_iterations=max_iterations,
        command_timeout=timeout,
        verbose=verbose,
    )
    if task:
        # Single task mode
        result = runner.run_task(task)
        # Save to file (single-line JSONL for pipeline compatibility)
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(json.dumps(result, ensure_ascii=False) + "\n")
        print(f"\n📁 Trajectory saved to: {output_file}")
        print(f"✅ Completed: {result['completed']}")
        print(f"📞 API calls: {result['api_calls']}")
        print(f"💬 Turns: {len(result['conversations'])}")
    elif prompts_file:
        # Batch mode: each line is JSON ({"prompt": ...} or {"task": ...})
        # or, as a fallback, a raw prompt string.
        prompts = []
        with open(prompts_file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line:
                    try:
                        entry = json.loads(line)
                        prompts.append(entry.get("prompt", entry.get("task", "")))
                    except json.JSONDecodeError:
                        prompts.append(line)
        if not prompts:
            print(f"❌ No prompts found in {prompts_file}")
            return
        runner.run_batch(prompts, output_file)
    else:
        print("❌ Please provide either --task or --prompts_file")
        print(" Example: python mini_swe_runner.py --task 'Create a hello world script'")
if __name__ == "__main__":
    fire.Fire(main)

View file

@ -0,0 +1,411 @@
#!/usr/bin/env python3
"""
Sample and Compress HuggingFace Datasets
Downloads trajectories from multiple HuggingFace datasets, randomly samples them,
and runs trajectory compression to fit within a target token budget.
Usage:
python scripts/sample_and_compress.py
# Custom sample size
python scripts/sample_and_compress.py --total_samples=5000
# Custom output name
python scripts/sample_and_compress.py --output_name=compressed_16k
"""
import json
import random
import os
from pathlib import Path
from typing import List, Dict, Any, Tuple
import fire
# Load environment variables
# (e.g. credentials used by downstream compression/summarization — from .env)
from dotenv import load_dotenv
load_dotenv()
# Default datasets to sample from
DEFAULT_DATASETS = [
"NousResearch/swe-terminus-agent-glm-kimi-minimax",
"NousResearch/hermes-agent-megascience-sft1",
"NousResearch/Hermes-Agent-Thinking-GLM-4.7-SFT2",
"NousResearch/Hermes-Agent-Thinking-GLM-4.7-SFT1",
"NousResearch/terminal-tasks-glm-hermes-agent"
]
def load_dataset_from_hf(dataset_name: str) -> List[Dict[str, Any]]:
    """
    Fetch one HuggingFace dataset's train split as a list of trajectory dicts.

    Rows are normalized so that conversation data always lives under a
    "conversations" key; rows with neither "conversations" nor "messages"
    are passed through unchanged.

    Args:
        dataset_name: HuggingFace dataset name (e.g., "NousResearch/dataset-name")

    Returns:
        List of trajectory entries (empty on load failure)
    """
    from datasets import load_dataset

    print(f" Loading {dataset_name}...")
    try:
        # Try loading with default config
        split = load_dataset(dataset_name, split="train")
    except Exception as e:
        print(f" ⚠️ Error loading {dataset_name}: {e}")
        return []

    def _normalize(row):
        # Prefer explicit conversation keys; fall back to the raw row.
        if "conversations" in row:
            return {"conversations": row["conversations"]}
        if "messages" in row:
            return {"conversations": row["messages"]}
        return dict(row)

    entries = [_normalize(row) for row in split]
    print(f" ✅ Loaded {len(entries):,} entries from {dataset_name}")
    return entries
# Global tokenizer for multiprocessing (set in worker init)
# Each Pool worker gets its own copy via the initializer below, avoiding
# pickling the tokenizer or re-loading it per entry.
_TOKENIZER = None
def _init_tokenizer_worker(tokenizer_name: str):
    """Initialize tokenizer in worker process (Pool initializer)."""
    global _TOKENIZER
    from transformers import AutoTokenizer
    _TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name, trust_remote_code=True)
def _count_tokens_for_entry(entry: Dict) -> Tuple[Dict, int]:
"""
Count tokens for a single entry (used in parallel processing).
Args:
entry: Trajectory entry with 'conversations' field
Returns:
Tuple of (entry, token_count)
"""
global _TOKENIZER
conversations = entry.get("conversations", [])
if not conversations:
return entry, 0
total = 0
for turn in conversations:
value = turn.get("value", "")
if value:
try:
total += len(_TOKENIZER.encode(value))
except:
# Fallback to character estimate
total += len(value) // 4
return entry, total
def sample_from_datasets(
    datasets: List[str],
    total_samples: int,
    min_tokens: int = 16000,
    tokenizer_name: str = "moonshotai/Kimi-K2-Thinking",
    seed: int = 42,
    num_proc: int = 8
) -> List[Dict[str, Any]]:
    """
    Load all datasets, filter by token count, then randomly sample from combined pool.

    Token counting runs in a multiprocessing Pool whose workers each load
    the tokenizer once (see _init_tokenizer_worker). Sampled entries gain
    two bookkeeping keys: "_source_dataset" and "_original_tokens".

    Args:
        datasets: List of HuggingFace dataset names
        total_samples: Total number of samples to collect
        min_tokens: Minimum token count to include (only sample trajectories >= this)
        tokenizer_name: HuggingFace tokenizer for counting tokens
        seed: Random seed for reproducibility
        num_proc: Number of parallel processes for tokenization

    Returns:
        List of sampled trajectory entries (shuffled)
    """
    from multiprocessing import Pool
    from functools import partial  # NOTE(review): unused import; kept as-is
    random.seed(seed)
    print(f"\n📥 Loading {len(datasets)} datasets...")
    print(f" Minimum tokens: {min_tokens:,} (filtering smaller trajectories)")
    print(f" Parallel workers: {num_proc}")
    print()
    # Load ALL entries from all datasets into one pool
    all_entries = []
    for dataset_name in datasets:
        entries = load_dataset_from_hf(dataset_name)
        if not entries:
            print(f" ⚠️ Skipping {dataset_name} (no entries loaded)")
            continue
        # Add source metadata to each entry
        for entry in entries:
            entry["_source_dataset"] = dataset_name
        all_entries.extend(entries)
    print(f"\n📊 Total entries loaded: {len(all_entries):,}")
    # Filter by token count using parallel processing
    print(f"\n🔍 Filtering trajectories with >= {min_tokens:,} tokens (using {num_proc} workers)...")
    filtered_entries = []
    token_counts = []
    # Use multiprocessing for token counting
    with Pool(
        processes=num_proc,
        initializer=_init_tokenizer_worker,
        initargs=(tokenizer_name,)
    ) as pool:
        # Process in chunks and show progress
        chunk_size = 1000
        processed = 0
        # imap_unordered: results arrive as workers finish, not input order
        for result in pool.imap_unordered(_count_tokens_for_entry, all_entries, chunksize=100):
            entry, token_count = result
            processed += 1
            if processed % chunk_size == 0:
                print(f" Processed {processed:,}/{len(all_entries):,}...", end="\r")
            if token_count >= min_tokens:
                entry["_original_tokens"] = token_count
                filtered_entries.append(entry)
                token_counts.append(token_count)
    print(f"\n ✅ Found {len(filtered_entries):,} trajectories >= {min_tokens:,} tokens")
    if token_counts:
        avg_tokens = sum(token_counts) / len(token_counts)
        print(f" 📈 Token stats: min={min(token_counts):,}, max={max(token_counts):,}, avg={avg_tokens:,.0f}")
    # Random sample from the filtered pool
    if len(filtered_entries) <= total_samples:
        print(f"\n⚠️ Only {len(filtered_entries):,} trajectories available, using all of them")
        sampled = filtered_entries
    else:
        sampled = random.sample(filtered_entries, total_samples)
        print(f"\n✅ Randomly sampled {len(sampled):,} trajectories from pool of {len(filtered_entries):,}")
    # Show source distribution
    source_counts = {}
    for entry in sampled:
        source = entry.get("_source_dataset", "unknown").split("/")[-1]
        source_counts[source] = source_counts.get(source, 0) + 1
    print(f"\n📌 Sample distribution by source:")
    for source, count in sorted(source_counts.items()):
        print(f" {source}: {count:,}")
    # Shuffle so output order doesn't follow dataset order
    random.shuffle(sampled)
    return sampled
def save_samples_for_compression(
    samples: List[Dict[str, Any]],
    output_dir: Path,
    batch_size: int = 100
):
    """
    Write samples to numbered JSONL batch files for the compressor.

    Files are named batch_0.jsonl, batch_1.jsonl, ... with at most
    *batch_size* entries each.

    Args:
        samples: List of trajectory entries
        output_dir: Directory to save JSONL files (created if missing)
        batch_size: Number of entries per file
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    total = len(samples)
    num_batches = -(-total // batch_size)  # ceiling division

    print(f"\n💾 Saving {len(samples)} samples to {output_dir}")
    print(f" Batch size: {batch_size}, Total batches: {num_batches}")

    for batch_idx, start in enumerate(range(0, total, batch_size)):
        chunk = samples[start:start + batch_size]
        target = output_dir / f"batch_{batch_idx}.jsonl"
        with open(target, 'w', encoding='utf-8') as handle:
            handle.writelines(
                json.dumps(record, ensure_ascii=False) + '\n' for record in chunk
            )
    print(f" ✅ Saved {num_batches} batch files")
def run_compression(input_dir: Path, output_dir: Path, config_path: str):
    """
    Run trajectory compression on the sampled data.

    Args:
        input_dir: Directory containing JSONL files to compress
        output_dir: Directory for compressed output
        config_path: Path to compression config YAML
    """
    # Make the repo root importable, then pull in the compressor.
    import sys
    sys.path.insert(0, str(Path(__file__).parent.parent))
    from trajectory_compressor import TrajectoryCompressor, CompressionConfig

    print(f"\n🗜️ Running trajectory compression...")
    print(f" Input: {input_dir}")
    print(f" Output: {output_dir}")
    print(f" Config: {config_path}")

    # Load config, build the compressor, and process every file in the dir.
    settings = CompressionConfig.from_yaml(config_path)
    engine = TrajectoryCompressor(settings)
    engine.process_directory(input_dir, output_dir)
def merge_output_to_single_jsonl(input_dir: Path, output_file: Path):
    """
    Concatenate every JSONL file in *input_dir* into one JSONL file.

    A file whose name matches *output_file* is skipped, so a previous merge
    result living in the same directory is never re-ingested. Blank lines
    are ignored.

    Args:
        input_dir: Directory containing JSONL files
        output_file: Output JSONL file path

    Returns:
        The output file path.
    """
    print(f"\n📦 Merging output files into {output_file.name}...")

    merged = []
    for source in sorted(input_dir.glob("*.jsonl")):
        if source.name == output_file.name:
            continue  # don't merge a prior merge result into itself
        for raw in source.read_text(encoding='utf-8').splitlines():
            text = raw.strip()
            if text:
                merged.append(json.loads(text))

    with open(output_file, 'w', encoding='utf-8') as sink:
        sink.writelines(json.dumps(item, ensure_ascii=False) + '\n' for item in merged)

    print(f" ✅ Merged {len(merged):,} entries into {output_file.name}")
    return output_file
def main(
    total_samples: int = 2500,
    output_name: str = "compressed_agentic",
    datasets: str = None,
    config: str = "configs/trajectory_compression.yaml",
    seed: int = 42,
    batch_size: int = 100,
    min_tokens: int = 16000,
    num_proc: int = 8,
    skip_download: bool = False,
):
    """
    Sample trajectories from HuggingFace datasets and run compression.

    Pipeline: download/filter/sample -> save batched JSONL -> compress each
    batch -> merge compressed batches into one JSONL under data/.

    Args:
        total_samples: Total number of samples to collect (default: 2500)
        output_name: Name for output directory/file (default: "compressed_agentic")
        datasets: Comma-separated list of dataset names (uses defaults if not provided)
        config: Path to compression config YAML
        seed: Random seed for reproducibility
        batch_size: Number of entries per JSONL file during processing
        min_tokens: Minimum token count to filter trajectories (default: 16000)
        num_proc: Number of parallel workers for tokenization (default: 8)
        skip_download: Skip download and use existing sampled data
    """
    print("=" * 70)
    print("📊 TRAJECTORY SAMPLING AND COMPRESSION")
    print("=" * 70)
    # Parse datasets
    if datasets:
        dataset_list = [d.strip() for d in datasets.split(",")]
    else:
        dataset_list = DEFAULT_DATASETS
    print(f"\n📋 Configuration:")
    print(f" Total samples: {total_samples:,}")
    print(f" Min tokens filter: {min_tokens:,}")
    print(f" Parallel workers: {num_proc}")
    print(f" Datasets: {len(dataset_list)}")
    for ds in dataset_list:
        print(f" - {ds}")
    print(f" Output name: {output_name}")
    print(f" Config: {config}")
    print(f" Seed: {seed}")
    # Setup paths (all outputs live under <repo_root>/data/)
    base_dir = Path(__file__).parent.parent
    sampled_dir = base_dir / "data" / f"{output_name}_raw"
    compressed_dir = base_dir / "data" / f"{output_name}_batches"
    final_output = base_dir / "data" / f"{output_name}.jsonl"
    if not skip_download:
        # Step 1: Download, filter by token count, and sample from combined pool
        samples = sample_from_datasets(
            dataset_list,
            total_samples,
            min_tokens=min_tokens,
            seed=seed,
            num_proc=num_proc
        )
        if not samples:
            print("❌ No samples collected. Exiting.")
            return
        # Step 2: Save to JSONL files
        save_samples_for_compression(samples, sampled_dir, batch_size)
    else:
        print(f"\n⏭️ Skipping download, using existing data in {sampled_dir}")
    # Step 3: Run compression
    config_path = base_dir / config
    if not config_path.exists():
        print(f"❌ Config not found: {config_path}")
        return
    run_compression(sampled_dir, compressed_dir, str(config_path))
    # Step 4: Merge into single JSONL file
    merge_output_to_single_jsonl(compressed_dir, final_output)
    print("\n" + "=" * 70)
    print("✅ COMPLETE!")
    print("=" * 70)
    print(f"\n📁 Raw samples: {sampled_dir}")
    print(f"📁 Compressed batches: {compressed_dir}")
    print(f"📁 Final output: {final_output}")
    print(f"\nTo upload to HuggingFace:")
    print(f" huggingface-cli upload NousResearch/{output_name} {final_output}")
if __name__ == "__main__":
    fire.Fire(main)

1243
trajectory_compressor.py Normal file

File diff suppressed because it is too large Load diff