# hermes-agent/environments/endless_terminals/endless_terminals_env.py

"""
Endless Terminals Environment for Hermes-Agent + Atropos RL.
Loads pre-generated terminal tasks from HuggingFace dataset and scores
agent performance using test execution in the agent's sandbox.
Uses hermes-agent backends (modal, docker, local) with per-task Docker images
extracted from container.def files. Tests run in the same sandbox the agent
used, following the Terminal Bench 2 pattern.
Dataset: https://huggingface.co/datasets/obiwan96/endless-terminals-train
Run:
python environments/endless_terminals/endless_terminals_env.py process \
--config environments/endless_terminals/default.yaml
"""
import asyncio
import logging
import os
import random
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from pydantic import Field
# Ensure hermes-agent root is on path
_repo_root = Path(__file__).resolve().parent.parent.parent
if str(_repo_root) not in sys.path:
sys.path.insert(0, str(_repo_root))
from atroposlib.envs.base import ScoredDataGroup, ScoredDataItem
from atroposlib.type_definitions import Item
from environments.hermes_base_env import HermesAgentBaseEnv, HermesAgentEnvConfig
from environments.agent_loop import AgentResult
from environments.tool_context import ToolContext
from tools.terminal_tool import (
register_task_env_overrides,
clear_task_env_overrides,
cleanup_vm,
)
logger = logging.getLogger(__name__)
# Add endless-terminals to path for imports
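# Override when the repo checkout lives elsewhere, e.g. (illustrative path):
#   export ENDLESS_TERMINALS_PATH=/data/endless-terminals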
ENDLESS_TERMINALS_PATH = os.getenv(
"ENDLESS_TERMINALS_PATH",
str(Path.home() / "Desktop" / "Projects" / "endless-terminals")
)
if ENDLESS_TERMINALS_PATH not in sys.path:
    sys.path.insert(0, ENDLESS_TERMINALS_PATH)
class EndlessTerminalsEnvConfig(HermesAgentEnvConfig):
"""Configuration for Endless Terminals environment."""
# Dataset settings
use_dataset: bool = Field(
default=True,
description="Load tasks from HuggingFace dataset (recommended). If False, generate procedurally."
)
dataset_name: str = Field(
default="obiwan96/endless-terminals-train",
description="HuggingFace dataset name"
)
dataset_split: str = Field(
default="train",
description="Dataset split to use"
)
dataset_cache_dir: str = Field(
default="~/.cache/huggingface/datasets",
description="HuggingFace datasets cache directory"
)
tasks_base_dir: str = Field(
default="",
description="Base directory containing task_* folders. If empty, uses paths from dataset."
)
# Test execution
test_timeout_s: int = Field(default=60, description="Test execution timeout (seconds)")
# Docker image fallback
default_docker_image: str = Field(
default="ubuntu:22.04",
description="Default Docker image if container.def parsing fails"
)
# Agent defaults
max_agent_turns: int = Field(default=32, description="Max turns for agent (increased for long traces)")
# Evaluation settings
num_eval_tasks: int = Field(
default=10,
description="Number of tasks to run during periodic evaluation"
)
eval_split_ratio: float = Field(
default=0.1,
description="Fraction of dataset to hold out for evaluation (0.0-1.0)"
)
class EndlessTerminalsEnv(HermesAgentBaseEnv):
"""
Endless Terminals environment using pre-generated HuggingFace dataset.
Loads terminal tasks from dataset, runs agent with terminal tools,
and scores by executing tests in the agent's sandbox using ToolContext.
"""
name = "endless_terminals_env"
env_config_cls = EndlessTerminalsEnvConfig
@classmethod
def config_init(cls) -> Tuple[EndlessTerminalsEnvConfig, List["APIServerConfig"]]:
"""
Default configuration for Endless Terminals environment.
        These defaults are used when no config file is provided; with --config,
        the YAML is loaded separately and this method may not be called.
"""
from atroposlib.envs.server_handling.server_manager import APIServerConfig
env_config = EndlessTerminalsEnvConfig(
enabled_toolsets=["terminal", "file"],
max_agent_turns=32,
terminal_backend="local",
use_dataset=True,
tasks_base_dir="",
group_size=1,
total_steps=1,
use_wandb=False,
)
server_configs = [
APIServerConfig(
base_url="https://openrouter.ai/api/v1",
model_name="anthropic/claude-sonnet-4.5",
server_type="openai",
api_key=os.getenv("OPENROUTER_API_KEY", ""),
health_check=False,
)
]
return env_config, server_configs
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._dataset = None
self._train_dataset = None
self._eval_dataset = None
self._dataset_indices = []
self._current_index = 0
# Metrics tracking for wandb - single buffer with dicts
self._metrics_buffer = []
# Debug: check server config
if hasattr(self, 'server') and hasattr(self.server, 'servers'):
for i, srv in enumerate(self.server.servers):
logger.debug(f"Server {i}: model_name={getattr(srv.config, 'model_name', 'NONE')}")
async def setup(self):
"""Load dataset from HuggingFace or local directory."""
if not self.config.use_dataset:
logger.info("Using procedural task generation (not implemented yet)")
return
# If tasks_base_dir is set, load from local directory instead of HuggingFace
if self.config.tasks_base_dir:
tasks_base = Path(os.path.expanduser(self.config.tasks_base_dir))
# Resolve to absolute path if relative
if not tasks_base.is_absolute():
tasks_base = Path.cwd() / tasks_base
tasks_base = tasks_base.resolve()
if not tasks_base.exists():
raise RuntimeError(f"tasks_base_dir not found: {tasks_base}")
logger.info(f"Loading tasks from local directory: {tasks_base}")
# Find all task_* directories
task_dirs = sorted(tasks_base.glob("task_*"))
logger.info(f"Found {len(task_dirs)} task directories")
if not task_dirs:
# Debug: show what's actually in the directory
all_items = list(tasks_base.iterdir())
logger.warning(f"Directory contains {len(all_items)} items:")
for item in all_items[:10]:
logger.warning(f" - {item.name} ({'dir' if item.is_dir() else 'file'})")
raise RuntimeError(f"No task_* directories found in {tasks_base}")
# Create fake dataset items (just the directory paths)
self._dataset = [
{
"description": f"Task from {task_dir.name}",
"extra_info": {"task_dir": str(task_dir)},
}
for task_dir in task_dirs
]
logger.info(f"Loaded {len(self._dataset)} tasks from local directory")
self._split_dataset()
return
# Otherwise, load from HuggingFace
logger.info(f"Loading dataset from HuggingFace: {self.config.dataset_name}")
try:
from datasets import load_dataset
            self._dataset = await asyncio.get_running_loop().run_in_executor(
None,
lambda: load_dataset(
self.config.dataset_name,
split=self.config.dataset_split,
cache_dir=os.path.expanduser(self.config.dataset_cache_dir)
)
)
logger.info(f"Loaded {len(self._dataset)} tasks from HuggingFace")
self._split_dataset()
except Exception as e:
logger.error(f"ERROR loading dataset: {e}")
raise
def _split_dataset(self):
"""Split dataset into train and eval sets based on eval_split_ratio."""
if self._dataset is None or len(self._dataset) == 0:
raise RuntimeError("Cannot split empty dataset")
total_size = len(self._dataset)
eval_size = int(total_size * self.config.eval_split_ratio)
train_size = total_size - eval_size
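        # e.g. 1,000 tasks at eval_split_ratio=0.1 -> eval_size=100, train_size=900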
all_indices = list(range(total_size))
random.shuffle(all_indices)
train_indices = all_indices[:train_size]
eval_indices = all_indices[train_size:]
if isinstance(self._dataset, list):
self._train_dataset = [self._dataset[i] for i in train_indices]
self._eval_dataset = [self._dataset[i] for i in eval_indices]
else:
self._train_dataset = self._dataset.select(train_indices)
self._eval_dataset = self._dataset.select(eval_indices)
self._dataset_indices = list(range(len(self._train_dataset)))
random.shuffle(self._dataset_indices)
self._current_index = 0
logger.info(
f"Split dataset: {len(self._train_dataset)} train, "
f"{len(self._eval_dataset)} eval "
f"(ratio={self.config.eval_split_ratio:.1%})"
)
async def get_next_item(self) -> Item:
"""Sample next task from training dataset."""
if self._train_dataset is None:
raise RuntimeError("Dataset not loaded. Call setup() first.")
# Get next task (with wraparound)
idx = self._dataset_indices[self._current_index]
task = self._train_dataset[idx]
# Advance to next task
self._current_index += 1
if self._current_index >= len(self._dataset_indices):
# Reshuffle for next epoch
random.shuffle(self._dataset_indices)
self._current_index = 0
logger.info("Reshuffled dataset (completed one epoch)")
# Extract task directory path
task_dir = task.get("extra_info", {}).get("task_dir")
if not task_dir:
task_dir = task.get("reward_spec", {}).get("ground_truth")
# Resolve task directory path
if task_dir:
task_dir_path = Path(task_dir)
# If tasks_base_dir is configured and path doesn't exist, reconstruct it
if self.config.tasks_base_dir and not task_dir_path.exists():
original_path = Path(task_dir)
task_name = original_path.name
task_dir_path = Path(os.path.expanduser(self.config.tasks_base_dir)) / task_name
else:
logger.error("No task directory path found in dataset item")
return await self.get_next_item()
# Verify directory exists
if not task_dir_path.exists():
logger.warning(f"Task dir not found: {task_dir_path}")
logger.warning("Hint: Set tasks_base_dir to directory containing task_* folders")
return await self.get_next_item() # Try next task
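        # Expected task directory layout (inferred from the lookups below; exact
        # contents vary per task):
        #   task_<name>/
        #       instruction.md                # task description (optional)
        #       tests/test_final_state.py     # or test_final_state.py at the root
        #       environment/container.def     # or container.def at the root
        #       environment/task.json         # fallback description source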
# Look for test file in tests/ subdirectory first, then at root
final_test = task_dir_path / "tests" / "test_final_state.py"
if not final_test.exists():
final_test = task_dir_path / "test_final_state.py"
# Verify test file exists
if not final_test.exists():
logger.warning(f"Missing test file in {task_dir_path} (checked tests/ and root)")
return await self.get_next_item()
# Parse container.def to extract Docker image
# Check environment/ subdirectory first, then root
container_def = task_dir_path / "environment" / "container.def"
if not container_def.exists():
container_def = task_dir_path / "container.def"
docker_image = self._parse_docker_image_from_def(container_def)
# Try to load description from instruction.md or task.json
description = task.get("description", "")
# First try instruction.md
instruction_md = task_dir_path / "instruction.md"
if not description and instruction_md.exists():
try:
description = instruction_md.read_text().strip()
except Exception as e:
logger.warning(f"Failed to load instruction.md for {task_dir_path.name}: {e}")
# Fallback to task.json in environment/
if not description:
task_json = task_dir_path / "environment" / "task.json"
if task_json.exists():
try:
import json
task_data = json.loads(task_json.read_text())
description = task_data.get("description", "") or task_data.get("instruction", "")
except Exception as e:
logger.warning(f"Failed to load task.json for {task_dir_path.name}: {e}")
if not description:
description = f"Complete the task in {task_dir_path.name}"
return {
"task_id": f"{task_dir_path.name}",
"task_name": task_dir_path.name,
"description": description,
"task_dir": str(task_dir_path),
"final_test": str(final_test),
"docker_image": docker_image,
"dataset_index": idx,
}
def format_prompt(self, item: Item) -> str:
"""Return the task description for the agent."""
return str(item.get("description", ""))
def _parse_docker_image_from_def(self, container_def_path: Path) -> str:
"""
Parse container.def file to extract the Docker base image.
Apptainer definition files typically look like:
Bootstrap: docker
From: ubuntu:22.04
Returns the image from the "From:" line, or falls back to default.
"""
if not container_def_path.exists():
logger.warning(f"container.def not found at {container_def_path}, using default image")
return self.config.default_docker_image
try:
content = container_def_path.read_text()
# Look for "From: <image>" line (case-insensitive)
match = re.search(r'^From:\s*(.+)$', content, re.MULTILINE | re.IGNORECASE)
if match:
image = match.group(1).strip()
logger.info(f"Extracted Docker image from container.def: {image}")
return image
except Exception as e:
logger.warning(f"Failed to parse {container_def_path}: {e}")
logger.warning(f"Could not extract image from {container_def_path}, using default")
return self.config.default_docker_image
async def collect_trajectory(
self, item: Item
) -> Tuple[Optional[ScoredDataItem], List[Item]]:
"""
Override to register per-task Docker image before running the agent.
Follows Terminal Bench 2 pattern: register_task_env_overrides() tells
the hermes-agent terminal backend to use a specific Docker image for
this task_id.
This is a copy of HermesAgentBaseEnv.collect_trajectory with Docker
image registration added after task_id generation.
"""
import uuid
from environments.agent_loop import HermesAgentLoop
task_id = str(uuid.uuid4())
task_name = item.get("task_name", "unknown")
docker_image = item.get("docker_image", self.config.default_docker_image)
logger.debug(f"collect_trajectory START for {task_name}")
# Register Docker image override for this task_id
logger.debug(f"Registering Docker image: {docker_image}")
register_task_env_overrides(task_id, {"modal_image": docker_image})
logger.info(
f"Task {task_name}: registered Docker image {docker_image} for task_id {task_id[:8]}"
)
logger.debug("Docker image registered")
try:
# Get group-level tools (resolved once in collect_trajectories)
logger.debug("Resolving tools...")
if self._current_group_tools is None:
tools, valid_names = self._resolve_tools_for_group()
else:
tools, valid_names = self._current_group_tools
logger.debug(f"Tools resolved: {len(tools)} tools")
# Build initial messages
logger.debug("Building initial messages...")
messages: List[Dict[str, Any]] = []
if self.config.system_prompt:
messages.append({"role": "system", "content": self.config.system_prompt})
messages.append({"role": "user", "content": self.format_prompt(item)})
logger.debug("Messages built, starting agent loop...")
# Run the agent loop
result: AgentResult
managed_state: Optional[Dict[str, Any]] = None
if self._use_managed_server():
# Phase 2: ManagedServer with parser
from environments.tool_call_parsers import get_parser
try:
tc_parser = get_parser(self.config.tool_call_parser)
except KeyError:
logger.warning(
"Tool call parser '%s' not found, falling back to 'hermes'",
self.config.tool_call_parser,
)
tc_parser = get_parser("hermes")
try:
async with self.server.managed_server(
tokenizer=self.tokenizer,
tool_call_parser=tc_parser,
) as managed:
agent = HermesAgentLoop(
server=managed,
tool_schemas=tools,
valid_tool_names=valid_names,
max_turns=self.config.max_agent_turns,
task_id=task_id,
temperature=self.config.agent_temperature,
max_tokens=self.config.max_token_length,
extra_body=self.config.extra_body,
)
result = await agent.run(messages)
# Get state directly from managed server while still in context
managed_state = managed.get_state()
except NotImplementedError:
# DummyManagedServer not allowed
logger.warning("ManagedServer not available. Falling back to direct server mode.")
agent = HermesAgentLoop(
server=self.server,
tool_schemas=tools,
valid_tool_names=valid_names,
max_turns=self.config.max_agent_turns,
task_id=task_id,
temperature=self.config.agent_temperature,
max_tokens=self.config.max_token_length,
extra_body=self.config.extra_body,
)
result = await agent.run(messages)
else:
# Phase 1: OpenAI server
agent = HermesAgentLoop(
server=self.server,
tool_schemas=tools,
valid_tool_names=valid_names,
max_turns=self.config.max_agent_turns,
task_id=task_id,
temperature=self.config.agent_temperature,
max_tokens=self.config.max_token_length,
extra_body=self.config.extra_body,
)
result = await agent.run(messages)
# Skip reward computation if agent produced no output
only_system_and_user = all(
msg.get("role") in ("system", "user") for msg in result.messages
)
if result.turns_used == 0 or only_system_and_user:
logger.warning(
"Agent loop produced no output (turns=%d). Skipping trajectory.",
result.turns_used,
)
# Return None to skip this trajectory (likely an API failure)
return None, []
else:
# Compute reward using ToolContext
ctx = ToolContext(task_id)
try:
reward = await self.compute_reward(item, result, ctx)
except Exception as e:
logger.error("compute_reward failed: %s", e)
reward = 0.0
finally:
ctx.cleanup()
# Track metrics for wandb logging
task_metrics = {
"test_passed": 1.0 if reward > 0.5 else 0.0,
"reward": reward,
"turns_used": result.turns_used,
"finished_naturally": result.finished_naturally,
"docker_image": docker_image,
"num_tool_errors": len(result.tool_errors),
}
# Include detailed tool errors if any occurred
if result.tool_errors:
task_metrics["tool_errors"] = [
{
"turn": err.turn,
"tool": err.tool_name,
"error": err.error[:200],
}
for err in result.tool_errors
]
self._metrics_buffer.append(task_metrics)
# ============================================================================
# Build ScoredDataGroup from ManagedServer state
# ============================================================================
# Phase 2: Extract pre-computed data from SequenceNodes
            # A single agent run can yield multiple SequenceNodes (multiple trajectories),
            # so iterate through all of them and emit each as its own sequence.
#
# Each SequenceNode contains:
# - tokens: Full unmasked token sequence [1, 2, 3, ..., N]
# - masked_tokens: Training format [-100, -100, ..., -100, actual, actual, ...]
# - logprobs: Training format [1.0, 1.0, ..., 1.0, -0.5, -0.3, ...]
# - full_text: Complete text (prompt + all completions)
#
# Phase 1: Create placeholder tokens for OpenAI-style servers
# ============================================================================
nodes = (managed_state or {}).get("nodes", []) if managed_state else []
# Create ScoredDataGroup with lists for multiple trajectories
scored_group = ScoredDataGroup()
scored_group["tokens"] = []
scored_group["masks"] = []
scored_group["scores"] = []
scored_group["messages"] = []
scored_group["inference_logprobs"] = []
if nodes:
# Phase 2: iterate through all nodes (may have multiple trajectories)
for i, node in enumerate(nodes):
scored_group["tokens"].append(node.tokens)
scored_group["masks"].append(node.masked_tokens)
scored_group["scores"].append(reward)
scored_group["messages"].append(result.messages)
if hasattr(node, "logprobs") and node.logprobs:
scored_group["inference_logprobs"].append(node.logprobs)
else:
# Placeholder logprobs if not available
scored_group["inference_logprobs"].append([1.0] * len(node.tokens))
logger.debug(f"Added trajectory {i+1}/{len(nodes)} with {len(node.tokens)} tokens")
else:
# Phase 1: create placeholder tokens for OpenAI-style servers
full_text = "\n".join(
msg.get("content", "") for msg in result.messages if msg.get("content")
)
if self.tokenizer:
tokens = self.tokenizer.encode(full_text, add_special_tokens=True)
else:
tokens = list(range(min(len(full_text) // 4, 128)))
scored_group["tokens"].append(tokens)
scored_group["masks"].append([-100] + tokens[1:])
scored_group["scores"].append(reward)
scored_group["messages"].append(result.messages)
scored_group["inference_logprobs"].append([1.0] * len(tokens))
# Return None if no trajectories collected
if len(scored_group["tokens"]) == 0:
return None, []
logger.debug(f"Returning ScoredDataGroup with {len(scored_group['tokens'])} trajectories")
return scored_group, []
finally:
# Clean up task overrides and sandbox
clear_task_env_overrides(task_id)
try:
cleanup_vm(task_id)
except Exception as e:
logger.debug(f"VM cleanup for {task_id[:8]}: {e}")
async def compute_reward(
self,
item: Item,
result: AgentResult,
ctx: ToolContext
) -> float:
"""
Run final tests in the agent's sandbox and return binary reward.
Uses ToolContext to execute pytest in the SAME sandbox the agent used,
following the Terminal Bench 2 verification pattern. No separate
Apptainer execution needed.
Returns 1.0 if tests pass, 0.0 otherwise.
"""
task_name = item.get("task_name", "unknown")
final_test_path = Path(item.get("final_test", ""))
if not final_test_path.exists():
logger.error(f"Task {task_name}: test file not found at {final_test_path}")
return 0.0
logger.info(f"Task {task_name}: running tests in sandbox...")
try:
# Run tests in a thread to avoid blocking the event loop
            loop = asyncio.get_running_loop()
reward = await loop.run_in_executor(
None,
self._run_tests_in_sandbox,
final_test_path,
ctx,
task_name,
)
status = "PASS" if reward == 1.0 else "FAIL"
logger.info(f"Task {task_name}: {status} (reward={reward})")
return reward
except Exception as e:
logger.error(f"Task {task_name}: test execution failed: {e}", exc_info=True)
return 0.0
def _run_tests_in_sandbox(
self,
test_file_path: Path,
ctx: ToolContext,
task_name: str,
) -> float:
"""
Upload test file to sandbox and execute pytest.
Runs in thread pool (via run_in_executor) to avoid blocking the event loop
with synchronous ToolContext calls.
Args:
test_file_path: Local path to test_final_state.py
ctx: ToolContext scoped to the agent's sandbox
task_name: For logging
Returns:
1.0 if tests pass, 0.0 otherwise
"""
try:
# Upload test file to sandbox
test_content = test_file_path.read_text()
ctx.write_file("/workspace/test_final_state.py", test_content)
logger.debug(f"Task {task_name}: uploaded test file to /workspace/test_final_state.py")
# Run pytest in the sandbox
result = ctx.terminal(
"cd /workspace && python -m pytest -q test_final_state.py",
timeout=self.config.test_timeout_s,
)
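            # ctx.terminal is expected to return a dict containing at least
            # "exit_code" and "output".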
exit_code = result.get("exit_code", -1)
output = result.get("output", "")
if exit_code == 0:
logger.debug(f"Task {task_name}: tests passed")
return 1.0
else:
# Log failure output (last 500 chars for debugging)
output_preview = output[-500:] if output else "(no output)"
logger.info(
f"Task {task_name}: tests failed (exit_code={exit_code})\n{output_preview}"
)
return 0.0
except Exception as e:
logger.error(f"Task {task_name}: error running tests: {e}")
return 0.0
async def evaluate(self):
"""
Periodic evaluation on holdout eval set.
Runs the agent on num_eval_tasks from the held-out eval set
(never seen during training). Returns metrics for wandb logging.
"""
if self._eval_dataset is None:
logger.warning("Cannot evaluate: eval dataset not loaded")
return {}
if len(self._eval_dataset) == 0:
logger.warning("Eval dataset is empty")
return {}
# Use min of num_eval_tasks and actual eval set size
num_tasks = min(self.config.num_eval_tasks, len(self._eval_dataset))
logger.info(f"Starting evaluation on {num_tasks} held-out tasks...")
eval_metrics = {
"rewards": [],
"passes": [],
"turns": [],
"natural_finishes": [],
}
        # Sample from the held-out eval set (random is imported at module level)
        eval_indices = random.sample(range(len(self._eval_dataset)), num_tasks)
for idx in eval_indices:
task = self._eval_dataset[idx]
# Build item using same logic as get_next_item
task_dir = task.get("extra_info", {}).get("task_dir")
if not task_dir:
task_dir = task.get("reward_spec", {}).get("ground_truth")
if not task_dir:
continue
task_dir_path = Path(task_dir)
if self.config.tasks_base_dir and not task_dir_path.exists():
original_path = Path(task_dir)
task_name = original_path.name
task_dir_path = Path(os.path.expanduser(self.config.tasks_base_dir)) / task_name
if not task_dir_path.exists():
continue
# Find test file
final_test = task_dir_path / "tests" / "test_final_state.py"
if not final_test.exists():
final_test = task_dir_path / "test_final_state.py"
if not final_test.exists():
continue
# Parse Docker image
container_def = task_dir_path / "environment" / "container.def"
if not container_def.exists():
container_def = task_dir_path / "container.def"
docker_image = self._parse_docker_image_from_def(container_def)
# Load description
description = task.get("description", "")
instruction_md = task_dir_path / "instruction.md"
if not description and instruction_md.exists():
try:
description = instruction_md.read_text().strip()
except Exception:
pass
item = {
"description": description,
"final_test": str(final_test),
"docker_image": docker_image,
}
            # Run agent on this task; generate task_id before the try so the
            # finally block can always clean it up
            import uuid
            task_id = str(uuid.uuid4())
            try:
                # Register the per-task Docker image (module-level import from tools.terminal_tool)
                register_task_env_overrides(task_id, {"modal_image": docker_image})
                # Build messages (system prompt only if configured, mirroring collect_trajectory)
                messages = []
                if self.config.system_prompt:
                    messages.append({"role": "system", "content": self.config.system_prompt})
                messages.append({"role": "user", "content": description or "Complete the task."})
                # Resolve tool schemas the same way collect_trajectory does
                tools, valid_names = self._resolve_tools_for_group()
# Run agent
from environments.agent_loop import HermesAgentLoop
agent = HermesAgentLoop(
server=self.server,
tool_schemas=tools,
valid_tool_names=valid_names,
max_turns=self.config.max_agent_turns,
task_id=task_id,
temperature=self.config.agent_temperature,
max_tokens=self.config.max_token_length,
extra_body=self.config.extra_body,
)
result = await agent.run(messages)
                # Compute reward in the agent's sandbox (ToolContext imported at module level)
                ctx = ToolContext(task_id)
try:
reward = await self.compute_reward(item, result, ctx)
except Exception as e:
logger.warning(f"Eval reward computation failed: {e}")
reward = 0.0
finally:
ctx.cleanup()
# Track metrics
eval_metrics["rewards"].append(reward)
eval_metrics["passes"].append(1.0 if reward > 0.5 else 0.0)
eval_metrics["turns"].append(result.turns_used)
eval_metrics["natural_finishes"].append(1.0 if result.finished_naturally else 0.0)
except Exception as e:
logger.error(f"Eval task failed: {e}")
continue
            finally:
                # Clean up task overrides and sandbox (module-level imports)
                clear_task_env_overrides(task_id)
                try:
                    cleanup_vm(task_id)
                except Exception as cleanup_err:
                    logger.debug(f"Eval VM cleanup for {task_id[:8]}: {cleanup_err}")
# Aggregate metrics
if not eval_metrics["rewards"]:
logger.warning("No eval tasks completed successfully")
return {}
aggregated = {
"eval/pass_rate": sum(eval_metrics["passes"]) / len(eval_metrics["passes"]),
"eval/avg_reward": sum(eval_metrics["rewards"]) / len(eval_metrics["rewards"]),
"eval/avg_turns": sum(eval_metrics["turns"]) / len(eval_metrics["turns"]),
"eval/natural_finish_rate": sum(eval_metrics["natural_finishes"]) / len(eval_metrics["natural_finishes"]),
"eval/num_tasks": len(eval_metrics["rewards"]),
}
logger.info(f"Evaluation complete: pass_rate={aggregated['eval/pass_rate']:.2%}, avg_turns={aggregated['eval/avg_turns']:.1f}")
return aggregated
async def wandb_log(self, wandb_metrics: Optional[Dict] = None):
"""Log Endless Terminals specific metrics to wandb."""
if wandb_metrics is None:
wandb_metrics = {}
# Aggregate metrics from buffer
if self._metrics_buffer:
# Test pass rate
test_passes = [m["test_passed"] for m in self._metrics_buffer]
wandb_metrics["endless_terminals/test_pass_rate"] = sum(test_passes) / len(test_passes)
wandb_metrics["endless_terminals/num_tests_passed"] = sum(test_passes)
wandb_metrics["endless_terminals/num_tests_total"] = len(test_passes)
# Turns used statistics
turns = [m["turns_used"] for m in self._metrics_buffer]
wandb_metrics["endless_terminals/avg_turns_used"] = sum(turns) / len(turns)
wandb_metrics["endless_terminals/max_turns_used"] = max(turns)
wandb_metrics["endless_terminals/min_turns_used"] = min(turns)
# Natural finish rate (did model stop on its own vs hitting max turns)
natural_finishes = [1.0 if m["finished_naturally"] else 0.0 for m in self._metrics_buffer]
wandb_metrics["endless_terminals/natural_finish_rate"] = sum(natural_finishes) / len(natural_finishes)
# Tool error statistics
total_tool_errors = sum(m["num_tool_errors"] for m in self._metrics_buffer)
wandb_metrics["endless_terminals/total_tool_errors"] = total_tool_errors
wandb_metrics["endless_terminals/avg_tool_errors_per_task"] = total_tool_errors / len(self._metrics_buffer)
# Docker image distribution (count unique images used)
docker_images = [m["docker_image"] for m in self._metrics_buffer]
unique_images = set(docker_images)
wandb_metrics["endless_terminals/num_unique_docker_images"] = len(unique_images)
# Log most common errors if any
all_errors = []
for m in self._metrics_buffer:
if "tool_errors" in m:
all_errors.extend(m["tool_errors"])
if all_errors:
# Count error types
error_tools = {}
for err in all_errors:
tool = err["tool"]
error_tools[tool] = error_tools.get(tool, 0) + 1
# Log top 3 error-prone tools
for i, (tool, count) in enumerate(sorted(error_tools.items(), key=lambda x: x[1], reverse=True)[:3]):
wandb_metrics[f"endless_terminals/errors_by_tool/{tool}"] = count
# Clear buffer after logging
self._metrics_buffer = []
await super().wandb_log(wandb_metrics)
if __name__ == "__main__":
EndlessTerminalsEnv.cli()