refactor: move standalone scripts to scripts/ directory

Move batch_runner, trajectory_compressor, mini_swe_runner, and rl_cli
from the project root into scripts/, update all imports, logger names,
pyproject.toml, and downstream test references.
This commit is contained in:
alt-glitch 2026-04-21 15:23:23 +05:30
parent 224e6d46d9
commit ca2b6a529e
20 changed files with 51 additions and 41 deletions

0
scripts/__init__.py Normal file
View file

1295
scripts/batch_runner.py Normal file

File diff suppressed because it is too large Load diff

739
scripts/mini_swe_runner.py Normal file
View file

@@ -0,0 +1,739 @@
#!/usr/bin/env python3
"""
SWE Runner with Hermes Trajectory Format
A runner that uses Hermes-Agent's built-in execution environments
(local, docker, modal) and outputs trajectories in the Hermes-Agent format
compatible with batch_runner.py and trajectory_compressor.py.
Features:
- Uses Hermes-Agent's Docker, Modal, or Local environments for command execution
- Outputs trajectories in Hermes format (from/value pairs with <tool_call>/<tool_response> XML)
- Compatible with the trajectory compression pipeline
- Supports batch processing from JSONL prompt files
Usage:
# Run a single task with local environment
python mini_swe_runner.py --task "Create a hello world Python script" --env local
# Run with Docker
python mini_swe_runner.py --task "List files in /tmp" --env docker --image python:3.11-slim
# Run with Modal (cloud)
python mini_swe_runner.py --task "Install numpy and test it" --env modal --image python:3.11-slim
# Batch mode from JSONL file
python mini_swe_runner.py --prompts_file prompts.jsonl --output_file trajectories.jsonl --env docker
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json
import logging
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Literal
import fire
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def _effective_temperature_for_model(
model: str,
base_url: Optional[str] = None,
) -> Optional[float]:
"""Return a fixed temperature for models with strict sampling contracts.
Returns ``None`` when the model manages temperature server-side (Kimi);
callers must omit the ``temperature`` kwarg entirely in that case.
"""
try:
from agent.auxiliary_client import _fixed_temperature_for_model, OMIT_TEMPERATURE
except Exception:
return None
result = _fixed_temperature_for_model(model, base_url)
if result is OMIT_TEMPERATURE:
return None # caller must omit temperature
return result
# ============================================================================
# Terminal Tool Definition (matches Hermes-Agent format)
# ============================================================================
# Schema for the single `terminal` tool, in OpenAI function-calling format.
# NOTE: the description string below is sent to the model verbatim — it is
# prompt text, not documentation — so keep its wording stable.
TERMINAL_TOOL_DEFINITION = {
    "type": "function",
    "function": {
        "name": "terminal",
        "description": """Execute bash commands in a sandboxed environment.
**Environment:**
- Isolated execution environment (local, Docker, or Modal cloud)
- Filesystem persists between tool calls within the same task
- Internet access available
**Command Execution:**
- Provide the command to execute via the 'command' parameter
- Optional 'timeout' parameter in seconds (default: 60)
**Examples:**
- Run command: `{"command": "ls -la"}`
- With timeout: `{"command": "long_task.sh", "timeout": 300}`
**Best Practices:**
- Use non-interactive commands (avoid vim, nano, interactive python)
- Pipe to cat if output might be large
- Install tools with apt-get or pip as needed
**Completion:**
- When task is complete, output: echo "MINI_SWE_AGENT_FINAL_OUTPUT" followed by your result
""",
        "parameters": {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The bash command to execute"
                },
                "timeout": {
                    "type": "integer",
                    "description": "Command timeout in seconds (default: 60)"
                }
            },
            # Only 'command' is mandatory; 'timeout' falls back to the default.
            "required": ["command"]
        }
    }
}
# ============================================================================
# Environment Factory
# ============================================================================
def create_environment(
    env_type: str = "local",
    image: str = "python:3.11-slim",
    cwd: str = "/tmp",
    timeout: int = 60,
    **kwargs
):
    """Build an execution backend from Hermes-Agent's environment classes.

    Args:
        env_type: One of "local", "docker", "modal".
        image: Docker/Modal image name (ignored for local).
        cwd: Working directory for executed commands.
        timeout: Default per-command timeout in seconds.
        **kwargs: Extra backend-specific options (docker/modal only).

    Returns:
        Environment instance exposing execute() and cleanup() methods.

    Raises:
        ValueError: if *env_type* is not one of the supported backends.
    """
    # Backend imports are deferred so that, e.g., the docker backend is only
    # required when actually requested.
    if env_type == "modal":
        from tools.environments.modal import ModalEnvironment
        return ModalEnvironment(image=image, cwd=cwd, timeout=timeout, **kwargs)
    if env_type == "docker":
        from tools.environments.docker import DockerEnvironment
        return DockerEnvironment(image=image, cwd=cwd, timeout=timeout, **kwargs)
    if env_type == "local":
        from tools.environments.local import LocalEnvironment
        return LocalEnvironment(cwd=cwd, timeout=timeout)
    raise ValueError(f"Unknown environment type: {env_type}. Use 'local', 'docker', or 'modal'")
# ============================================================================
# Mini-SWE Runner with Hermes Trajectory Format
# ============================================================================
class MiniSWERunner:
    """
    Agent runner that uses Hermes-Agent's built-in execution environments
    and outputs trajectories in Hermes-Agent format.

    Lifecycle: construct once, then call run_task()/run_batch(); an execution
    environment is created per task and torn down in a finally block.
    """
    def __init__(
        self,
        model: str = "anthropic/claude-sonnet-4.6",
        base_url: Optional[str] = None,
        api_key: Optional[str] = None,
        env_type: str = "local",
        image: str = "python:3.11-slim",
        cwd: str = "/tmp",
        max_iterations: int = 15,
        command_timeout: int = 60,
        verbose: bool = False,
    ):
        """
        Initialize the Mini-SWE Runner.
        Args:
            model: Model name for OpenAI-compatible API
            base_url: API base URL (optional, uses env vars if not provided)
            api_key: API key (optional, uses env vars if not provided)
            env_type: Environment type - "local", "docker", or "modal"
            image: Docker/Modal image (ignored for local)
            cwd: Working directory for commands
            max_iterations: Maximum tool-calling iterations
            command_timeout: Default timeout for commands
            verbose: Enable verbose logging
        """
        self.model = model
        self.max_iterations = max_iterations
        self.command_timeout = command_timeout
        self.verbose = verbose
        self.env_type = env_type
        self.image = image
        self.cwd = cwd
        # Setup logging
        # NOTE(review): basicConfig mutates process-wide logging state; fine
        # for a standalone script, worth confirming if imported as a library.
        logging.basicConfig(
            level=logging.DEBUG if verbose else logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)
        # Initialize LLM client via centralized provider router.
        # If explicit api_key/base_url are provided (e.g. from CLI args),
        # construct directly. Otherwise use the router for OpenRouter.
        if api_key or base_url:
            from openai import OpenAI
            client_kwargs = {
                "base_url": base_url or "https://openrouter.ai/api/v1",
                # Key resolution order: OPENROUTER -> ANTHROPIC -> OPENAI.
                "api_key": api_key or os.getenv(
                    "OPENROUTER_API_KEY",
                    os.getenv("ANTHROPIC_API_KEY",
                              os.getenv("OPENAI_API_KEY", ""))),
            }
            self.client = OpenAI(**client_kwargs)
        else:
            from agent.auxiliary_client import resolve_provider_client
            self.client, _ = resolve_provider_client("openrouter", model=model)
            if self.client is None:
                # Fallback: try auto-detection
                self.client, _ = resolve_provider_client("auto", model=model)
            if self.client is None:
                # Last resort: direct OpenRouter client from the environment.
                from openai import OpenAI
                self.client = OpenAI(
                    base_url="https://openrouter.ai/api/v1",
                    api_key=os.getenv("OPENROUTER_API_KEY", ""))
        # Environment will be created per-task
        self.env = None
        # Tool definition
        self.tools = [TERMINAL_TOOL_DEFINITION]
        print("🤖 Mini-SWE Runner initialized")
        print(f" Model: {self.model}")
        print(f" Environment: {self.env_type}")
        if self.env_type != "local":
            print(f" Image: {self.image}")
        print(f" Max iterations: {self.max_iterations}")

    def _create_env(self):
        """Create the execution environment."""
        print(f"🔧 Creating {self.env_type} environment...")
        self.env = create_environment(
            env_type=self.env_type,
            image=self.image,
            cwd=self.cwd,
            timeout=self.command_timeout
        )
        print("✅ Environment ready")

    def _cleanup_env(self):
        """Cleanup the execution environment (idempotent; no-op when unset)."""
        if self.env is not None:
            # Backends expose either cleanup() or stop(); prefer cleanup().
            if hasattr(self.env, 'cleanup'):
                self.env.cleanup()
            elif hasattr(self.env, 'stop'):
                self.env.stop()
            self.env = None

    def _execute_command(self, command: str, timeout: Optional[int] = None) -> Dict[str, Any]:
        """
        Execute a command in the environment.
        Args:
            command: Bash command to execute
            timeout: Optional timeout override
        Returns:
            Dict with 'output', 'exit_code', and 'error' keys.
        """
        # Lazily create the environment so a runner can be constructed cheaply.
        if self.env is None:
            self._create_env()
        try:
            result = self.env.execute(command, timeout=timeout or self.command_timeout)
            return {
                "output": result.get("output", ""),
                "exit_code": result.get("returncode", 0),
                "error": None
            }
        except Exception as e:
            # Surface environment failures as a synthetic result instead of
            # crashing the agent loop; exit_code -1 marks "did not run".
            return {
                "output": "",
                "exit_code": -1,
                "error": str(e)
            }

    def _format_tools_for_system_message(self) -> str:
        """Format tool definitions for the system message (JSON string)."""
        formatted_tools = []
        for tool in self.tools:
            func = tool["function"]
            formatted_tools.append({
                "name": func["name"],
                "description": func.get("description", ""),
                "parameters": func.get("parameters", {}),
                # NOTE(review): "required" is always None here even though the
                # schema lists required params — confirm downstream consumers
                # expect this shape.
                "required": None
            })
        return json.dumps(formatted_tools, ensure_ascii=False)

    def _convert_to_hermes_format(
        self,
        messages: List[Dict[str, Any]],
        user_query: str,
        completed: bool
    ) -> List[Dict[str, Any]]:
        """
        Convert internal message format to Hermes trajectory format.
        This produces the exact format used by batch_runner.py.

        NOTE(review): the `completed` parameter is currently unused in this
        method — confirm whether it should be recorded in the trajectory.
        """
        trajectory = []
        # System message with tool definitions
        system_msg = (
            "You are a function calling AI model. You are provided with function signatures within <tools> </tools> XML tags. "
            "You may call one or more functions to assist with the user query. If available tools are not relevant in assisting "
            "with user query, just respond in natural conversational language. Don't make assumptions about what values to plug "
            "into functions. After calling & executing the functions, you will be provided with function results within "
            "<tool_response> </tool_response> XML tags. Here are the available tools:\n"
            f"<tools>\n{self._format_tools_for_system_message()}\n</tools>\n"
            "For each function call return a JSON object, with the following pydantic model json schema for each:\n"
            "{'title': 'FunctionCall', 'type': 'object', 'properties': {'name': {'title': 'Name', 'type': 'string'}, "
            "'arguments': {'title': 'Arguments', 'type': 'object'}}, 'required': ['name', 'arguments']}\n"
            "Each function call should be enclosed within <tool_call> </tool_call> XML tags.\n"
            "Example:\n<tool_call>\n{'name': <function-name>,'arguments': <args-dict>}\n</tool_call>"
        )
        trajectory.append({"from": "system", "value": system_msg})
        trajectory.append({"from": "human", "value": user_query})
        # Process messages (skip first user message as we already added it)
        i = 1
        while i < len(messages):
            msg = messages[i]
            if msg["role"] == "assistant":
                if "tool_calls" in msg and msg["tool_calls"]:
                    # Assistant message with tool calls
                    content = ""
                    # Add reasoning if present
                    if msg.get("reasoning"):
                        content = f"<think>{msg['reasoning']}</think>"
                    if msg.get("content"):
                        content += msg["content"] + "\n"
                    # Add tool calls in XML format
                    for tool_call in msg["tool_calls"]:
                        if not tool_call or not isinstance(tool_call, dict): continue
                        try:
                            # Arguments may arrive as a JSON string or an
                            # already-parsed dict; normalize to a dict.
                            arguments = json.loads(tool_call["function"]["arguments"]) \
                                if isinstance(tool_call["function"]["arguments"], str) \
                                else tool_call["function"]["arguments"]
                        except json.JSONDecodeError:
                            arguments = {}
                        tool_call_json = {
                            "name": tool_call["function"]["name"],
                            "arguments": arguments
                        }
                        content += f"<tool_call>\n{json.dumps(tool_call_json, ensure_ascii=False)}\n</tool_call>\n"
                    trajectory.append({"from": "gpt", "value": content.rstrip()})
                    # Collect subsequent tool responses
                    tool_responses = []
                    j = i + 1
                    while j < len(messages) and messages[j]["role"] == "tool":
                        tool_msg = messages[j]
                        tool_content = tool_msg["content"]
                        # Try to parse as JSON
                        try:
                            if tool_content.strip().startswith(("{", "[")):
                                tool_content = json.loads(tool_content)
                        except (json.JSONDecodeError, AttributeError):
                            pass
                        tool_response = "<tool_response>\n"
                        # Pair each response with the Nth tool call by
                        # position; label "unknown" if counts mismatch.
                        tool_response += json.dumps({
                            "tool_call_id": tool_msg.get("tool_call_id", ""),
                            "name": msg["tool_calls"][len(tool_responses)]["function"]["name"] \
                                if len(tool_responses) < len(msg["tool_calls"]) else "unknown",
                            "content": tool_content
                        }, ensure_ascii=False)
                        tool_response += "\n</tool_response>"
                        tool_responses.append(tool_response)
                        j += 1
                    if tool_responses:
                        trajectory.append({"from": "tool", "value": "\n".join(tool_responses)})
                    # Jump past consumed tool messages (the i += 1 below lands
                    # on the first non-tool message).
                    i = j - 1
                else:
                    # Regular assistant message (no tool calls)
                    content = ""
                    if msg.get("reasoning"):
                        content = f"<think>{msg['reasoning']}</think>"
                    content += msg.get("content") or ""
                    trajectory.append({"from": "gpt", "value": content})
            elif msg["role"] == "user":
                trajectory.append({"from": "human", "value": msg["content"]})
            i += 1
        return trajectory

    def run_task(self, task: str) -> Dict[str, Any]:
        """
        Run a single task and return the result with trajectory.
        Args:
            task: The task/prompt to execute
        Returns:
            Dict with trajectory ('conversations'), completion status,
            API-call count, and metadata.
        """
        print(f"\n{'='*60}")
        print(f"📝 Task: {task[:80]}{'...' if len(task) > 80 else ''}")
        print(f"{'='*60}")
        # Initialize environment
        self._create_env()
        # Message history
        messages = [{"role": "user", "content": task}]
        # System prompt for the LLM (ephemeral - not saved to trajectory)
        system_prompt = """You are an AI agent that can execute bash commands to complete tasks.
When you need to run commands, use the 'terminal' tool with your bash command.
**Important:**
- When you have completed the task successfully, run: echo "MINI_SWE_AGENT_FINAL_OUTPUT" followed by a summary
- Be concise and efficient in your approach
- Install any needed tools with apt-get or pip
- Avoid interactive commands (no vim, nano, less, etc.)
Complete the user's task step by step."""
        api_call_count = 0
        completed = False
        final_response = None
        try:
            while api_call_count < self.max_iterations:
                api_call_count += 1
                print(f"\n🔄 API call #{api_call_count}/{self.max_iterations}")
                # Prepare API messages
                api_messages = [{"role": "system", "content": system_prompt}] + messages
                # Make API call
                try:
                    api_kwargs = {
                        "model": self.model,
                        "messages": api_messages,
                        "tools": self.tools,
                        "timeout": 300.0,
                    }
                    # Only send a temperature when the model mandates one;
                    # otherwise omit the kwarg entirely.
                    fixed_temperature = _effective_temperature_for_model(
                        self.model,
                        str(getattr(self.client, "base_url", "") or ""),
                    )
                    if fixed_temperature is not None:
                        api_kwargs["temperature"] = fixed_temperature
                    response = self.client.chat.completions.create(**api_kwargs)
                except Exception as e:
                    # An API failure ends the loop; cleanup happens in finally.
                    self.logger.error(f"API call failed: {e}")
                    break
                assistant_message = response.choices[0].message
                # Log assistant response
                if assistant_message.content:
                    print(f"🤖 Assistant: {assistant_message.content[:100]}...")
                # Check for tool calls
                if assistant_message.tool_calls:
                    print(f"🔧 Tool calls: {len(assistant_message.tool_calls)}")
                    # Add assistant message with tool calls
                    messages.append({
                        "role": "assistant",
                        "content": assistant_message.content,
                        "tool_calls": [
                            {
                                "id": tc.id,
                                "type": tc.type,
                                "function": {
                                    "name": tc.function.name,
                                    "arguments": tc.function.arguments
                                }
                            }
                            for tc in assistant_message.tool_calls
                        ]
                    })
                    # Execute each tool call
                    for tc in assistant_message.tool_calls:
                        try:
                            args = json.loads(tc.function.arguments)
                        except json.JSONDecodeError:
                            args = {}
                        command = args.get("command", "echo 'No command provided'")
                        timeout = args.get("timeout", self.command_timeout)
                        print(f" 📞 terminal: {command[:60]}...")
                        # Execute command
                        result = self._execute_command(command, timeout)
                        # Format result
                        result_json = json.dumps({
                            "content": {
                                "output": result["output"],
                                "exit_code": result["exit_code"],
                                "error": result["error"]
                            }
                        }, ensure_ascii=False)
                        # Check for task completion signal
                        if "MINI_SWE_AGENT_FINAL_OUTPUT" in result["output"]:
                            print(" ✅ Task completion signal detected!")
                            completed = True
                        # Add tool response
                        messages.append({
                            "role": "tool",
                            "content": result_json,
                            "tool_call_id": tc.id
                        })
                        print(f" ✅ exit_code={result['exit_code']}, output={len(result['output'])} chars")
                    # If task completed, we can stop
                    if completed:
                        final_response = assistant_message.content
                        break
                else:
                    # No tool calls - final response
                    final_response = assistant_message.content or ""
                    messages.append({
                        "role": "assistant",
                        "content": final_response
                    })
                    completed = True
                    print("🎉 Agent finished (no more tool calls)")
                    break
            if api_call_count >= self.max_iterations:
                print(f"⚠️ Reached max iterations ({self.max_iterations})")
        finally:
            # Cleanup environment
            self._cleanup_env()
        # Convert to Hermes trajectory format
        trajectory = self._convert_to_hermes_format(messages, task, completed)
        return {
            "conversations": trajectory,
            "completed": completed,
            "api_calls": api_call_count,
            "metadata": {
                "model": self.model,
                "env_type": self.env_type,
                "timestamp": datetime.now().isoformat()
            }
        }

    def run_batch(
        self,
        prompts: List[str],
        output_file: str
    ) -> List[Dict[str, Any]]:
        """
        Run multiple tasks and save trajectories to a JSONL file.
        Args:
            prompts: List of task prompts
            output_file: Output JSONL file path
        Returns:
            List of results (one dict per prompt; failed tasks get an
            error placeholder so the output stays line-aligned).
        """
        results = []
        print(f"\n📦 Running batch of {len(prompts)} tasks")
        print(f"📁 Output: {output_file}")
        with open(output_file, 'w', encoding='utf-8') as f:
            for i, prompt in enumerate(prompts, 1):
                print(f"\n{'='*60}")
                print(f"📋 Task {i}/{len(prompts)}")
                print(f"{'='*60}")
                try:
                    result = self.run_task(prompt)
                    results.append(result)
                    # Write to file immediately
                    f.write(json.dumps(result, ensure_ascii=False) + "\n")
                    f.flush()
                    print(f"✅ Task {i} completed (api_calls={result['api_calls']})")
                except Exception as e:
                    # Record the failure but keep the batch going.
                    self.logger.error(f"Error on task {i}: {e}")
                    error_result = {
                        "conversations": [],
                        "completed": False,
                        "api_calls": 0,
                        "error": str(e),
                        "metadata": {"timestamp": datetime.now().isoformat()}
                    }
                    results.append(error_result)
                    f.write(json.dumps(error_result, ensure_ascii=False) + "\n")
                    f.flush()
        print(f"\n✅ Batch complete! {len(results)} trajectories saved to {output_file}")
        return results
# ============================================================================
# CLI Interface
# ============================================================================
def main(
    task: Optional[str] = None,
    prompts_file: Optional[str] = None,
    output_file: str = "swe-runner-test1.jsonl",
    model: str = "claude-sonnet-4-20250514",
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    env: str = "local",
    image: str = "python:3.11-slim",
    cwd: str = "/tmp",
    max_iterations: int = 15,
    timeout: int = 60,
    verbose: bool = False,
):
    """
    Run SWE tasks with Hermes trajectory format output.
    Args:
        task: Single task to run (use this OR prompts_file)
        prompts_file: JSONL file with prompts (each line: {"prompt": "..."})
        output_file: Output JSONL file for trajectories
        model: Model name (default: claude-sonnet-4-20250514)
        base_url: API base URL (optional)
        api_key: API key (optional, uses env vars)
        env: Environment type - "local", "docker", or "modal"
        image: Docker/Modal image (default: python:3.11-slim)
        cwd: Working directory (default: /tmp)
        max_iterations: Maximum tool-calling iterations (default: 15)
        timeout: Command timeout in seconds (default: 60)
        verbose: Enable verbose logging
    Examples:
        # Single task with local environment
        python mini_swe_runner.py --task "Create hello.py that prints Hello World"
        # Single task with Docker
        python mini_swe_runner.py --task "List files" --env docker
        # Batch from file
        python mini_swe_runner.py --prompts_file tasks.jsonl --output_file results.jsonl
    """
    print("🚀 Mini-SWE Runner with Hermes Trajectory Format")
    print("=" * 60)
    # Initialize runner
    # NOTE(review): the CLI default model differs from MiniSWERunner's own
    # default ("anthropic/claude-sonnet-4.6") — presumably intentional, but
    # worth confirming.
    runner = MiniSWERunner(
        model=model,
        base_url=base_url,
        api_key=api_key,
        env_type=env,
        image=image,
        cwd=cwd,
        max_iterations=max_iterations,
        command_timeout=timeout,
        verbose=verbose,
    )
    if task:
        # Single task mode
        result = runner.run_task(task)
        # Save to file
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(json.dumps(result, ensure_ascii=False) + "\n")
        print(f"\n📁 Trajectory saved to: {output_file}")
        print(f"✅ Completed: {result['completed']}")
        print(f"📞 API calls: {result['api_calls']}")
        print(f"💬 Turns: {len(result['conversations'])}")
    elif prompts_file:
        # Batch mode
        prompts = []
        with open(prompts_file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line:
                    try:
                        # Each line is either a JSON object with a "prompt"
                        # (or "task") key, or a bare prompt string.
                        entry = json.loads(line)
                        prompts.append(entry.get("prompt", entry.get("task", "")))
                    except json.JSONDecodeError:
                        prompts.append(line)
        if not prompts:
            print(f"❌ No prompts found in {prompts_file}")
            return
        runner.run_batch(prompts, output_file)
    else:
        print("❌ Please provide either --task or --prompts_file")
        print(" Example: python mini_swe_runner.py --task 'Create a hello world script'")


if __name__ == "__main__":
    fire.Fire(main)

449
scripts/rl_cli.py Normal file
View file

@@ -0,0 +1,449 @@
#!/usr/bin/env python3
"""
RL Training CLI Runner
Dedicated CLI runner for RL training workflows with:
- Extended timeouts for long-running training
- RL-focused system prompts
- Full toolset including RL training tools
- Special handling for 30-minute check intervals
Usage:
python rl_cli.py "Train a model on GSM8k for math reasoning"
python rl_cli.py --interactive
python rl_cli.py --list-environments
Environment Variables:
TINKER_API_KEY: API key for Tinker service (required)
WANDB_API_KEY: API key for WandB metrics (required)
OPENROUTER_API_KEY: API key for OpenRouter (required for agent)
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
from pathlib import Path
import fire
import yaml
from hermes_constants import get_hermes_home, OPENROUTER_BASE_URL
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
_hermes_home = get_hermes_home()
_project_env = Path(__file__).parent.parent / '.env'
from hermes_cli.env_loader import load_hermes_dotenv
_loaded_env_paths = load_hermes_dotenv(hermes_home=_hermes_home, project_env=_project_env)
for _env_path in _loaded_env_paths:
    print(f"✅ Loaded environment variables from {_env_path}")
# Set terminal working directory to tinker-atropos submodule
# This ensures terminal commands run in the right context for RL work
# NOTE: this script lives in scripts/, so the project root (and the
# tinker-atropos submodule) is one level up — same `parent.parent` as the
# _project_env resolution above.
_project_root = Path(__file__).parent.parent
tinker_atropos_dir = _project_root / 'tinker-atropos'
if tinker_atropos_dir.exists():
    os.environ['TERMINAL_CWD'] = str(tinker_atropos_dir)
    os.environ['HERMES_QUIET'] = '1'  # Disable temp subdirectory creation
    print(f"📂 Terminal working directory: {tinker_atropos_dir}")
else:
    # Fall back to the project (hermes-agent) root if submodule not found
    os.environ['TERMINAL_CWD'] = str(_project_root)
    os.environ['HERMES_QUIET'] = '1'
    print(f"⚠️ tinker-atropos submodule not found, using: {_project_root}")
# Import agent and tools
from run_agent import AIAgent
from tools.rl_training_tool import get_missing_keys
# ============================================================================
# Config Loading
# ============================================================================
# Fallback agent model and API endpoint, used when ~/.hermes/config.yaml is
# missing or does not override them (see load_hermes_config below).
DEFAULT_MODEL = "anthropic/claude-opus-4.5"
DEFAULT_BASE_URL = OPENROUTER_BASE_URL
def load_hermes_config() -> dict:
    """
    Load configuration from ~/.hermes/config.yaml.

    Returns:
        dict: Configuration with "model" and "base_url" keys, falling back to
        DEFAULT_MODEL / DEFAULT_BASE_URL when the file is absent, malformed,
        or missing those entries.
    """
    settings = {
        "model": DEFAULT_MODEL,
        "base_url": DEFAULT_BASE_URL,
    }
    config_path = _hermes_home / 'config.yaml'
    if not config_path.exists():
        return settings
    try:
        with open(config_path, "r") as fh:
            file_config = yaml.safe_load(fh) or {}
        # "model" may be a plain string or a mapping with a "default" entry.
        if "model" in file_config:
            entry = file_config["model"]
            if isinstance(entry, str):
                settings["model"] = entry
            elif isinstance(entry, dict):
                settings["model"] = entry.get("default", DEFAULT_MODEL)
        # Optional endpoint override.
        if "base_url" in file_config:
            settings["base_url"] = file_config["base_url"]
    except Exception as e:
        # Best-effort: a broken config falls back to defaults with a warning.
        print(f"⚠️ Warning: Failed to load config.yaml: {e}")
    return settings
# ============================================================================
# RL-Specific Configuration
# ============================================================================
# Extended timeouts for long-running RL operations
RL_MAX_ITERATIONS = 200 # Allow many more iterations for long workflows
# RL-focused system prompt
# NOTE: this text is sent to the agent verbatim — it is prompt behavior,
# not documentation; keep wording changes deliberate.
RL_SYSTEM_PROMPT = """You are an automated post-training engineer specializing in reinforcement learning for language models.
## Your Capabilities
You have access to RL training tools for running reinforcement learning on models through Tinker-Atropos:
1. **DISCOVER**: Use `rl_list_environments` to see available RL environments
2. **INSPECT**: Read environment files to understand how they work (verifiers, data loading, rewards)
3. **INSPECT DATA**: Use terminal to explore HuggingFace datasets and understand their format
4. **CREATE**: Copy existing environments as templates, modify for your needs
5. **CONFIGURE**: Use `rl_select_environment` and `rl_edit_config` to set up training
6. **TEST**: Always use `rl_test_inference` before full training to validate your setup
7. **TRAIN**: Use `rl_start_training` to begin, `rl_check_status` to monitor
8. **EVALUATE**: Use `rl_get_results` and analyze WandB metrics to assess performance
## Environment Files
Environment files are located in: `tinker-atropos/tinker_atropos/environments/`
Study existing environments to learn patterns. Look for:
- `load_dataset()` calls - how data is loaded
- `score_answer()` / `score()` - verification logic
- `get_next_item()` - prompt formatting
- `system_prompt` - instruction format
- `config_init()` - default configuration
## Creating New Environments
To create a new environment:
1. Read an existing environment file (e.g., gsm8k_tinker.py)
2. Use terminal to explore the target dataset format
3. Copy the environment file as a template
4. Modify the dataset loading, prompt formatting, and verifier logic
5. Test with `rl_test_inference` before training
## Important Guidelines
- **Always test before training**: Training runs take hours - verify everything works first
- **Monitor metrics**: Check WandB for reward/mean and percent_correct
- **Status check intervals**: Wait at least 30 minutes between status checks
- **Early stopping**: Stop training early if metrics look bad or stagnant
- **Iterate quickly**: Start with small total_steps to validate, then scale up
## Available Toolsets
You have access to:
- **RL tools**: Environment discovery, config management, training, testing
- **Terminal**: Run commands, inspect files, explore datasets
- **Web**: Search for information, documentation, papers
- **File tools**: Read and modify code files
When asked to train a model, follow this workflow:
1. List available environments
2. Select and configure the appropriate environment
3. Test with sample prompts
4. Start training with conservative settings
5. Monitor progress and adjust as needed
"""
# Toolsets to enable for RL workflows
RL_TOOLSETS = ["terminal", "web", "rl"]
# ============================================================================
# Helper Functions
# ============================================================================
def check_requirements():
    """Check that all required environment variables and services are available."""
    problems = []
    # Agent-side key.
    if not os.getenv("OPENROUTER_API_KEY"):
        problems.append("OPENROUTER_API_KEY not set - required for agent")
    # RL-side keys (Tinker / WandB), reported by the RL tooling.
    rl_gaps = get_missing_keys()
    if rl_gaps:
        problems.append(f"Missing RL API keys: {', '.join(rl_gaps)}")
    if not problems:
        return True
    print("❌ Missing requirements:")
    for problem in problems:
        print(f" - {problem}")
    print("\nPlease set these environment variables in your .env file or shell.")
    return False
def check_tinker_atropos():
    """Check if the tinker-atropos submodule is properly set up.

    Returns:
        A (bool, payload) pair: ``(True, info_dict)`` with ``path`` and
        ``environments_count`` keys when the submodule and its environments
        directory exist, or ``(False, reason_string)`` otherwise.
    """
    # This script lives in scripts/, so the submodule sits one level up at
    # the project root (same layout as the sys.path bootstrap at the top of
    # the file). Using Path(__file__).parent here would look inside scripts/.
    tinker_path = Path(__file__).parent.parent / "tinker-atropos"
    if not tinker_path.exists():
        return False, "tinker-atropos submodule not found. Run: git submodule update --init"
    envs_path = tinker_path / "tinker_atropos" / "environments"
    if not envs_path.exists():
        return False, f"environments directory not found at {envs_path}"
    # Count environment modules, skipping private/dunder files like __init__.py.
    env_files = list(envs_path.glob("*.py"))
    env_files = [f for f in env_files if not f.name.startswith("_")]
    return True, {"path": str(tinker_path), "environments_count": len(env_files)}
def list_environments_sync():
    """List available environments (synchronous wrapper)."""
    from tools.rl_training_tool import rl_list_environments
    import json

    async def _fetch():
        # The tool returns a JSON string; decode before handing back.
        payload = await rl_list_environments()
        return json.loads(payload)

    return asyncio.run(_fetch())
# ============================================================================
# Main CLI
# ============================================================================
def main(
    task: str = None,
    model: str = None,
    api_key: str = None,
    base_url: str = None,
    max_iterations: int = RL_MAX_ITERATIONS,
    interactive: bool = False,
    list_environments: bool = False,
    check_server: bool = False,
    verbose: bool = False,
    save_trajectories: bool = True,
):
    """
    RL Training CLI - Dedicated runner for RL training workflows.
    Args:
        task: The training task/goal (e.g., "Train a model on GSM8k for math")
        model: Model to use for the agent (reads from ~/.hermes/config.yaml if not provided)
        api_key: OpenRouter API key (uses OPENROUTER_API_KEY env var if not provided)
        base_url: API base URL (reads from config or defaults to OpenRouter)
        max_iterations: Maximum agent iterations (default: 200 for long workflows)
        interactive: Run in interactive mode (multiple conversations)
        list_environments: Just list available RL environments and exit
        check_server: Check if RL API server is running and exit
        verbose: Enable verbose logging
        save_trajectories: Save conversation trajectories (default: True for RL)
    Examples:
        # Train on a specific environment
        python rl_cli.py "Train a model on GSM8k math problems"
        # Interactive mode
        python rl_cli.py --interactive
        # List available environments
        python rl_cli.py --list-environments
        # Check server status
        python rl_cli.py --check-server
    """
    # Load config from ~/.hermes/config.yaml
    config = load_hermes_config()
    # Use config values if not explicitly provided
    if model is None:
        model = config["model"]
    if base_url is None:
        base_url = config["base_url"]
    print("🎯 RL Training Agent")
    print("=" * 60)
    # Handle setup check
    # Early-exit mode: report submodule/key status and return.
    if check_server:
        print("\n🔍 Checking tinker-atropos setup...")
        ok, result = check_tinker_atropos()
        if ok:
            print("✅ tinker-atropos submodule found")
            print(f" Path: {result.get('path')}")
            print(f" Environments found: {result.get('environments_count', 0)}")
            # Also check API keys
            missing = get_missing_keys()
            if missing:
                print(f"\n⚠️ Missing API keys: {', '.join(missing)}")
                print(" Add them to ~/.hermes/.env")
            else:
                print("✅ API keys configured")
        else:
            print(f"❌ tinker-atropos not set up: {result}")
            print("\nTo set up:")
            print(" git submodule update --init")
            print(" pip install -e ./tinker-atropos")
        return
    # Handle environment listing
    # Early-exit mode: print discovered environments and return.
    if list_environments:
        print("\n📋 Available RL Environments:")
        print("-" * 40)
        try:
            data = list_environments_sync()
            if "error" in data:
                print(f"❌ Error: {data['error']}")
                return
            envs = data.get("environments", [])
            if not envs:
                print("No environments found.")
                print("\nMake sure tinker-atropos is set up:")
                print(" git submodule update --init")
                return
            for env in envs:
                print(f"\n 📦 {env['name']}")
                print(f" Class: {env['class_name']}")
                print(f" Path: {env['file_path']}")
                if env.get('description'):
                    desc = env['description'][:100] + "..." if len(env.get('description', '')) > 100 else env.get('description', '')
                    print(f" Description: {desc}")
            print(f"\n📊 Total: {len(envs)} environments")
            print("\nUse `rl_select_environment(name)` to select an environment for training.")
        except Exception as e:
            print(f"❌ Error listing environments: {e}")
            print("\nMake sure tinker-atropos is set up:")
            print(" git submodule update --init")
            print(" pip install -e ./tinker-atropos")
        return
    # Check requirements
    if not check_requirements():
        sys.exit(1)
    # Set default task if none provided
    if not task and not interactive:
        print("\n⚠️ No task provided. Use --interactive for interactive mode or provide a task.")
        print("\nExamples:")
        print(' python rl_cli.py "Train a model on GSM8k math problems"')
        print(' python rl_cli.py "Create an RL environment for code generation"')
        print(' python rl_cli.py --interactive')
        return
    # Get API key
    api_key = api_key or os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        print("❌ No API key provided. Set OPENROUTER_API_KEY or pass --api-key")
        sys.exit(1)
    print(f"\n🤖 Model: {model}")
    print(f"🔧 Max iterations: {max_iterations}")
    print(f"📁 Toolsets: {', '.join(RL_TOOLSETS)}")
    print("=" * 60)
    # Create agent with RL configuration
    agent = AIAgent(
        base_url=base_url,
        api_key=api_key,
        model=model,
        max_iterations=max_iterations,
        enabled_toolsets=RL_TOOLSETS,
        save_trajectories=save_trajectories,
        verbose_logging=verbose,
        quiet_mode=False,
        ephemeral_system_prompt=RL_SYSTEM_PROMPT,
    )
    if interactive:
        # Interactive mode - multiple conversations
        print("\n🔄 Interactive RL Training Mode")
        print("Type 'quit' or 'exit' to end the session.")
        print("Type 'status' to check active training runs.")
        print("-" * 40)
        while True:
            try:
                user_input = input("\n🎯 RL Task> ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ('quit', 'exit', 'q'):
                    print("\n👋 Goodbye!")
                    break
                if user_input.lower() == 'status':
                    # Quick status check
                    from tools.rl_training_tool import rl_list_runs
                    import json
                    result = asyncio.run(rl_list_runs())
                    runs = json.loads(result)
                    if isinstance(runs, list) and runs:
                        print("\n📊 Active Runs:")
                        for run in runs:
                            print(f" - {run['run_id']}: {run['environment']} ({run['status']})")
                    else:
                        print("\nNo active runs.")
                    continue
                # Run the agent
                print("\n" + "=" * 60)
                # NOTE(review): the agent's response is not printed here —
                # presumably AIAgent prints as it goes; confirm.
                response = agent.run_conversation(user_input)
                print("\n" + "=" * 60)
            except KeyboardInterrupt:
                print("\n\n👋 Interrupted. Goodbye!")
                break
            except Exception as e:
                # Keep the REPL alive on per-task failures.
                print(f"\n❌ Error: {e}")
                if verbose:
                    import traceback
                    traceback.print_exc()
    else:
        # Single task mode
        print(f"\n📝 Task: {task}")
        print("-" * 40)
        try:
            response = agent.run_conversation(task)
            print("\n" + "=" * 60)
            print("✅ Task completed")
        except KeyboardInterrupt:
            print("\n\n⚠️ Interrupted by user")
        except Exception as e:
            print(f"\n❌ Error: {e}")
            if verbose:
                import traceback
                traceback.print_exc()
            sys.exit(1)


if __name__ == "__main__":
    fire.Fire(main)

View file

@@ -267,7 +267,7 @@ def run_compression(input_dir: Path, output_dir: Path, config_path: str):
# Import the compressor
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from trajectory_compressor import TrajectoryCompressor, CompressionConfig
from scripts.trajectory_compressor import TrajectoryCompressor, CompressionConfig
print(f"\n🗜️ Running trajectory compression...")
print(f" Input: {input_dir}")

File diff suppressed because it is too large Load diff