Fix Web Tools, Upgrade MoA to GPT5, Add Trajectory Saving

This commit is contained in:
Teknium 2025-08-31 03:04:10 -07:00
parent 4ece87efb0
commit 587d1cf720
5 changed files with 1090 additions and 131 deletions

View file

@ -65,7 +65,7 @@ nous_client = AsyncOpenAI(
REFERENCE_MODELS = [
"claude-opus-4-20250514",
"gemini-2.5-pro",
"o4-mini",
"gpt-5",
"deepseek-r1"
]
@ -164,7 +164,7 @@ async def _run_reference_model_safe(
model: str,
user_prompt: str,
temperature: float = REFERENCE_TEMPERATURE,
max_tokens: int = 128000,
max_tokens: int = 32000,
max_retries: int = 3
) -> tuple[str, str, bool]:
"""
@ -184,12 +184,18 @@ async def _run_reference_model_safe(
try:
print(f"🤖 Querying {model} (attempt {attempt + 1}/{max_retries})")
response = await nous_client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": user_prompt}],
temperature=temperature,
max_tokens=max_tokens
)
# Build parameters for the API call
api_params = {
"model": model,
"messages": [{"role": "user", "content": user_prompt}]
}
# GPT models (especially gpt-4o-mini) don't support custom temperature values
# Only include temperature for non-GPT models
if not model.lower().startswith('gpt-'):
api_params["temperature"] = temperature
response = await nous_client.chat.completions.create(**api_params)
content = response.choices[0].message.content.strip()
print(f"{model} responded ({len(content)} characters)")
@ -220,7 +226,7 @@ async def _run_aggregator_model(
system_prompt: str,
user_prompt: str,
temperature: float = AGGREGATOR_TEMPERATURE,
max_tokens: int = 16000
max_tokens: int = None
) -> str:
"""
Run the aggregator model to synthesize the final response.
@ -236,15 +242,21 @@ async def _run_aggregator_model(
"""
print(f"🧠 Running aggregator model: {AGGREGATOR_MODEL}")
response = await nous_client.chat.completions.create(
model=AGGREGATOR_MODEL,
messages=[
# Build parameters for the API call
api_params = {
"model": AGGREGATOR_MODEL,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=temperature,
max_tokens=max_tokens
)
]
}
# GPT models (especially gpt-4o-mini) don't support custom temperature values
# Only include temperature for non-GPT models
if not AGGREGATOR_MODEL.lower().startswith('gpt-'):
api_params["temperature"] = temperature
response = await nous_client.chat.completions.create(**api_params)
content = response.choices[0].message.content.strip()
print(f"✅ Aggregation complete ({len(content)} characters)")

View file

@ -42,7 +42,7 @@ def get_web_tool_definitions() -> List[Dict[str, Any]]:
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web for information on any topic. Returns relevant results with titles, URLs, content snippets, and answers. Uses advanced search depth for comprehensive results.",
"description": "Search the web for information on any topic. Returns relevant results with titles and URLs. Uses advanced search depth for comprehensive results.",
"parameters": {
"type": "object",
"properties": {

View file

@ -26,6 +26,7 @@ import time
from typing import List, Dict, Any, Optional
from openai import OpenAI
import fire
from datetime import datetime
# Import our tool system
from model_tools import get_tool_definitions, handle_function_call, check_toolset_requirements
@ -49,7 +50,8 @@ class AIAgent:
enabled_tools: List[str] = None,
disabled_tools: List[str] = None,
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None
disabled_toolsets: List[str] = None,
save_trajectories: bool = False
):
"""
Initialize the AI Agent.
@ -64,10 +66,12 @@ class AIAgent:
disabled_tools (List[str]): Disable these specific tools (optional)
enabled_toolsets (List[str]): Only enable tools from these toolsets (optional)
disabled_toolsets (List[str]): Disable tools from these toolsets (optional)
save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False)
"""
self.model = model
self.max_iterations = max_iterations
self.tool_delay = tool_delay
self.save_trajectories = save_trajectories
# Store tool filtering options
self.enabled_tools = enabled_tools
@ -123,31 +127,184 @@ class AIAgent:
missing_reqs = [name for name, available in requirements.items() if not available]
if missing_reqs:
print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}")
# Show trajectory saving status
if self.save_trajectories:
print("📝 Trajectory saving enabled")
def create_system_message(self, custom_system: str = None) -> str:
def _format_tools_for_system_message(self) -> str:
"""
Create the system message for the agent.
Format tool definitions for the system message in the trajectory format.
Returns:
str: JSON string representation of tool definitions
"""
if not self.tools:
return "[]"
# Convert tool definitions to the format expected in trajectories
formatted_tools = []
for tool in self.tools:
func = tool["function"]
formatted_tool = {
"name": func["name"],
"description": func.get("description", ""),
"parameters": func.get("parameters", {}),
"required": None # Match the format in the example
}
formatted_tools.append(formatted_tool)
return json.dumps(formatted_tools)
def _convert_to_trajectory_format(self, messages: List[Dict[str, Any]], user_query: str, completed: bool) -> List[Dict[str, Any]]:
"""
Convert internal message format to trajectory format for saving.
Args:
custom_system (str): Custom system message (optional)
messages (List[Dict]): Internal message history
user_query (str): Original user query
completed (bool): Whether the conversation completed successfully
Returns:
str: System message content
List[Dict]: Messages in trajectory format
"""
if custom_system:
return custom_system
trajectory = []
return (
"You are an AI assistant that provides helpful responses. You may use extremely long chains of thought "
"to deeply consider the problem and deliberate with yourself via systematic reasoning processes to help "
"come to a correct solution prior to answering. You should enclose your thoughts and internal monologue "
"inside <thinking> tags.\n\n"
"You are equipped with web research tools that allow you to search the web, extract content from web pages, "
"and crawl websites. Use these tools to gather current information and provide accurate, well-researched responses. "
"You can call multiple tools in parallel if they are not reliant on each other's results. You can also use "
"sequential tool calls to build on data you've collected from previous tool calls. Continue using tools until "
"you feel confident you have enough information to provide a comprehensive answer."
# Add system message with tool definitions
system_msg = (
"You are a function calling AI model. You are provided with function signatures within <tools> </tools> XML tags. "
"You may call one or more functions to assist with the user query. If available tools are not relevant in assisting "
"with user query, just respond in natural conversational language. Don't make assumptions about what values to plug "
"into functions. After calling & executing the functions, you will be provided with function results within "
"<tool_response> </tool_response> XML tags. Here are the available tools:\n"
f"<tools>\n{self._format_tools_for_system_message()}\n</tools>\n"
"For each function call return a JSON object, with the following pydantic model json schema for each:\n"
"{'title': 'FunctionCall', 'type': 'object', 'properties': {'name': {'title': 'Name', 'type': 'string'}, "
"'arguments': {'title': 'Arguments', 'type': 'object'}}, 'required': ['name', 'arguments']}\n"
"Each function call should be enclosed within <tool_call> </tool_call> XML tags.\n"
"Example:\n<tool_call>\n{'name': <function-name>,'arguments': <args-dict>}\n</tool_call>"
)
trajectory.append({
"from": "system",
"value": system_msg
})
# Add the initial user message
trajectory.append({
"from": "human",
"value": user_query
})
# Process remaining messages
i = 1 # Skip the first user message as we already added it
while i < len(messages):
msg = messages[i]
if msg["role"] == "assistant":
# Check if this message has tool calls
if "tool_calls" in msg and msg["tool_calls"]:
# Format assistant message with tool calls
content = ""
if msg.get("content") and msg["content"].strip():
content = msg["content"] + "\n"
# Add tool calls wrapped in XML tags
for tool_call in msg["tool_calls"]:
tool_call_json = {
"name": tool_call["function"]["name"],
"arguments": json.loads(tool_call["function"]["arguments"]) if isinstance(tool_call["function"]["arguments"], str) else tool_call["function"]["arguments"]
}
content += f"<tool_call>\n{json.dumps(tool_call_json)}\n</tool_call>\n"
trajectory.append({
"from": "gpt",
"value": content.rstrip()
})
# Collect all subsequent tool responses
tool_responses = []
j = i + 1
while j < len(messages) and messages[j]["role"] == "tool":
tool_msg = messages[j]
# Format tool response with XML tags
tool_response = f"<tool_response>\n"
# Try to parse tool content as JSON if it looks like JSON
tool_content = tool_msg["content"]
try:
if tool_content.strip().startswith(("{", "[")):
tool_content = json.loads(tool_content)
except (json.JSONDecodeError, AttributeError):
pass # Keep as string if not valid JSON
tool_response += json.dumps({
"tool_call_id": tool_msg.get("tool_call_id", ""),
"name": msg["tool_calls"][len(tool_responses)]["function"]["name"] if len(tool_responses) < len(msg["tool_calls"]) else "unknown",
"content": tool_content
})
tool_response += "\n</tool_response>"
tool_responses.append(tool_response)
j += 1
# Add all tool responses as a single message
if tool_responses:
trajectory.append({
"from": "tool",
"value": "\n".join(tool_responses)
})
i = j - 1 # Skip the tool messages we just processed
else:
# Regular assistant message without tool calls
trajectory.append({
"from": "gpt",
"value": msg["content"] or ""
})
elif msg["role"] == "user":
trajectory.append({
"from": "human",
"value": msg["content"]
})
i += 1
return trajectory
def _save_trajectory(self, messages: List[Dict[str, Any]], user_query: str, completed: bool):
"""
Save conversation trajectory to JSONL file.
Args:
messages (List[Dict]): Complete message history
user_query (str): Original user query
completed (bool): Whether the conversation completed successfully
"""
if not self.save_trajectories:
return
# Convert messages to trajectory format
trajectory = self._convert_to_trajectory_format(messages, user_query, completed)
# Determine which file to save to
filename = "trajectory_samples.jsonl" if completed else "failed_trajectories.jsonl"
# Create trajectory entry
entry = {
"conversations": trajectory,
"timestamp": datetime.now().isoformat(),
"model": self.model,
"completed": completed
}
# Append to JSONL file
try:
with open(filename, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"💾 Trajectory saved to {filename}")
except Exception as e:
print(f"⚠️ Failed to save trajectory: {e}")
def run_conversation(
self,
@ -169,13 +326,6 @@ class AIAgent:
# Initialize conversation
messages = conversation_history or []
# Add system message if not already present
if not messages or messages[0]["role"] != "system":
messages.insert(0, {
"role": "system",
"content": self.create_system_message(system_message)
})
# Add user message
messages.append({
"role": "user",
@ -292,11 +442,17 @@ class AIAgent:
if final_response is None:
final_response = "I've reached the maximum number of iterations. Here's what I found so far."
# Determine if conversation completed successfully
completed = final_response is not None and api_call_count < self.max_iterations
# Save trajectory if enabled
self._save_trajectory(messages, user_message, completed)
return {
"final_response": final_response,
"messages": messages,
"api_calls": api_call_count,
"completed": final_response is not None
"completed": completed
}
def chat(self, message: str) -> str:
@ -323,7 +479,8 @@ def main(
disabled_tools: str = None,
enabled_toolsets: str = None,
disabled_toolsets: str = None,
list_tools: bool = False
list_tools: bool = False,
save_trajectories: bool = False
):
"""
Main function for running the agent directly.
@ -339,6 +496,7 @@ def main(
enabled_toolsets (str): Comma-separated list of toolsets to enable (e.g., "web_tools")
disabled_toolsets (str): Comma-separated list of toolsets to disable (e.g., "terminal_tools")
list_tools (bool): Just list available tools and exit
save_trajectories (bool): Save conversation trajectories to JSONL files. Defaults to False.
"""
print("🤖 AI Agent with Tool Calling")
print("=" * 50)
@ -373,6 +531,8 @@ def main(
print(f" python run_agent.py --enabled_tools=web_search,web_extract --query='research topic'")
print(f" # Run without terminal tools")
print(f" python run_agent.py --disabled_tools=terminal --query='web research only'")
print(f" # Run with trajectory saving enabled")
print(f" python run_agent.py --save_trajectories --query='your question here'")
return
# Parse tool selection arguments
@ -397,6 +557,11 @@ def main(
disabled_toolsets_list = [t.strip() for t in disabled_toolsets.split(",")]
print(f"🚫 Disabled toolsets: {disabled_toolsets_list}")
if save_trajectories:
print(f"💾 Trajectory saving: ENABLED")
print(f" - Successful conversations → trajectory_samples.jsonl")
print(f" - Failed conversations → failed_trajectories.jsonl")
# Initialize agent with provided parameters
try:
agent = AIAgent(
@ -407,7 +572,8 @@ def main(
enabled_tools=enabled_tools_list,
disabled_tools=disabled_tools_list,
enabled_toolsets=enabled_toolsets_list,
disabled_toolsets=disabled_toolsets_list
disabled_toolsets=disabled_toolsets_list,
save_trajectories=save_trajectories
)
except RuntimeError as e:
print(f"❌ Failed to initialize agent: {e}")

620
test_web_tools.py Normal file
View file

@ -0,0 +1,620 @@
#!/usr/bin/env python3
"""
Comprehensive Test Suite for Web Tools Module
This script tests all web tools functionality to ensure they work correctly.
Run this after any updates to the web_tools.py module or Firecrawl library.
Usage:
python test_web_tools.py # Run all tests
python test_web_tools.py --no-llm # Skip LLM processing tests
python test_web_tools.py --verbose # Show detailed output
Requirements:
- FIRECRAWL_API_KEY environment variable must be set
- NOUS_API_KEY environment vitinariable (optional, for LLM tests)
"""
import json
import asyncio
import sys
import os
import argparse
from datetime import datetime
from typing import List, Dict, Any
# Import the web tools to test
from web_tools import (
web_search_tool,
web_extract_tool,
web_crawl_tool,
check_firecrawl_api_key,
check_nous_api_key,
get_debug_session_info
)
class Colors:
"""ANSI color codes for terminal output"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def print_header(text: str):
"""Print a formatted header"""
print(f"\n{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}{Colors.BOLD}{text}{Colors.ENDC}")
print(f"{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}")
def print_section(text: str):
"""Print a formatted section header"""
print(f"\n{Colors.CYAN}{Colors.BOLD}📌 {text}{Colors.ENDC}")
print(f"{Colors.CYAN}{'-'*50}{Colors.ENDC}")
def print_success(text: str):
"""Print success message"""
print(f"{Colors.GREEN}{text}{Colors.ENDC}")
def print_error(text: str):
"""Print error message"""
print(f"{Colors.FAIL}{text}{Colors.ENDC}")
def print_warning(text: str):
"""Print warning message"""
print(f"{Colors.WARNING}⚠️ {text}{Colors.ENDC}")
def print_info(text: str, indent: int = 0):
"""Print info message"""
indent_str = " " * indent
print(f"{indent_str}{Colors.BLUE} {text}{Colors.ENDC}")
class WebToolsTester:
"""Test suite for web tools"""
def __init__(self, verbose: bool = False, test_llm: bool = True):
self.verbose = verbose
self.test_llm = test_llm
self.test_results = {
"passed": [],
"failed": [],
"skipped": []
}
self.start_time = None
self.end_time = None
def log_result(self, test_name: str, status: str, details: str = ""):
"""Log test result"""
result = {
"test": test_name,
"status": status,
"details": details,
"timestamp": datetime.now().isoformat()
}
if status == "passed":
self.test_results["passed"].append(result)
print_success(f"{test_name}: {details}" if details else test_name)
elif status == "failed":
self.test_results["failed"].append(result)
print_error(f"{test_name}: {details}" if details else test_name)
elif status == "skipped":
self.test_results["skipped"].append(result)
print_warning(f"{test_name} skipped: {details}" if details else f"{test_name} skipped")
def test_environment(self) -> bool:
"""Test environment setup and API keys"""
print_section("Environment Check")
# Check Firecrawl API key
if not check_firecrawl_api_key():
self.log_result("Firecrawl API Key", "failed", "FIRECRAWL_API_KEY not set")
return False
else:
self.log_result("Firecrawl API Key", "passed", "Found")
# Check Nous API key (optional)
if not check_nous_api_key():
self.log_result("Nous API Key", "skipped", "NOUS_API_KEY not set (LLM tests will be skipped)")
self.test_llm = False
else:
self.log_result("Nous API Key", "passed", "Found")
# Check debug mode
debug_info = get_debug_session_info()
if debug_info["enabled"]:
print_info(f"Debug mode enabled - Session: {debug_info['session_id']}")
print_info(f"Debug log: {debug_info['log_path']}")
return True
def test_web_search(self) -> List[str]:
"""Test web search functionality"""
print_section("Test 1: Web Search")
test_queries = [
("Python web scraping tutorial", 5),
("Firecrawl API documentation", 3),
("inflammatory arthritis symptoms treatment", 8) # Test medical query from your example
]
extracted_urls = []
for query, limit in test_queries:
try:
print(f"\n Testing search: '{query}' (limit={limit})")
if self.verbose:
print(f" Calling web_search_tool(query='{query}', limit={limit})")
# Perform search
result = web_search_tool(query, limit)
# Parse result
try:
data = json.loads(result)
except json.JSONDecodeError as e:
self.log_result(f"Search: {query[:30]}...", "failed", f"Invalid JSON: {e}")
if self.verbose:
print(f" Raw response (first 500 chars): {result[:500]}...")
continue
if "error" in data:
self.log_result(f"Search: {query[:30]}...", "failed", f"API error: {data['error']}")
continue
# Check structure
if "success" not in data or "data" not in data:
self.log_result(f"Search: {query[:30]}...", "failed", "Missing success or data fields")
if self.verbose:
print(f" Response keys: {list(data.keys())}")
continue
web_results = data.get("data", {}).get("web", [])
if not web_results:
self.log_result(f"Search: {query[:30]}...", "failed", "Empty web results array")
if self.verbose:
print(f" data.web content: {data.get('data', {}).get('web')}")
continue
# Validate each result
valid_results = 0
missing_fields = []
for i, result in enumerate(web_results):
required_fields = ["url", "title", "description"]
has_all_fields = all(key in result for key in required_fields)
if has_all_fields:
valid_results += 1
# Collect URLs for extraction test
if len(extracted_urls) < 3:
extracted_urls.append(result["url"])
if self.verbose:
print(f" Result {i+1}: ✓ {result['title'][:50]}...")
print(f" URL: {result['url'][:60]}...")
else:
missing = [f for f in required_fields if f not in result]
missing_fields.append(f"Result {i+1} missing: {missing}")
if self.verbose:
print(f" Result {i+1}: ✗ Missing fields: {missing}")
# Log results
if valid_results == len(web_results):
self.log_result(
f"Search: {query[:30]}...",
"passed",
f"All {valid_results} results valid"
)
else:
self.log_result(
f"Search: {query[:30]}...",
"failed",
f"Only {valid_results}/{len(web_results)} valid. Issues: {'; '.join(missing_fields[:3])}"
)
except Exception as e:
self.log_result(f"Search: {query[:30]}...", "failed", f"Exception: {type(e).__name__}: {str(e)}")
if self.verbose:
import traceback
print(f" Traceback: {traceback.format_exc()}")
if self.verbose and extracted_urls:
print(f"\n URLs collected for extraction test: {len(extracted_urls)}")
for url in extracted_urls:
print(f" - {url}")
return extracted_urls
async def test_web_extract(self, urls: List[str] = None):
"""Test web content extraction"""
print_section("Test 2: Web Extract (without LLM)")
# Use provided URLs or defaults
if not urls:
urls = [
"https://docs.firecrawl.dev/introduction",
"https://www.python.org/about/"
]
print(f" Using default URLs for testing")
else:
print(f" Using {len(urls)} URLs from search results")
# Test extraction
if urls:
try:
test_urls = urls[:2] # Test with max 2 URLs
print(f"\n Extracting content from {len(test_urls)} URL(s)...")
for url in test_urls:
print(f" - {url}")
if self.verbose:
print(f" Calling web_extract_tool(urls={test_urls}, format='markdown', use_llm_processing=False)")
result = await web_extract_tool(
test_urls,
format="markdown",
use_llm_processing=False
)
# Parse result
try:
data = json.loads(result)
except json.JSONDecodeError as e:
self.log_result("Extract (no LLM)", "failed", f"Invalid JSON: {e}")
if self.verbose:
print(f" Raw response (first 500 chars): {result[:500]}...")
return
if "error" in data:
self.log_result("Extract (no LLM)", "failed", f"API error: {data['error']}")
return
results = data.get("results", [])
if not results:
self.log_result("Extract (no LLM)", "failed", "No results in response")
if self.verbose:
print(f" Response keys: {list(data.keys())}")
return
# Validate each result
valid_results = 0
failed_results = 0
total_content_length = 0
extraction_details = []
for i, result in enumerate(results):
title = result.get("title", "No title")
content = result.get("content", "")
error = result.get("error")
if error:
failed_results += 1
extraction_details.append(f"Page {i+1}: ERROR - {error}")
if self.verbose:
print(f" Page {i+1}: ✗ Error - {error}")
elif content:
content_len = len(content)
total_content_length += content_len
valid_results += 1
extraction_details.append(f"Page {i+1}: {title[:40]}... ({content_len} chars)")
if self.verbose:
print(f" Page {i+1}: ✓ {title[:50]}... - {content_len} characters")
print(f" First 100 chars: {content[:100]}...")
else:
extraction_details.append(f"Page {i+1}: {title[:40]}... (EMPTY)")
if self.verbose:
print(f" Page {i+1}: ⚠ {title[:50]}... - Empty content")
# Log results
if valid_results > 0:
self.log_result(
"Extract (no LLM)",
"passed",
f"{valid_results}/{len(results)} pages extracted, {total_content_length} total chars"
)
else:
self.log_result(
"Extract (no LLM)",
"failed",
f"No valid content. {failed_results} errors, {len(results) - failed_results} empty"
)
if self.verbose:
print(f"\n Extraction details:")
for detail in extraction_details:
print(f" {detail}")
except Exception as e:
self.log_result("Extract (no LLM)", "failed", f"Exception: {type(e).__name__}: {str(e)}")
if self.verbose:
import traceback
print(f" Traceback: {traceback.format_exc()}")
async def test_web_extract_with_llm(self, urls: List[str] = None):
"""Test web extraction with LLM processing"""
print_section("Test 3: Web Extract (with Gemini LLM)")
if not self.test_llm:
self.log_result("Extract (with LLM)", "skipped", "LLM testing disabled")
return
# Use a URL likely to have substantial content
test_url = urls[0] if urls else "https://docs.firecrawl.dev/features/scrape"
try:
print(f"\n Extracting and processing: {test_url}")
result = await web_extract_tool(
[test_url],
format="markdown",
use_llm_processing=True,
min_length=1000 # Lower threshold for testing
)
data = json.loads(result)
if "error" in data:
self.log_result("Extract (with LLM)", "failed", data["error"])
return
results = data.get("results", [])
if not results:
self.log_result("Extract (with LLM)", "failed", "No results returned")
return
result = results[0]
content = result.get("content", "")
if content:
content_len = len(content)
# Check if content was actually processed (should be shorter than typical raw content)
if content_len > 0:
self.log_result(
"Extract (with LLM)",
"passed",
f"Content processed: {content_len} chars"
)
if self.verbose:
print(f"\n First 300 chars of processed content:")
print(f" {content[:300]}...")
else:
self.log_result("Extract (with LLM)", "failed", "No content after processing")
else:
self.log_result("Extract (with LLM)", "failed", "No content field in result")
except json.JSONDecodeError as e:
self.log_result("Extract (with LLM)", "failed", f"Invalid JSON: {e}")
except Exception as e:
self.log_result("Extract (with LLM)", "failed", str(e))
async def test_web_crawl(self):
"""Test web crawling functionality"""
print_section("Test 4: Web Crawl")
test_sites = [
("https://docs.firecrawl.dev", None, 2), # Test docs site
("https://firecrawl.dev", None, 3), # Test main site
]
for url, instructions, expected_min_pages in test_sites:
try:
print(f"\n Testing crawl of: {url}")
if instructions:
print(f" Instructions: {instructions}")
else:
print(f" No instructions (general crawl)")
print(f" Expected minimum pages: {expected_min_pages}")
# Show what's being called
if self.verbose:
print(f" Calling web_crawl_tool(url='{url}', instructions={instructions}, use_llm_processing=False)")
result = await web_crawl_tool(
url,
instructions=instructions,
use_llm_processing=False # Disable LLM for faster testing
)
# Check if result is valid JSON
try:
data = json.loads(result)
except json.JSONDecodeError as e:
self.log_result(f"Crawl: {url}", "failed", f"Invalid JSON response: {e}")
if self.verbose:
print(f" Raw response (first 500 chars): {result[:500]}...")
continue
# Check for errors
if "error" in data:
self.log_result(f"Crawl: {url}", "failed", f"API error: {data['error']}")
continue
# Get results
results = data.get("results", [])
if not results:
self.log_result(f"Crawl: {url}", "failed", "No pages in results array")
if self.verbose:
print(f" Full response: {json.dumps(data, indent=2)[:1000]}...")
continue
# Analyze pages
valid_pages = 0
empty_pages = 0
total_content = 0
page_details = []
for i, page in enumerate(results):
content = page.get("content", "")
title = page.get("title", "Untitled")
error = page.get("error")
if error:
page_details.append(f"Page {i+1}: ERROR - {error}")
elif content:
valid_pages += 1
content_len = len(content)
total_content += content_len
page_details.append(f"Page {i+1}: {title[:40]}... ({content_len} chars)")
else:
empty_pages += 1
page_details.append(f"Page {i+1}: {title[:40]}... (EMPTY)")
# Show detailed results if verbose
if self.verbose:
print(f"\n Crawl Results:")
print(f" Total pages returned: {len(results)}")
print(f" Valid pages (with content): {valid_pages}")
print(f" Empty pages: {empty_pages}")
print(f" Total content size: {total_content} characters")
print(f"\n Page Details:")
for detail in page_details[:10]: # Show first 10 pages
print(f" - {detail}")
if len(page_details) > 10:
print(f" ... and {len(page_details) - 10} more pages")
# Determine pass/fail
if valid_pages >= expected_min_pages:
self.log_result(
f"Crawl: {url}",
"passed",
f"{valid_pages}/{len(results)} valid pages, {total_content} chars total"
)
else:
self.log_result(
f"Crawl: {url}",
"failed",
f"Only {valid_pages} valid pages (expected >= {expected_min_pages}), {empty_pages} empty, {len(results)} total"
)
except Exception as e:
self.log_result(f"Crawl: {url}", "failed", f"Exception: {type(e).__name__}: {str(e)}")
if self.verbose:
import traceback
print(f" Traceback:")
print(" " + "\n ".join(traceback.format_exc().split("\n")))
async def run_all_tests(self):
"""Run all tests"""
self.start_time = datetime.now()
print_header("WEB TOOLS TEST SUITE")
print(f"Started at: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
# Test environment
if not self.test_environment():
print_error("\nCannot proceed without required API keys!")
return False
# Test search and collect URLs
urls = self.test_web_search()
# Test extraction
await self.test_web_extract(urls if urls else None)
# Test extraction with LLM
if self.test_llm:
await self.test_web_extract_with_llm(urls if urls else None)
# Test crawling
await self.test_web_crawl()
# Print summary
self.end_time = datetime.now()
duration = (self.end_time - self.start_time).total_seconds()
print_header("TEST SUMMARY")
print(f"Duration: {duration:.2f} seconds")
print(f"\n{Colors.GREEN}Passed: {len(self.test_results['passed'])}{Colors.ENDC}")
print(f"{Colors.FAIL}Failed: {len(self.test_results['failed'])}{Colors.ENDC}")
print(f"{Colors.WARNING}Skipped: {len(self.test_results['skipped'])}{Colors.ENDC}")
# List failed tests
if self.test_results["failed"]:
print(f"\n{Colors.FAIL}{Colors.BOLD}Failed Tests:{Colors.ENDC}")
for test in self.test_results["failed"]:
print(f" - {test['test']}: {test['details']}")
# Save results to file
self.save_results()
return len(self.test_results["failed"]) == 0
def save_results(self):
"""Save test results to a JSON file"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"test_results_web_tools_{timestamp}.json"
results = {
"test_suite": "Web Tools",
"start_time": self.start_time.isoformat() if self.start_time else None,
"end_time": self.end_time.isoformat() if self.end_time else None,
"duration_seconds": (self.end_time - self.start_time).total_seconds() if self.start_time and self.end_time else None,
"summary": {
"passed": len(self.test_results["passed"]),
"failed": len(self.test_results["failed"]),
"skipped": len(self.test_results["skipped"])
},
"results": self.test_results,
"environment": {
"firecrawl_api_key": check_firecrawl_api_key(),
"nous_api_key": check_nous_api_key(),
"debug_mode": get_debug_session_info()["enabled"]
}
}
try:
with open(filename, 'w') as f:
json.dump(results, f, indent=2)
print_info(f"Test results saved to: {filename}")
except Exception as e:
print_warning(f"Failed to save results: {e}")
async def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="Test Web Tools Module")
parser.add_argument("--no-llm", action="store_true", help="Skip LLM processing tests")
parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
parser.add_argument("--debug", action="store_true", help="Enable debug mode for web tools")
args = parser.parse_args()
# Set debug mode if requested
if args.debug:
os.environ["WEB_TOOLS_DEBUG"] = "true"
print_info("Debug mode enabled for web tools")
# Create tester
tester = WebToolsTester(
verbose=args.verbose,
test_llm=not args.no_llm
)
# Run tests
success = await tester.run_all_tests()
# Exit with appropriate code
sys.exit(0 if success else 1)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -48,11 +48,11 @@ import uuid
import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
from firecrawl import FirecrawlApp, ScrapeOptions
from firecrawl import Firecrawl
from openai import AsyncOpenAI
# Initialize Firecrawl client once at module level
firecrawl_app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
firecrawl_client = Firecrawl(api_key=os.getenv("FIRECRAWL_API_KEY"))
# Initialize Nous Research API client for LLM processing (async)
nous_client = AsyncOpenAI(
@ -251,7 +251,8 @@ def web_search_tool(query: str, limit: int = 5) -> str:
This function provides a generic interface for web search that can work
with multiple backends. Currently uses Firecrawl.
Note: Search results are already concise snippets, so no LLM processing is applied.
Note: This function returns search result metadata only (URLs, titles, descriptions).
Use web_extract_tool to get full content from specific URLs.
Args:
query (str): The search query to look up
@ -260,16 +261,18 @@ def web_search_tool(query: str, limit: int = 5) -> str:
Returns:
str: JSON string containing search results with the following structure:
{
"query": str,
"results": [
{
"title": str,
"url": str,
"content": str,
"score": float
},
...
]
"success": bool,
"data": {
"web": [
{
"title": str,
"url": str,
"description": str,
"position": int
},
...
]
}
}
Raises:
@ -289,46 +292,67 @@ def web_search_tool(query: str, limit: int = 5) -> str:
try:
print(f"🔍 Searching the web for: '{query}' (limit: {limit})")
# Use Firecrawl's search functionality
# Firecrawl Search: search the web and get full content from results
# Docs: https://docs.firecrawl.dev/introduction
# Note: Firecrawl SDK supports search via app.search(query, limit=...)
response = firecrawl_app.search(query=query, limit=limit)
# Use Firecrawl's v2 search functionality WITHOUT scraping
# We only want search result metadata, not scraped content
# Docs: https://docs.firecrawl.dev/features/search
response = firecrawl_client.search(
query=query,
limit=limit
)
# Determine results count and trim to minimal structure: { success, data: [{markdown}] }
results_list = []
success_flag = True
if isinstance(response, dict):
success_flag = bool(response.get("success", True))
if "data" in response and isinstance(response["data"], list):
results_list = response["data"]
elif "results" in response and isinstance(response["results"], list):
results_list = response["results"]
results_count = len(results_list)
print(f"✅ Found {results_count} results")
# The response is a SearchData object with web, news, and images attributes
# When not scraping, the results are directly in these attributes
web_results = []
# Check if response has web attribute (SearchData object)
if hasattr(response, 'web'):
# Response is a SearchData object with web attribute
if response.web:
# Convert each SearchResultWeb object to dict
for result in response.web:
if hasattr(result, 'model_dump'):
# Pydantic model - use model_dump
web_results.append(result.model_dump())
elif hasattr(result, '__dict__'):
# Regular object - use __dict__
web_results.append(result.__dict__)
elif isinstance(result, dict):
# Already a dict
web_results.append(result)
elif hasattr(response, 'model_dump'):
# Response has model_dump method - use it to get dict
response_dict = response.model_dump()
if 'web' in response_dict and response_dict['web']:
web_results = response_dict['web']
elif isinstance(response, dict):
# Response is already a dictionary
if 'web' in response and response['web']:
web_results = response['web']
results_count = len(web_results)
print(f"✅ Found {results_count} search results")
# Build response with just search metadata (URLs, titles, descriptions)
response_data = {
"success": True,
"data": {
"web": web_results
}
}
# Capture debug information
debug_call_data["results_count"] = results_count
debug_call_data["original_response_size"] = len(json.dumps(response))
# Build minimal response
minimal_data = []
for item in results_list:
if isinstance(item, dict) and ("markdown" in item):
minimal_data.append({"markdown": item.get("markdown", "")})
minimal_response = {"success": success_flag, "data": minimal_data}
# Convert to JSON
result_json = json.dumps(response_data, indent=2)
result_json = json.dumps(minimal_response, indent=2)
cleaned_result = clean_base64_images(result_json)
debug_call_data["final_response_size"] = len(cleaned_result)
debug_call_data["compression_applied"] = "base64_image_removal"
debug_call_data["final_response_size"] = len(result_json)
# Log debug information
_log_debug_call("web_search_tool", debug_call_data)
_save_debug_log()
return cleaned_result
return result_json
except Exception as e:
error_msg = f"Error searching web: {str(e)}"
@ -388,40 +412,87 @@ async def web_extract_tool(
try:
print(f"📄 Extracting content from {len(urls)} URL(s)")
# Use Firecrawl's scrape functionality per URL and normalize to a common shape
# Determine requested formats for Firecrawl v2
formats: List[str] = []
if format == "markdown":
formats = ["markdown"]
elif format == "html":
formats = ["html"]
else:
# Default: request markdown for LLM-readiness and include html as backup
formats = ["markdown", "html"]
# Always use individual scraping for simplicity and reliability
# Batch scraping adds complexity without much benefit for small numbers of URLs
results: List[Dict[str, Any]] = []
for url in urls:
try:
# Determine requested formats for Firecrawl
formats: List[str] = []
if format == "markdown":
formats = ["markdown"]
elif format == "html":
formats = ["html"]
else:
# Default: request markdown for LLM-readiness and include html as backup
formats = ["markdown", "html"]
scrape_result = firecrawl_app.scrape_url(url, formats=formats)
# Firecrawl returns {success, data: {markdown?, html?, metadata}}
data = scrape_result.get("data", {}) if isinstance(scrape_result, dict) else {}
metadata = data.get("metadata", {})
print(f" 📄 Scraping: {url}")
scrape_result = firecrawl_client.scrape(
url=url,
formats=formats
)
# Process the result - properly handle object serialization
metadata = {}
title = ""
content_markdown = None
content_html = None
# Extract data from the scrape result
if hasattr(scrape_result, 'model_dump'):
# Pydantic model - use model_dump to get dict
result_dict = scrape_result.model_dump()
content_markdown = result_dict.get('markdown')
content_html = result_dict.get('html')
metadata = result_dict.get('metadata', {})
elif hasattr(scrape_result, '__dict__'):
# Regular object with attributes
content_markdown = getattr(scrape_result, 'markdown', None)
content_html = getattr(scrape_result, 'html', None)
# Handle metadata - convert to dict if it's an object
metadata_obj = getattr(scrape_result, 'metadata', {})
if hasattr(metadata_obj, 'model_dump'):
metadata = metadata_obj.model_dump()
elif hasattr(metadata_obj, '__dict__'):
metadata = metadata_obj.__dict__
elif isinstance(metadata_obj, dict):
metadata = metadata_obj
else:
metadata = {}
elif isinstance(scrape_result, dict):
# Already a dictionary
content_markdown = scrape_result.get('markdown')
content_html = scrape_result.get('html')
metadata = scrape_result.get('metadata', {})
# Ensure metadata is a dict (not an object)
if not isinstance(metadata, dict):
if hasattr(metadata, 'model_dump'):
metadata = metadata.model_dump()
elif hasattr(metadata, '__dict__'):
metadata = metadata.__dict__
else:
metadata = {}
# Get title from metadata
title = metadata.get("title", "")
content_markdown = data.get("markdown")
content_html = data.get("html")
# Choose content based on requested format
chosen_content = content_markdown if (format == "markdown" or (format is None and content_markdown)) else content_html or content_markdown or ""
results.append({
"url": metadata.get("sourceURL", url),
"title": title,
"content": chosen_content,
"raw_content": chosen_content,
"metadata": metadata
"metadata": metadata # Now guaranteed to be a dict
})
except Exception as scrape_err:
print(f" ❌ Error scraping {url}: {str(scrape_err)}")
results.append({
"url": url,
"title": "",
@ -582,36 +653,126 @@ async def web_crawl_tool(
}
try:
# Ensure URL has protocol
if not url.startswith(('http://', 'https://')):
url = f'https://{url}'
print(f" 📝 Added https:// prefix to URL: {url}")
instructions_text = f" with instructions: '{instructions}'" if instructions else ""
print(f"🕷️ Crawling {url}{instructions_text}")
# Use Firecrawl's crawl functionality and normalize to a common shape
# Firecrawl SDK returns the crawl results directly for synchronous crawl
scrape_options = ScrapeOptions(formats=["markdown", "html"])
crawl_result = firecrawl_app.crawl_url(
url,
limit=20,
scrape_options=scrape_options,
)
# Use Firecrawl's v2 crawl functionality
# Docs: https://docs.firecrawl.dev/features/crawl
# The crawl() method automatically waits for completion and returns all data
# Build crawl parameters - keep it simple
crawl_params = {
"limit": 20, # Limit number of pages to crawl
"scrape_options": {
"formats": ["markdown"] # Just markdown for simplicity
}
}
# Note: The 'prompt' parameter is not documented for crawl
# Instructions are typically used with the Extract endpoint, not Crawl
if instructions:
print(f" Note: Instructions parameter ignored (not supported in crawl API)")
# Use the crawl method which waits for completion automatically
try:
crawl_result = firecrawl_client.crawl(
url=url,
**crawl_params
)
except Exception as e:
print(f" ❌ Crawl API call failed: {e}")
raise
pages: List[Dict[str, Any]] = []
if isinstance(crawl_result, dict):
# Firecrawl returns {success, data: [ {markdown?, html?, metadata} ]}
# Process crawl results - the crawl method returns a CrawlJob object with data attribute
data_list = []
# The crawl_result is a CrawlJob object with a 'data' attribute containing list of Document objects
if hasattr(crawl_result, 'data'):
data_list = crawl_result.data if crawl_result.data else []
print(f" 📊 Status: {getattr(crawl_result, 'status', 'unknown')}")
print(f" 📄 Retrieved {len(data_list)} pages")
# Debug: Check other attributes if no data
if not data_list:
print(f" 🔍 Debug - CrawlJob attributes: {[attr for attr in dir(crawl_result) if not attr.startswith('_')]}")
print(f" 🔍 Debug - Status: {getattr(crawl_result, 'status', 'N/A')}")
print(f" 🔍 Debug - Total: {getattr(crawl_result, 'total', 'N/A')}")
print(f" 🔍 Debug - Completed: {getattr(crawl_result, 'completed', 'N/A')}")
elif isinstance(crawl_result, dict) and 'data' in crawl_result:
data_list = crawl_result.get("data", [])
for item in data_list:
metadata = item.get("metadata", {}) if isinstance(item, dict) else {}
page_url = metadata.get("sourceURL", "Unknown URL")
title = metadata.get("title", "")
content_markdown = item.get("markdown") if isinstance(item, dict) else None
content_html = item.get("html") if isinstance(item, dict) else None
content = content_markdown or content_html or ""
pages.append({
"url": page_url,
"title": title,
"content": content,
"raw_content": content,
"metadata": metadata
})
else:
print(" ⚠️ Unexpected crawl result type")
print(f" 🔍 Debug - Result type: {type(crawl_result)}")
if hasattr(crawl_result, '__dict__'):
print(f" 🔍 Debug - Result attributes: {list(crawl_result.__dict__.keys())}")
for item in data_list:
# Process each crawled page - properly handle object serialization
page_url = "Unknown URL"
title = ""
content_markdown = None
content_html = None
metadata = {}
# Extract data from the item
if hasattr(item, 'model_dump'):
# Pydantic model - use model_dump to get dict
item_dict = item.model_dump()
content_markdown = item_dict.get('markdown')
content_html = item_dict.get('html')
metadata = item_dict.get('metadata', {})
elif hasattr(item, '__dict__'):
# Regular object with attributes
content_markdown = getattr(item, 'markdown', None)
content_html = getattr(item, 'html', None)
# Handle metadata - convert to dict if it's an object
metadata_obj = getattr(item, 'metadata', {})
if hasattr(metadata_obj, 'model_dump'):
metadata = metadata_obj.model_dump()
elif hasattr(metadata_obj, '__dict__'):
metadata = metadata_obj.__dict__
elif isinstance(metadata_obj, dict):
metadata = metadata_obj
else:
metadata = {}
elif isinstance(item, dict):
# Already a dictionary
content_markdown = item.get('markdown')
content_html = item.get('html')
metadata = item.get('metadata', {})
# Ensure metadata is a dict (not an object)
if not isinstance(metadata, dict):
if hasattr(metadata, 'model_dump'):
metadata = metadata.model_dump()
elif hasattr(metadata, '__dict__'):
metadata = metadata.__dict__
else:
metadata = {}
# Extract URL and title from metadata
page_url = metadata.get("sourceURL", metadata.get("url", "Unknown URL"))
title = metadata.get("title", "")
# Choose content (prefer markdown)
content = content_markdown or content_html or ""
pages.append({
"url": page_url,
"title": title,
"content": content,
"raw_content": content,
"metadata": metadata # Now guaranteed to be a dict
})
response = {"results": pages}