Updating path vars and dataset loading

This commit is contained in:
Sam Herring 2026-02-24 18:58:19 -08:00
parent f1c2f8a414
commit b93ad43191

View file

@ -91,12 +91,12 @@ class EndlessTerminalsEnvConfig(HermesAgentEnvConfig):
max_agent_turns: int = Field(default=32, description="Max turns for agent (increased for long traces)")
class EndlessTerminalsEnv(HermesAgentBaseEnv[EndlessTerminalsEnvConfig]):
class EndlessTerminalsEnv(HermesAgentBaseEnv):
"""
Endless Terminals environment using pre-generated HuggingFace dataset.
Loads terminal tasks from dataset, runs agent with terminal tools,
and scores by executing tests in Apptainer containers.
and scores by executing tests in the agent's sandbox using ToolContext.
"""
name = "endless_terminals_env"
@ -109,12 +109,57 @@ class EndlessTerminalsEnv(HermesAgentBaseEnv[EndlessTerminalsEnvConfig]):
self._current_index = 0
async def setup(self):
"""Load HuggingFace dataset."""
"""Load dataset from HuggingFace or local directory."""
if not self.config.use_dataset:
print("[EndlessTerminalsEnv] Using procedural task generation (not implemented yet)", flush=True)
return
print(f"[EndlessTerminalsEnv] Loading dataset: {self.config.dataset_name}", flush=True)
# If tasks_base_dir is set, load from local directory instead of HuggingFace
if self.config.tasks_base_dir:
tasks_base = Path(os.path.expanduser(self.config.tasks_base_dir))
# Resolve to absolute path if relative
if not tasks_base.is_absolute():
tasks_base = Path.cwd() / tasks_base
tasks_base = tasks_base.resolve()
if not tasks_base.exists():
raise RuntimeError(f"tasks_base_dir not found: {tasks_base}")
print(f"[EndlessTerminalsEnv] Loading tasks from local directory: {tasks_base}", flush=True)
# Find all task_* directories
task_dirs = sorted(tasks_base.glob("task_*"))
print(f"[EndlessTerminalsEnv] Found {len(task_dirs)} task directories", flush=True)
if not task_dirs:
# Debug: show what's actually in the directory
all_items = list(tasks_base.iterdir())
print(f"[EndlessTerminalsEnv] Directory contains {len(all_items)} items:", flush=True)
for item in all_items[:10]:
print(f" - {item.name} ({'dir' if item.is_dir() else 'file'})", flush=True)
raise RuntimeError(f"No task_* directories found in {tasks_base}")
# Create fake dataset items (just the directory paths)
self._dataset = [
{
"description": f"Task from {task_dir.name}",
"extra_info": {"task_dir": str(task_dir)},
}
for task_dir in task_dirs
]
# Create shuffled indices
self._dataset_indices = list(range(len(self._dataset)))
random.shuffle(self._dataset_indices)
self._current_index = 0
print(f"[EndlessTerminalsEnv] Loaded {len(self._dataset)} tasks from local directory", flush=True)
return
# Otherwise, load from HuggingFace
print(f"[EndlessTerminalsEnv] Loading dataset from HuggingFace: {self.config.dataset_name}", flush=True)
try:
from datasets import load_dataset
@ -133,7 +178,7 @@ class EndlessTerminalsEnv(HermesAgentBaseEnv[EndlessTerminalsEnvConfig]):
random.shuffle(self._dataset_indices)
self._current_index = 0
print(f"[EndlessTerminalsEnv] Loaded {len(self._dataset)} tasks from dataset", flush=True)
print(f"[EndlessTerminalsEnv] Loaded {len(self._dataset)} tasks from HuggingFace", flush=True)
except Exception as e:
print(f"[EndlessTerminalsEnv] ERROR loading dataset: {e}", flush=True)
@ -161,13 +206,17 @@ class EndlessTerminalsEnv(HermesAgentBaseEnv[EndlessTerminalsEnvConfig]):
if not task_dir:
task_dir = task.get("reward_spec", {}).get("ground_truth")
# If tasks_base_dir is configured, reconstruct path
if self.config.tasks_base_dir:
original_path = Path(task_dir)
task_name = original_path.name
task_dir_path = Path(self.config.tasks_base_dir) / task_name
else:
# Resolve task directory path
if task_dir:
task_dir_path = Path(task_dir)
# If tasks_base_dir is configured and path doesn't exist, reconstruct it
if self.config.tasks_base_dir and not task_dir_path.exists():
original_path = Path(task_dir)
task_name = original_path.name
task_dir_path = Path(os.path.expanduser(self.config.tasks_base_dir)) / task_name
else:
logger.error("No task directory path found in dataset item")
return await self.get_next_item()
# Verify directory exists
if not task_dir_path.exists():
@ -175,22 +224,52 @@ class EndlessTerminalsEnv(HermesAgentBaseEnv[EndlessTerminalsEnvConfig]):
print(f"[EndlessTerminalsEnv] Hint: Set tasks_base_dir to directory containing task_* folders", flush=True)
return await self.get_next_item() # Try next task
container_sif = task_dir_path / "container.sif"
final_test = task_dir_path / "test_final_state.py"
# Look for test file in tests/ subdirectory first, then at root
final_test = task_dir_path / "tests" / "test_final_state.py"
if not final_test.exists():
final_test = task_dir_path / "test_final_state.py"
# Verify test file exists
if not final_test.exists():
print(f"[EndlessTerminalsEnv] WARNING: Missing test file in {task_dir_path}", flush=True)
logger.warning(f"Missing test file in {task_dir_path} (checked tests/ and root)")
return await self.get_next_item()
# Parse container.def to extract Docker image
container_def = task_dir_path / "container.def"
# Check environment/ subdirectory first, then root
container_def = task_dir_path / "environment" / "container.def"
if not container_def.exists():
container_def = task_dir_path / "container.def"
docker_image = self._parse_docker_image_from_def(container_def)
# Try to load description from instruction.md or task.json
description = task.get("description", "")
# First try instruction.md
instruction_md = task_dir_path / "instruction.md"
if not description and instruction_md.exists():
try:
description = instruction_md.read_text().strip()
except Exception as e:
logger.warning(f"Failed to load instruction.md for {task_dir_path.name}: {e}")
# Fallback to task.json in environment/
if not description:
task_json = task_dir_path / "environment" / "task.json"
if task_json.exists():
try:
import json
task_data = json.loads(task_json.read_text())
description = task_data.get("description", "") or task_data.get("instruction", "")
except Exception as e:
logger.warning(f"Failed to load task.json for {task_dir_path.name}: {e}")
if not description:
description = f"Complete the task in {task_dir_path.name}"
return {
"task_id": f"{task_dir_path.name}",
"task_name": task_dir_path.name,
"description": task.get("description", ""),
"description": description,
"task_dir": str(task_dir_path),
"final_test": str(final_test),
"docker_image": docker_image,
@ -249,24 +328,32 @@ class EndlessTerminalsEnv(HermesAgentBaseEnv[EndlessTerminalsEnvConfig]):
task_name = item.get("task_name", "unknown")
docker_image = item.get("docker_image", self.config.default_docker_image)
print(f"[DEBUG] collect_trajectory START for {task_name}", flush=True)
# Register Docker image override for this task_id
print(f"[DEBUG] Registering Docker image: {docker_image}", flush=True)
register_task_env_overrides(task_id, {"modal_image": docker_image})
logger.info(
f"Task {task_name}: registered Docker image {docker_image} for task_id {task_id[:8]}"
)
print(f"[DEBUG] Docker image registered", flush=True)
try:
# Get group-level tools (resolved once in collect_trajectories)
print(f"[DEBUG] Resolving tools...", flush=True)
if self._current_group_tools is None:
tools, valid_names = self._resolve_tools_for_group()
else:
tools, valid_names = self._current_group_tools
print(f"[DEBUG] Tools resolved: {len(tools)} tools", flush=True)
# Build initial messages
print(f"[DEBUG] Building initial messages...", flush=True)
messages: List[Dict[str, Any]] = []
if self.config.system_prompt:
messages.append({"role": "system", "content": self.config.system_prompt})
messages.append({"role": "user", "content": self.format_prompt(item)})
print(f"[DEBUG] Messages built, starting agent loop...", flush=True)
# Run the agent loop
result: AgentResult