diff --git a/README.md b/README.md index 541a01a245..632f59180a 100644 --- a/README.md +++ b/README.md @@ -1,134 +1,295 @@ # Hermes Agent -An AI agent with advanced tool-calling capabilities, featuring a flexible toolsets system for organizing and managing tools. +AI Agent with advanced tool calling capabilities, real-time logging, and extensible toolsets. ## Features -- **Web Tools**: Search, extract content, and crawl websites -- **Terminal Tools**: Execute commands with interactive session support -- **Vision Tools**: Analyze images from URLs -- **Reasoning Tools**: Advanced multi-model reasoning (Mixture of Agents) -- **Creative Tools**: Generate images from text prompts -- **Toolsets System**: Organize tools into logical groups for different scenarios +- 🤖 **Multi-model Support**: Works with Claude, GPT-4, and other OpenAI-compatible models +- 🔧 **Rich Tool Library**: Web search, content extraction, vision analysis, terminal execution, and more +- 📊 **Real-time Logging**: WebSocket-based logging system for monitoring agent execution +- 🖥️ **Desktop UI**: Modern PySide6 frontend with real-time event streaming +- 🎯 **Flexible Toolsets**: Predefined toolset combinations for different use cases +- 💾 **Trajectory Saving**: Save conversation flows for training and analysis +- 🔄 **Auto-retry**: Built-in error handling and retry logic + +## Quick Start + +### Installation -## Setup ```bash pip install -r requirements.txt -git clone git@github.com:NousResearch/hecate.git -cd hecate -pip install -e . ``` -## Toolsets System - -The agent uses a toolsets system for organizing and managing tools. All tools must be part of a toolset to be accessible - individual tool selection is not supported. This ensures consistent and logical grouping of capabilities.
- -### Key Concepts - -- **Toolsets**: Logical groups of tools for specific use cases (e.g., "research", "development", "debugging") -- **Composition**: Toolsets can include other toolsets for powerful combinations -- **Custom Toolsets**: Create your own toolsets at runtime or by editing `toolsets.py` -- **Toolset-Only Access**: Tools are only accessible through toolsets, not individually - -### Available Toolsets - -See `toolsets.py` for the complete list of predefined toolsets including: -- Basic toolsets (web, terminal, vision, creative, reasoning) -- Composite toolsets (research, development, analysis, etc.) -- Scenario-specific toolsets (debugging, documentation, API testing, etc.) -- Special toolsets (safe mode without terminal, minimal, offline) - -### Using Toolsets +### Basic Usage ```bash -# Use a predefined toolset -python run_agent.py --enabled_toolsets=research --query "Find latest AI papers" +python run_agent.py \ + --enabled_toolsets web \ + --query "Search for the latest AI news" +``` + +### With Real-time Logging + +```bash +# Terminal 1: Start API endpoint server +python api_endpoint/logging_server.py + +# Terminal 2: Run agent +python run_agent.py \ + --enabled_toolsets web \ + --enable_websocket_logging \ + --query "Your question here" +``` + +### With Desktop UI (Recommended) + +The easiest way to use Hermes Agent is through the desktop UI: + +```bash +# One-command launch (starts server + UI) +cd ui && ./start_hermes_ui.sh + +# Or manually: +# Terminal 1: Start server +python api_endpoint/logging_server.py + +# Terminal 2: Start UI +python ui/hermes_ui.py +``` + +The UI provides: +- 🖱️ Point-and-click query submission +- 🎛️ Easy model and tool selection +- 📊 Real-time event visualization +- 🔄 Automatic WebSocket connection +- 📝 Session history + +## Project Structure + +``` +Hermes-Agent/ +├── run_agent.py # Main agent runner +├── model_tools.py # Tool definitions and handling +├── toolsets.py # Predefined
toolset combinations +├── requirements.txt # Python dependencies +│ +├── ui/ # Desktop UI ⭐ NEW +│ ├── hermes_ui.py # PySide6 desktop application +│ ├── start_hermes_ui.sh # UI launcher script +│ └── test_ui_flow.py # UI integration tests +│ +├── tools/ # Tool implementations +│ ├── web_tools.py # Web search, extract, crawl +│ ├── vision_tools.py # Image analysis +│ ├── terminal_tool.py # Command execution +│ ├── image_generation_tool.py +│ └── ... +│ +├── api_endpoint/ # FastAPI + WebSocket logging endpoint +│ ├── logging_server.py # WebSocket server + Agent API ⭐ ENHANCED +│ ├── websocket_logger.py # Client library +│ ├── README.md # API endpoint docs +│ └── ... +│ +├── logs/ # Log files +│ └── realtime/ # WebSocket session logs +│ +└── tests/ # Test files +``` + +## Available Toolsets + +### Basic Toolsets +- **web**: Web search, extract, and crawl +- **terminal**: Command execution +- **vision**: Image analysis +- **creative**: Image generation +- **reasoning**: Mixture of agents + +### Composite Toolsets +- **research**: Web + vision tools +- **development**: Web + terminal + vision +- **analysis**: Web + vision + reasoning +- **full_stack**: All tools enabled + +### Usage Examples + +```bash +# Research with web and vision +python run_agent.py --enabled_toolsets research --query "..." + +# Development with terminal access +python run_agent.py --enabled_toolsets development --query "..." # Combine multiple toolsets -python run_agent.py --enabled_toolsets=web,vision --query "Analyze this website" - -# Safe mode (no terminal access) -python run_agent.py --enabled_toolsets=safe --query "Help without running commands" - -# List all available toolsets and tools -python run_agent.py --list_tools +python run_agent.py --enabled_toolsets web,vision --query "..."
``` -For detailed documentation on toolsets, see `TOOLSETS_README.md`. +## Real-time Logging System -## Basic Usage +Monitor your agent's execution in real-time with the FastAPI WebSocket endpoint using a **persistent connection pool** architecture. -### Default (all tools enabled) +### Architecture + +The logging system uses a **singleton WebSocket connection** that persists across multiple agent runs: +- ✅ **No timeouts** - connection stays alive indefinitely +- ✅ **No reconnection overhead** - connect once, reuse forever +- ✅ **Parallel execution** - multiple agents share one connection +- ✅ **Production-ready** - graceful shutdown with signal handlers + +See [`api_endpoint/PERSISTENT_CONNECTION_GUIDE.md`](api_endpoint/PERSISTENT_CONNECTION_GUIDE.md) for technical details. + +### Features +- Track all API calls and responses +- **Persistent connection** - one WebSocket for all sessions +- Monitor tool executions with parameters and timing +- Capture errors and completion status +- REST API for querying sessions +- Real-time WebSocket broadcasting + +### Documentation +See [`api_endpoint/README.md`](api_endpoint/README.md) for complete documentation. + +### Quick Start ```bash -python run_agent.py \ - --query "search up the latest docs on jit in python 3.13 and write me basic example that's not in their docs. profile its perf" \ - --max_turns 20 \ - --model claude-sonnet-4-20250514 \ - --base_url https://api.anthropic.com/v1/ \ - --api_key $ANTHROPIC_API_KEY +# Start API endpoint server +python api_endpoint/logging_server.py + +# Run agent with logging +python run_agent.py --enable_websocket_logging --query "..."
+ +# View logs +curl http://localhost:8000/sessions ``` -### With specific toolset +## Configuration + +### Environment Variables + +Create a `.env` file in the project root: + ```bash -python run_agent.py \ - --query "Debug this Python error" \ - --enabled_toolsets=debugging \ - --model claude-sonnet-4-20250514 \ - --api_key $ANTHROPIC_API_KEY +# API Keys +ANTHROPIC_API_KEY=your_key_here +FIRECRAWL_API_KEY=your_key_here +NOUS_API_KEY=your_key_here +FAL_KEY=your_key_here + +# Optional +WEB_TOOLS_DEBUG=true # Enable web tools debug logging ``` -### Python API +### Command-Line Options + +```bash +python run_agent.py --help +``` + +Key options: +- `--query`: Your question/task +- `--model`: Model to use (default: claude-sonnet-4-5-20250929) +- `--enabled_toolsets`: Toolsets to enable +- `--max_turns`: Maximum conversation turns +- `--enable_websocket_logging`: Enable real-time logging +- `--verbose`: Verbose debug output +- `--save_trajectories`: Save conversation trajectories + +## Parallel Execution + +The persistent connection pool enables true parallel agent execution. Multiple agents can run simultaneously, all sharing the same WebSocket connection for logging. + +### Test Parallel Execution + +```bash +python test_parallel_execution.py +``` + +This script runs three tests: +1. **Sequential** - baseline (3 queries one after another) +2. **Parallel** - 3 queries simultaneously +3. 
**High Concurrency** - 10 queries simultaneously + +**Expected Results:** +- ⚡ ~3x speedup with parallel execution +- ✅ All queries logged to same connection +- ✅ No connection timeouts or errors + +### Custom Parallel Code + ```python +import asyncio from run_agent import AIAgent -# Use a specific toolset -agent = AIAgent( - model="claude-opus-4-20250514", - enabled_toolsets=["research"] -) -response = agent.chat("Find information about quantum computing") +async def main(): + agent1 = AIAgent(enable_websocket_logging=True) + agent2 = AIAgent(enable_websocket_logging=True) + + # Run in parallel (run_conversation is a blocking call, so use worker threads) - both use shared connection! + results = await asyncio.gather( + asyncio.to_thread(agent1.run_conversation, "Query 1"), + asyncio.to_thread(agent2.run_conversation, "Query 2") + ) -# Create custom toolset at runtime -from toolsets import create_custom_toolset - -create_custom_toolset( - name="my_tools", - description="My custom toolkit", - tools=["web_search"], - includes=["terminal", "vision"] -) - -agent = AIAgent(enabled_toolsets=["my_tools"]) +asyncio.run(main()) ``` -## Command Line Arguments - -- `--query`: The question or task for the agent -- `--model`: Model to use (default: claude-opus-4-20250514) -- `--api_key`: API key for authentication -- `--base_url`: API endpoint URL -- `--max_turns`: Maximum number of tool-calling iterations -- `--enabled_toolsets`: Comma-separated list of toolsets to enable -- `--disabled_toolsets`: Comma-separated list of toolsets to disable -- `--list_tools`: List all available toolsets and tools -- `--save_trajectories`: Save conversation trajectories to JSONL files - -## Environment Variables - -Set these environment variables to enable different tools: - -- `FIRECRAWL_API_KEY`: For web tools (search, extract, crawl) -- `MORPH_API_KEY`: For terminal tools -- `NOUS_API_KEY`: For vision and reasoning tools -- `FAL_KEY`: For image generation tools -- `ANTHROPIC_API_KEY`: For the main agent model - -## Documentation - -- `TOOLSETS_README.md`: Comprehensive guide to the
toolsets system -- `toolsets.py`: View and modify available toolsets -- `model_tools.py`: Core tool definitions and handlers - ## Examples -See `TOOLSETS_README.md` for extensive examples of using different toolsets for various scenarios. +### Investment Research +```bash +python run_agent.py \ + --enabled_toolsets web \ + --query "Find publicly traded companies in renewable energy" +``` + +### Code Analysis +```bash +python run_agent.py \ + --enabled_toolsets development \ + --query "Analyze the codebase and suggest improvements" +``` + +### Image Analysis +```bash +python run_agent.py \ + --enabled_toolsets vision \ + --query "Analyze this chart and explain the trends" +``` + +## Development + +### Adding New Tools + +1. Create tool in `tools/` directory +2. Register in `model_tools.py` +3. Add to appropriate toolset in `toolsets.py` + +### Running Tests + +```bash +# Test web tools +python tests/test_web_tools.py + +# Test API endpoint / logging (run from the project root: +# the script calls api_endpoint/logging_server.py and run_agent.py by relative path) +./api_endpoint/test_websocket_logging.sh +``` + +## License + +MIT License - see LICENSE file for details + +## Contributing + +Contributions welcome! Please open an issue or PR. + +## Support + +For questions or issues: +1. Check documentation in `api_endpoint/` +2. Review example usage in this README +3. 
Open a GitHub issue + +--- + +Built with ❤️ for advanced AI agent workflows diff --git a/__pycache__/model_tools.cpython-310.pyc b/__pycache__/model_tools.cpython-310.pyc deleted file mode 100644 index 519e30120e..0000000000 Binary files a/__pycache__/model_tools.cpython-310.pyc and /dev/null differ diff --git a/__pycache__/web_tools.cpython-310.pyc b/__pycache__/web_tools.cpython-310.pyc deleted file mode 100644 index d20f5fb508..0000000000 Binary files a/__pycache__/web_tools.cpython-310.pyc and /dev/null differ diff --git a/api_endpoint/__init__.py b/api_endpoint/__init__.py new file mode 100644 index 0000000000..ce2dd47c01 --- /dev/null +++ b/api_endpoint/__init__.py @@ -0,0 +1,26 @@ +""" +Hermes Agent - API Endpoint & Real-time Logging + +This package provides a FastAPI WebSocket endpoint for real-time logging of the Hermes Agent. + +Components: +- logging_server: FastAPI server that receives and stores events +- websocket_logger: Client library for sending events from the agent + +Usage: + # Start the API endpoint server + python api_endpoint/logging_server.py + + # Use in agent code + from api_endpoint.websocket_logger import WebSocketLogger + +For more information, see: +- WEBSOCKET_LOGGING_GUIDE.md - User guide +- IMPLEMENTATION_SUMMARY.md - Technical details +""" + +from .websocket_logger import WebSocketLogger, SyncWebSocketLogger + +__all__ = ['WebSocketLogger', 'SyncWebSocketLogger'] +__version__ = '1.0.0' + diff --git a/api_endpoint/logging_server.py b/api_endpoint/logging_server.py new file mode 100644 index 0000000000..3d769a37e9 --- /dev/null +++ b/api_endpoint/logging_server.py @@ -0,0 +1,597 @@ +#!/usr/bin/env python3 +""" +Hermes Agent - Real-time Logging Server + +A FastAPI server with WebSocket support that listens for agent execution events +and logs them to JSON files in real-time.
+ +Events tracked: +- User queries +- API calls (requests to the model) +- Assistant responses +- Tool calls (name, parameters, timing) +- Tool results (outputs, errors, duration) +- Final responses +- Session metadata + +Usage: + python logging_server.py + +Or with uvicorn directly: + uvicorn logging_server:app --host 0.0.0.0 --port 8000 --reload + +The server will listen for WebSocket connections at ws://localhost:8000/ws +""" + +import json +import asyncio +import threading +from datetime import datetime +from pathlib import Path +from typing import Dict, Any, List, Optional +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +import uvicorn + +# Configuration +LOGS_DIR = Path(__file__).parent / "logs" / "realtime" +LOGS_DIR.mkdir(parents=True, exist_ok=True) + +# Initialize FastAPI app +app = FastAPI( + title="Hermes Agent Logging Server", + description="Real-time WebSocket server for agent execution logging", + version="1.0.0" +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +class SessionLogger: + """ + Manages logging for a single agent session. + + Each agent execution gets its own SessionLogger instance. 
+ Responsible for: + - Collecting all events for the session + - Saving events to JSON file in real-time + - Managing session lifecycle (start -> events -> finalize) + """ + + def __init__(self, session_id: str): + self.session_id = session_id + self.start_time = datetime.now() + self.events: List[Dict[str, Any]] = [] # In-memory list of all events + self.log_file = LOGS_DIR / f"session_{session_id}.json" # Where to save on disk + + # Initialize session data structure + # This is what gets saved to the JSON file + self.session_data = { + "session_id": session_id, + "start_time": self.start_time.isoformat(), + "end_time": None, # Set when session completes + "events": [], # Will be populated as events come in + "metadata": {} # Model, toolsets, etc. (set via session_start event) + } + + def add_event(self, event: Dict[str, Any]): + """ + Add an event to the session log. + + Called every time a new event arrives (query, api_call, tool_call, etc). + IMMEDIATELY saves to file for real-time persistence. + """ + # Add timestamp if not present (should always be added, but safety check) + if "timestamp" not in event: + event["timestamp"] = datetime.now().isoformat() + + # Add to in-memory event list + self.events.append(event) + self.session_data["events"] = self.events + + # CRITICAL: Save to file immediately (real-time logging) + # This ensures events are persisted even if agent crashes + self._save() + + def set_metadata(self, metadata: Dict[str, Any]): + """Set session metadata (model, toolsets, etc.).""" + self.session_data["metadata"].update(metadata) + self._save() + + def finalize(self): + """Finalize the session and save.""" + self.session_data["end_time"] = datetime.now().isoformat() + self._save() + + def _save(self): + """ + Save current session data to JSON file. + + Called after EVERY event is added - provides real-time persistence. + If file writing fails, logs error but continues (doesn't crash server). 
+ """ + try: + # Write complete session data to JSON file + # indent=2 makes it human-readable + # ensure_ascii=False preserves Unicode characters + with open(self.log_file, 'w', encoding='utf-8') as f: + json.dump(self.session_data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f"❌ Error saving session log: {e}") + + +class ConnectionManager: + """ + Manages WebSocket connections and active sessions. + + Global singleton that: + - Tracks all active WebSocket connections (for broadcasting) + - Manages all SessionLogger instances (one per agent session) + - Coordinates between WebSocket events and file logging + """ + + def __init__(self): + self.active_connections: List[WebSocket] = [] # All connected WebSocket clients + self.sessions: Dict[str, SessionLogger] = {} # session_id -> SessionLogger mapping + + async def connect(self, websocket: WebSocket): + """Accept a new WebSocket connection.""" + await websocket.accept() + self.active_connections.append(websocket) + print(f"✅ WebSocket connected. Active connections: {len(self.active_connections)}") + + def disconnect(self, websocket: WebSocket): + """Remove a WebSocket connection.""" + if websocket in self.active_connections: + self.active_connections.remove(websocket) + print(f"❌ WebSocket disconnected. Active connections: {len(self.active_connections)}") + + def get_or_create_session(self, session_id: str) -> SessionLogger: + """ + Get existing session logger or create a new one. + + Called when an event arrives for a session. Creates SessionLogger + on first event, reuses it for subsequent events from same session.
+ """ + if session_id not in self.sessions: + # First time seeing this session - create new logger + self.sessions[session_id] = SessionLogger(session_id) + print(f"📝 Created new session: {session_id}") + return self.sessions[session_id] + + def finalize_session(self, session_id: str): + """Finalize and clean up a session.""" + if session_id in self.sessions: + self.sessions[session_id].finalize() + print(f"✅ Session finalized: {session_id}") + + async def broadcast(self, message: Dict[str, Any]): + """ + Broadcast a message to all connected WebSocket clients. + + Allows multiple clients (e.g., multiple browser tabs) to watch + the same agent session in real-time. Future UI feature. + """ + disconnected = [] + for connection in self.active_connections: + try: + await connection.send_json(message) + except Exception: + # Connection closed - mark for removal + disconnected.append(connection) + + # Clean up disconnected clients silently + for conn in disconnected: + if conn in self.active_connections: + self.active_connections.remove(conn) + + +# Global connection manager +manager = ConnectionManager() + + +# Request/Response models for API endpoints +class AgentRequest(BaseModel): + """Request model for starting an agent run.""" + query: str + model: str = "claude-sonnet-4-5-20250929" + base_url: str = "https://api.anthropic.com/v1/" + enabled_toolsets: Optional[List[str]] = None + disabled_toolsets: Optional[List[str]] = None + max_turns: int = 10 + mock_web_tools: bool = False + mock_delay: int = 60 + verbose: bool = False + + +class AgentResponse(BaseModel): + """Response model for agent run request.""" + status: str + session_id: str + message: str + + +@app.get("/") +async def root(): + """Root endpoint - server status.""" + return { + "status": "running", + "service": "Hermes Agent Logging Server", + "websocket_url": "ws://localhost:8000/ws", + "active_connections": len(manager.active_connections), + "active_sessions": len(manager.sessions), +
"logs_directory": str(LOGS_DIR) + } + + +@app.get("/sessions") +async def list_sessions(): + """List all active and recent sessions.""" + # Get all session log files + session_files = list(LOGS_DIR.glob("session_*.json")) + + sessions = [] + for session_file in sorted(session_files, key=lambda x: x.stat().st_mtime, reverse=True): + try: + with open(session_file, 'r', encoding='utf-8') as f: + session_data = json.load(f) + sessions.append({ + "session_id": session_data.get("session_id"), + "start_time": session_data.get("start_time"), + "end_time": session_data.get("end_time"), + "event_count": len(session_data.get("events", [])), + "file": str(session_file) + }) + except Exception as e: + print(f"⚠️ Error reading session file {session_file}: {e}") + + return { + "total_sessions": len(sessions), + "sessions": sessions + } + + +@app.get("/sessions/{session_id}") +async def get_session(session_id: str): + """Get detailed data for a specific session.""" + from fastapi.responses import JSONResponse # a bare (body, status) tuple would be serialized as a 200 JSON array + session_file = LOGS_DIR / f"session_{session_id}.json" + if not session_file.exists(): + return JSONResponse(status_code=404, content={"error": "Session not found"}) + + try: + with open(session_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + return JSONResponse(status_code=500, content={"error": f"Failed to load session: {str(e)}"}) + + +@app.post("/agent/run", response_model=AgentResponse) +async def run_agent(request: AgentRequest, background_tasks: BackgroundTasks): + """ + Start an agent run with specified parameters. + + This endpoint triggers an agent execution in the background and returns immediately. + The agent will connect to the WebSocket endpoint to send real-time events.
+ + Args: + request: AgentRequest with query and configuration + background_tasks: FastAPI background tasks for async execution + + Returns: + AgentResponse with session_id for tracking + """ + import uuid + import sys + import os + + # Generate session ID for this run - we'll pass it to the agent + session_id = str(uuid.uuid4()) + + # Add parent directory to path to import run_agent + parent_dir = str(Path(__file__).parent.parent) + if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + + from run_agent import AIAgent + + # Run agent in background thread (not blocking the API) + def run_agent_background(): + """Run agent in a separate thread.""" + try: + # Initialize agent with WebSocket logging enabled + agent = AIAgent( + base_url=request.base_url, + model=request.model, + api_key=os.getenv("ANTHROPIC_API_KEY"), + max_iterations=request.max_turns, + enabled_toolsets=request.enabled_toolsets, + disabled_toolsets=request.disabled_toolsets, + save_trajectories=False, + verbose_logging=request.verbose, + enable_websocket_logging=True, # Always enable for UI + websocket_server="ws://localhost:8000/ws", + mock_web_tools=request.mock_web_tools, + mock_delay=request.mock_delay + ) + + # Run conversation with our session_id + result = agent.run_conversation( + request.query, + session_id=session_id # Pass session_id so it matches + ) + + print(f"✅ Agent run completed: {session_id[:8]}...") + print(f" Final response: {result['final_response'][:100] if result.get('final_response') else 'No response'}...") + + except Exception as e: + print(f"❌ Error running agent {session_id[:8]}...: {e}") + import traceback + traceback.print_exc() + + # Start agent in background thread + thread = threading.Thread(target=run_agent_background, daemon=True) + thread.start() + + return AgentResponse( + status="started", + session_id=session_id, + message=f"Agent started with session ID: {session_id}" + ) + + +@app.get("/tools") +async def get_available_tools(): + """Get list of 
available toolsets and tools.""" + try: + import sys + parent_dir = str(Path(__file__).parent.parent) + if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + + from toolsets import get_all_toolsets, get_toolset_info + + all_toolsets = get_all_toolsets() + toolsets_info = [] + + for name in all_toolsets.keys(): + info = get_toolset_info(name) + if info: + toolsets_info.append({ + "name": name, + "description": info['description'], + "tool_count": info['tool_count'], + "resolved_tools": info['resolved_tools'] + }) + + return { + "toolsets": toolsets_info + } + except Exception as e: + return {"error": f"Failed to load tools: {str(e)}"} + + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """ + WebSocket endpoint for receiving real-time agent events. + + This is the main entry point for all logging. Agents connect here and send events. + + Message Flow: + 1. Agent connects to ws://localhost:8000/ws + 2. Agent sends events as JSON messages + 3. Server parses event_type and routes to appropriate handler + 4. Event is added to SessionLogger (saved to file) + 5. Event is broadcast to all connected clients + 6. Acknowledgment sent back to agent + + Expected message format: + { + "session_id": "unique-session-id", // Links event to specific session + "event_type": "query" | "api_call" | ..., // What kind of event + "data": { ... event-specific data ... 
} // Event payload + } + """ + # Accept the WebSocket connection + await manager.connect(websocket) + + try: + # Main event loop - runs until client disconnects + while True: + # Receive message from client (agent) + # This is a blocking call - waits for next message + message = await websocket.receive_json() + + # Parse the standard message structure + session_id = message.get("session_id") # Which agent session + event_type = message.get("event_type") # What kind of event + data = message.get("data", {}) # Event payload + + # Validate: session_id is required + if not session_id: + await websocket.send_json({ + "error": "session_id is required" + }) + continue + + # Get or create SessionLogger for this session + # First event creates it, subsequent events reuse it + session = manager.get_or_create_session(session_id) + + # Route event to appropriate handler based on event_type + # Each handler extracts relevant data and adds to session log + + if event_type == "session_start": + # Initial event - sent when agent first connects + # Contains metadata about the session (model, toolsets, etc.) 
+ session.set_metadata(data) + print(f"🚀 Session started: {session_id}") + + elif event_type == "query": + # User query + session.add_event({ + "type": "query", + "query": data.get("query"), + "toolsets": data.get("toolsets"), + "model": data.get("model") + }) + print(f"📝 Query logged: {(data.get('query') or '')[:60]}...") + + elif event_type == "api_call": + # API call to model + session.add_event({ + "type": "api_call", + "call_number": data.get("call_number"), + "message_count": data.get("message_count"), + "has_tools": data.get("has_tools") + }) + print(f"🔄 API call #{data.get('call_number')} logged") + + elif event_type == "response": + # Assistant response + session.add_event({ + "type": "response", + "call_number": data.get("call_number"), + "content": data.get("content"), + "has_tool_calls": data.get("has_tool_calls"), + "tool_call_count": data.get("tool_call_count"), + "duration": data.get("duration") + }) + print(f"🤖 Response logged: {(data.get('content') or '')[:60]}...") + + elif event_type == "tool_call": + # Tool execution + session.add_event({ + "type": "tool_call", + "call_number": data.get("call_number"), + "tool_index": data.get("tool_index"), + "tool_name": data.get("tool_name"), + "parameters": data.get("parameters"), + "tool_call_id": data.get("tool_call_id") + }) + print(f"🔧 Tool call logged: {data.get('tool_name')}") + + elif event_type == "tool_result": + # Tool result - captures output from tool execution + # Now includes BOTH truncated preview AND full raw result + session.add_event({ + "type": "tool_result", + "call_number": data.get("call_number"), + "tool_index": data.get("tool_index"), + "tool_name": data.get("tool_name"), + "result": data.get("result"), # Truncated preview (1000 chars) + "raw_result": data.get("raw_result"), # NEW: Full untruncated result + "error": data.get("error"), + "duration": data.get("duration"), + "tool_call_id": data.get("tool_call_id") + }) + + # Enhanced logging with size information + if 
data.get("error"): + print(f"❌ Tool error logged: {data.get('tool_name')}") + else: + # Show size of raw result to indicate data volume + raw_size = len(data.get("raw_result") or data.get("result") or "") + size_kb = raw_size / 1024 + print(f"✅ Tool result logged: {data.get('tool_name')} ({size_kb:.1f} KB)") + + elif event_type == "error": + # Error event + session.add_event({ + "type": "error", + "error_message": data.get("error_message"), + "call_number": data.get("call_number") + }) + print(f"❌ Error logged: {(data.get('error_message') or '')[:60]}...") + + elif event_type == "complete": + # Session complete + session.add_event({ + "type": "complete", + "final_response": data.get("final_response"), + "total_calls": data.get("total_calls"), + "completed": data.get("completed") + }) + manager.finalize_session(session_id) + print(f"🎉 Session complete: {session_id}") + + else: + # Unknown event type - log it anyway + session.add_event({ + "type": event_type or "unknown", + **data + }) + print(f"⚠️ Unknown event type: {event_type}") + + # Broadcast event to all connected clients (for future real-time UI) + # Allows multiple browsers/dashboards to watch same session live + await manager.broadcast({ + "session_id": session_id, + "event_type": event_type, + "timestamp": datetime.now().isoformat(), + "data": data + }) + + # Send acknowledgment back to sender + # Confirms event was received and logged + # Handle case where client disconnects before we can ack + try: + await websocket.send_json({ + "status": "logged", + "session_id": session_id, + "event_type": event_type + }) + except Exception: + # Connection closed before ack - this is normal for "complete" event + # Client disconnects after sending, so we can't ack + pass + + except WebSocketDisconnect: + manager.disconnect(websocket) + except Exception as e: + print(f"❌ WebSocket error: {e}") + manager.disconnect(websocket) + + +def main(host: str = "0.0.0.0", port: int = 
8000, reload: bool = False): + """ + Start the logging server. + + Args: + host: Host to bind to (default: 0.0.0.0) + port: Port to run on (default: 8000) + reload: Enable auto-reload on file changes (default: False) + """ + print("🚀 Hermes Agent Logging Server") + print("=" * 50) + print(f"📂 Logs directory: {LOGS_DIR}") + print(f"🌐 Server starting at http://{host}:{port}") + print(f"🔌 WebSocket endpoint: ws://{host}:{port}/ws") + print(f"🔄 Auto-reload: {'enabled' if reload else 'disabled'}") + print("\n📡 Ready to receive agent events...") + print("=" * 50) + + uvicorn.run( + "logging_server:app", + host=host, + port=port, + reload=reload, + log_level="info", + timeout_keep_alive=600 # Keep HTTP/WS connections alive for 10 minutes of inactivity + # Note: WebSocket ping/pong disabled in client to avoid timeout during blocked event loop + ) + + +if __name__ == "__main__": + import fire + fire.Fire(main) + diff --git a/api_endpoint/test_websocket_logging.sh b/api_endpoint/test_websocket_logging.sh new file mode 100755 index 0000000000..6c994f740d --- /dev/null +++ b/api_endpoint/test_websocket_logging.sh @@ -0,0 +1,91 @@ +#!/bin/bash +# Test script for WebSocket logging system +# +# This script demonstrates the complete WebSocket logging workflow: +# 1. Starts the logging server +# 2. Runs the agent with WebSocket logging enabled +# 3. Shows the logged data +# +# Usage: ./test_websocket_logging.sh + +set -e # Exit on error + +echo "🧪 Testing WebSocket Logging System" +echo "====================================" +echo "" + +# Check if required packages are installed +echo "📦 Checking dependencies..." +python -c "import fastapi; import uvicorn; import websockets" 2>/dev/null || { + echo "❌ Missing dependencies. Installing..." + pip install fastapi uvicorn websockets +} +echo "✅ Dependencies OK" +echo "" + +# Start the logging server in the background +echo "🚀 Starting logging server..."
+python api_endpoint/logging_server.py --port 8000 &
+SERVER_PID=$!
+
+# Stop the server on every exit path; with `set -e`, a failing agent run would otherwise leave it running
+trap 'kill "$SERVER_PID" 2>/dev/null || true' EXIT
+
+# Give server time to start
+sleep 2
+
+# Check if server is running
+if ps -p $SERVER_PID > /dev/null; then
+    echo "✅ Logging server started (PID: $SERVER_PID)"
+else
+    echo "❌ Failed to start logging server"
+    exit 1
+fi
+
+echo ""
+echo "🤖 Running agent with WebSocket logging..."
+echo ""
+
+# Run the agent with WebSocket logging
+python run_agent.py \
+    --enabled_toolsets web \
+    --enable_websocket_logging \
+    --query "What are the top 3 programming languages in 2025?" \
+    --max_turns 5
+
+echo ""
+echo "✅ Agent execution complete!"
+echo ""
+
+# Show the most recent log file
+echo "📊 Viewing logged session data..."
+echo ""
+
+LATEST_LOG=$(ls -t logs/realtime/session_*.json 2>/dev/null | head -1)
+
+if [ -f "$LATEST_LOG" ]; then
+    echo "📄 Log file: $LATEST_LOG"
+    echo ""
+
+    # Pretty print the JSON if jq is available
+    if command -v jq &> /dev/null; then
+        echo "Event summary:"
+        jq '.events[] | {type: .type, timestamp: .timestamp}' "$LATEST_LOG"
+        echo ""
+        echo "Total events: $(jq '.events | length' "$LATEST_LOG")"
+    else
+        echo "Content (install 'jq' for pretty printing):"
+        cat "$LATEST_LOG"
+    fi
+else
+    echo "⚠️ No log files found in logs/realtime/"
+fi
+
+echo ""
+echo "🛑 Stopping logging server..."
+kill $SERVER_PID 2>/dev/null || true
+
+echo "✅ Test complete!"
+echo ""
+echo "Next steps:"
+echo "  1. Start server: python api_endpoint/logging_server.py"
+echo "  2. Run agent: python run_agent.py --enable_websocket_logging --query \"...\""
+echo "  3. 
View logs: http://localhost:8000/sessions" + diff --git a/api_endpoint/websocket_connection_pool.py b/api_endpoint/websocket_connection_pool.py new file mode 100644 index 0000000000..16ca097445 --- /dev/null +++ b/api_endpoint/websocket_connection_pool.py @@ -0,0 +1,382 @@ +""" +WebSocket Connection Pool - Persistent Connection Manager + +This module provides a singleton WebSocket connection that persists across +multiple agent runs. This is a more robust architecture than creating a new +connection for each run. + +Benefits: +- No timeout issues (connection stays alive indefinitely) +- No reconnection overhead (connect once) +- Supports parallel agent runs (multiple sessions share one socket) +- Proper shutdown handling (SIGTERM/SIGINT) +- Thread-safe concurrent sends +""" + +import asyncio +import signal +import websockets +from typing import Optional, Dict, Any +import json +import atexit +import sys +import threading +from datetime import datetime + + +class WebSocketConnectionPool: + """ + Singleton WebSocket connection manager. + + Maintains a single persistent connection to the logging server + that all agent sessions can use. Handles graceful shutdown. 
+ + Usage: + # Get singleton instance + pool = WebSocketConnectionPool() + + # Connect (idempotent - safe to call multiple times) + await pool.connect() + + # Send events (thread-safe, multiple sessions can call concurrently) + await pool.send_event("query", session_id, {...}) + + # Shutdown handled automatically on SIGTERM/SIGINT + """ + + _instance: Optional['WebSocketConnectionPool'] = None + + def __new__(cls): + """Ensure only one instance exists (singleton pattern).""" + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + """Initialize the connection pool (only once).""" + if getattr(self, '_initialized', False): + return + + self.websocket: Optional[websockets.WebSocketClientProtocol] = None + self.server_url: str = "ws://localhost:8000/ws" + self.connected: bool = False + # Store reference to loop for signal handlers + # Agent code should never close event loops when using persistent connections + self.loop: Optional[asyncio.AbstractEventLoop] = None + # Locks are created lazily when event loop exists + self._send_lock: Optional[asyncio.Lock] = None + self._connect_lock: Optional[asyncio.Lock] = None + self._locks_loop: Optional[asyncio.AbstractEventLoop] = None # Track which loop created locks + self._init_lock = threading.Lock() # Thread-safe lock initialization + self._shutdown_in_progress = False + self._initialized = True + + # Register shutdown handlers for graceful cleanup + # These ensure WebSocket is closed properly on exit + signal.signal(signal.SIGTERM, self._signal_handler) + signal.signal(signal.SIGINT, self._signal_handler) + atexit.register(self._cleanup_sync) + + print("๐Ÿ”Œ WebSocket connection pool initialized") + + def _ensure_locks(self): + """ + Lazy initialization of asyncio locks with thread safety and loop tracking. + + Locks must be created when an event loop exists, not at import time. 
+ If the event loop changes between runs, locks must be recreated because + asyncio.Lock objects are bound to the loop that created them. + + This is called before any async operation that needs locks. + Uses a threading.Lock to prevent race conditions during initialization. + """ + with self._init_lock: # Thread-safe initialization + try: + current_loop = asyncio.get_event_loop() + except RuntimeError: + # No event loop in current thread + return + + # Recreate locks if: + # 1. Locks don't exist yet, OR + # 2. Event loop has changed (locks are bound to the loop that created them) + if self._locks_loop is not current_loop or self._send_lock is None: + self._send_lock = asyncio.Lock() + self._connect_lock = asyncio.Lock() + self._locks_loop = current_loop + + async def connect(self, server_url: str = "ws://localhost:8000/ws") -> bool: + """ + Connect to WebSocket server. + + This is idempotent - safe to call multiple times. If already connected, + does nothing. If connection failed previously, will retry. 
+ + Args: + server_url: WebSocket server URL (default: ws://localhost:8000/ws) + + Returns: + bool: True if connected successfully, False otherwise + """ + # Ensure locks exist (lazy initialization) + self._ensure_locks() + + async with self._connect_lock: + # Always update loop reference to current loop (even if already connected) + # This ensures signal handlers and cleanup use the correct loop + self.loop = asyncio.get_event_loop() + + # Already connected - nothing to do + if self.connected and self.websocket: + return True + + try: + self.server_url = server_url + + # Establish persistent WebSocket connection + # No ping/pong needed since connection stays open indefinitely + self.websocket = await websockets.connect( + server_url, + ping_interval=None, # Disable ping/pong (not needed for persistent connection) + max_size=10 * 1024 * 1024, # 10MB max message size for large tool results + open_timeout=10, # 10s timeout for initial connection + close_timeout=5 # 5s timeout for close handshake + ) + + self.connected = True + + print(f"โœ… Connected to logging server (persistent): {server_url}") + return True + + except Exception as e: + print(f"โš ๏ธ Failed to connect to logging server: {e}") + self.connected = False + self.websocket = None + return False + + async def send_event( + self, + event_type: str, + session_id: str, + data: Dict[str, Any], + retry: bool = True + ) -> bool: + """ + Send event to logging server (thread-safe). + + Multiple agent runs can call this concurrently. The send lock ensures + only one message is sent at a time (WebSocket protocol requirement). 
+
+        Args:
+            event_type: Type of event (query, api_call, response, tool_call, tool_result, error, complete)
+            session_id: Unique session identifier
+            data: Event-specific data dictionary
+            retry: Whether to retry connection if disconnected (default: True)
+
+        Returns:
+            bool: True if sent successfully, False otherwise
+        """
+        # Try to connect if not connected (or reconnect if disconnected)
+        if not self.connected or not self.websocket:
+            if retry:
+                await self.connect()
+            if not self.connected:
+                return False  # Give up if connection fails
+
+        # Ensure locks exist (lazy initialization)
+        self._ensure_locks()
+
+        # Lock to prevent concurrent sends (WebSocket requires sequential sends)
+        async with self._send_lock:
+            try:
+                # Create standardized message format
+                message = {
+                    "session_id": session_id,
+                    "event_type": event_type,
+                    "data": data,
+                    "timestamp": datetime.now().isoformat()
+                }
+
+                # Send message as JSON
+                await self.websocket.send(json.dumps(message))
+
+                # Wait for server acknowledgment (with timeout)
+                # This confirms the server received and processed the event
+                try:
+                    response = await asyncio.wait_for(
+                        self.websocket.recv(),
+                        timeout=2.0  # Increased to 2s for busy servers
+                    )
+                    # Successfully received acknowledgment
+                    return True
+
+                except asyncio.TimeoutError:
+                    # No response within timeout - that's OK, message likely sent
+                    # Server might be busy processing
+                    return True
+
+            except websockets.exceptions.ConnectionClosed:
+                print(f"⚠️ WebSocket connection closed unexpectedly")
+                self.connected = False
+                if not retry:
+                    return False
+
+            except Exception as e:
+                print(f"⚠️ Error sending event: {e}")
+                self.connected = False
+                return False
+
+        # Reconnect and resend once, outside the send lock. asyncio.Lock is not
+        # reentrant, so calling send_event() while still holding it would deadlock.
+        print("🔄 Attempting to reconnect...")
+        if await self.connect():
+            # retry=False prevents an infinite reconnect loop
+            return await self.send_event(event_type, session_id, data, retry=False)
+
+        return False
+
+    async def disconnect(self):
+        """
+        Gracefully close
the WebSocket connection. + + Called on shutdown (SIGTERM/SIGINT/exit). Ensures proper cleanup. + """ + if self._shutdown_in_progress: + return # Already shutting down + + self._shutdown_in_progress = True + + if self.websocket and self.connected: + try: + await self.websocket.close() + self.connected = False + print("โœ… WebSocket connection pool closed gracefully") + except Exception as e: + print(f"โš ๏ธ Error closing WebSocket: {e}") + + self._shutdown_in_progress = False + + def _signal_handler(self, signum, frame): + """ + Handle SIGTERM/SIGINT signals for graceful shutdown. + + When user presses Ctrl+C or system sends SIGTERM, this ensures + the WebSocket is closed properly before exit. + """ + print(f"\n๐Ÿ›‘ Received signal {signum}, closing WebSocket connection pool...") + + # Check if we have a valid loop and are connected + if self.loop and not self.loop.is_closed() and self.connected and not self._shutdown_in_progress: + try: + # If loop is not running, we can wait for disconnect + if not self.loop.is_running(): + self.loop.run_until_complete(self.disconnect()) + else: + # Loop is running, can't wait for task - just mark disconnected + # The disconnect task would be cancelled when we exit anyway + self.connected = False + print("โš ๏ธ Loop is running, marking disconnected without waiting") + except Exception as e: + print(f"โš ๏ธ Error during signal handler cleanup: {e}") + + # Exit gracefully + sys.exit(0) + + def _cleanup_sync(self): + """ + Cleanup at exit (atexit handler). + + This is a fallback in case signal handlers don't fire. + Called when Python interpreter shuts down normally. 
+ """ + if self.loop and not self.loop.is_closed() and self.connected and not self._shutdown_in_progress: + try: + # Try to run disconnect synchronously + self.loop.run_until_complete(self.disconnect()) + except Exception: + # Ignore errors during exit cleanup + pass + + def is_connected(self) -> bool: + """Check if currently connected to server.""" + return self.connected and self.websocket is not None + + def get_stats(self) -> Dict[str, Any]: + """Get connection statistics for debugging.""" + return { + "connected": self.connected, + "server_url": self.server_url, + "shutdown_in_progress": self._shutdown_in_progress, + "has_websocket": self.websocket is not None, + "has_loop": self.loop is not None + } + + +# Global singleton instance +# Import this in other modules: from websocket_connection_pool import ws_pool +ws_pool = WebSocketConnectionPool() + + +# Convenience functions for direct usage +async def connect(server_url: str = "ws://localhost:8000/ws") -> bool: + """Connect to logging server (convenience function).""" + return await ws_pool.connect(server_url) + + +async def send_event(event_type: str, session_id: str, data: Dict[str, Any]) -> bool: + """Send event to logging server (convenience function).""" + return await ws_pool.send_event(event_type, session_id, data) + + +async def disconnect(): + """Disconnect from logging server (convenience function).""" + await ws_pool.disconnect() + + +def is_connected() -> bool: + """Check if connected to logging server (convenience function).""" + return ws_pool.is_connected() + + +# ============================================================================ +# SYNCHRONOUS API FOR AGENT LAYER +# ============================================================================ +# These functions provide a clean abstraction that hides event loop management +# from the agent layer. Agent code should ONLY use these functions. 
+ +def connect_sync(server_url: str = "ws://localhost:8000/ws") -> bool: + """ + Synchronous connect - handles event loop internally. + + Agent code should use this instead of directly managing event loops. + This ensures the connection pool maintains full control over its lifecycle. + """ + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + return loop.run_until_complete(ws_pool.connect(server_url)) + + +def send_event_sync(event_type: str, session_id: str, data: Dict[str, Any]) -> bool: + """ + Synchronous send event - handles event loop internally. + + Agent code should use this instead of managing event loops. + This ensures the connection pool maintains full control over its lifecycle. + """ + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + return loop.run_until_complete(ws_pool.send_event(event_type, session_id, data)) + diff --git a/api_endpoint/websocket_logger.py b/api_endpoint/websocket_logger.py new file mode 100644 index 0000000000..b0e1e07417 --- /dev/null +++ b/api_endpoint/websocket_logger.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +""" +WebSocket Logger Client + +Simple client for sending agent events to the logging server via WebSocket. +Used by the agent to log events in real-time during execution. +""" + +import json +import asyncio +from typing import Dict, Any, Optional +from datetime import datetime +import websockets + + +class WebSocketLogger: + """ + Client for logging agent events via WebSocket. 
+ + Usage: + logger = WebSocketLogger("unique-session-id") + await logger.connect() + await logger.log_query("What is Python?", model="gpt-4") + await logger.log_api_call(call_number=1) + await logger.log_response(call_number=1, content="Python is...") + await logger.disconnect() + """ + + def __init__( + self, + session_id: str, + server_url: str = "ws://localhost:8000/ws", + enabled: bool = True + ): + """ + Initialize WebSocket logger. + + Args: + session_id: Unique identifier for this agent session + server_url: WebSocket server URL (default: ws://localhost:8000/ws) + enabled: Whether logging is enabled (default: True) + """ + self.session_id = session_id + self.server_url = server_url + self.enabled = enabled + self.websocket: Optional[websockets.WebSocketClientProtocol] = None + self.connected = False + self.reconnect_count = 0 # Track reconnections for debugging + + async def connect(self): + """ + Connect to the WebSocket logging server. + + Establishes WebSocket connection and sends initial session_start event. + If connection fails, gracefully disables logging (agent continues normally). 
+ """ + if not self.enabled: + return + + try: + # Establish WebSocket connection to the server + # Use VERY LONG ping intervals to avoid timeout during long tool execution + # The event loop is blocked during tool execution, so we can't process pings + # Setting to very large values (1 hour) effectively disables it + self.websocket = await websockets.connect( + self.server_url, + ping_interval=3600, # 1 hour - effectively disabled (event loop blocked anyway) + ping_timeout=3600, # 1 hour timeout for pong response + close_timeout=10, # Timeout for close handshake + max_size=10 * 1024 * 1024, # 10MB max message size (for large raw_results) + open_timeout=10 # Timeout for initial connection + ) + self.connected = True + print(f"โœ… Connected to logging server (ping/pong: 3600s intervals): {self.server_url}") + + # Send initial session_start event + # This tells the server to create a new SessionLogger for this session + await self._send_event("session_start", { + "session_id": self.session_id, + "start_time": datetime.now().isoformat() + }) + + except Exception as e: + # Connection failed - disable logging but don't crash the agent + print(f"โš ๏ธ Failed to connect to logging server: {e}") + print(f" Logging will be disabled for this session.") + self.enabled = False + self.connected = False + + async def disconnect(self): + """Disconnect from the WebSocket server.""" + if self.websocket and self.connected: + try: + await self.websocket.close() + self.connected = False + print(f"โœ… Disconnected from logging server") + except Exception as e: + print(f"โš ๏ธ Error disconnecting: {e}") + + async def _send_event(self, event_type: str, data: Dict[str, Any]): + """ + Send an event to the logging server. + + This is the core method that sends all events via WebSocket. + Creates a standardized message format and handles acknowledgments. 
+
+        Args:
+            event_type: Type of event (query, api_call, response, tool_call, tool_result, error, complete)
+            data: Event data dictionary containing event-specific information
+        """
+        # Safety check: Don't send if logging is disabled
+        if not self.enabled:
+            return
+
+        # Auto-reconnect if connection was lost
+        if not self.connected or not self.websocket:
+            try:
+                self.reconnect_count += 1
+                print(f"🔄 Reconnecting to logging server (attempt #{self.reconnect_count})...")
+                await self.connect()
+                if not self.connected:
+                    # connect() catches its own errors, reports them, and disables logging
+                    return
+                print(f"✅ Reconnected successfully!")
+            except Exception as e:
+                print(f"⚠️ Failed to reconnect: {e}")
+                self.enabled = False  # Disable logging after failed reconnect
+                return
+
+        try:
+            # Create standardized message structure
+            # All events follow this format for consistent server-side handling
+            message = {
+                "session_id": self.session_id,  # Links event to specific agent session
+                "event_type": event_type,  # Identifies what kind of event this is
+                "data": data  # Event-specific payload
+            }
+
+            # Send message as JSON string over WebSocket
+            await self.websocket.send(json.dumps(message))
+
+            # Wait up to 1 second for the server acknowledgment
+            # Best effort: a timeout is tolerated, so delivery is not guaranteed
+            try:
+                response = await asyncio.wait_for(
+                    self.websocket.recv(),
+                    timeout=1.0
+                )
+                # Server sends back: {"status": "logged", "session_id": "...", "event_type": "..."}
+                # We don't need to process it, just confirms receipt
+            except asyncio.TimeoutError:
+                # No response within 1 second - that's okay, continue anyway
+                # Server might be busy or network slow, but event was likely sent
+                pass
+
+        except Exception as e:
+            # Log error but don't crash - graceful degradation
+            # Agent should continue working even if logging fails
+            error_str = str(e)
+
+            # Check if connection was closed (error 1011 = keepalive ping timeout)
+            if "1011" in error_str or "closed" in error_str.lower():
+                print(f"⚠️ WebSocket connection closed: {error_str}")
+                self.connected = False  # 
Mark as disconnected + # Don't try to send more events - connection is dead + else: + print(f"โš ๏ธ Error sending event to logging server: {e}") + # Don't disable entirely or try to reconnect - just continue with logging disabled + + # Convenience methods for specific event types + + async def log_query( + self, + query: str, + model: str = None, + toolsets: list = None + ): + """ + Log a user query (the question/task given to the agent). + + This is typically the first event in a session after connection. + Captures what the user asked and which model/tools will be used. + """ + await self._send_event("query", { + "query": query, # The user's question/instruction + "model": model, # Which AI model is being used + "toolsets": toolsets # Which tool categories are enabled + }) + + async def log_api_call( + self, + call_number: int, + message_count: int = None, + has_tools: bool = None + ): + """ + Log an API call to the AI model. + + Called right before sending a request to the model (OpenAI/Anthropic/etc). + Helps track how many API calls are being made and conversation length. + """ + await self._send_event("api_call", { + "call_number": call_number, # Sequential number (1, 2, 3...) + "message_count": message_count, # How many messages in conversation so far + "has_tools": has_tools # Whether tools are available to the model + }) + + async def log_response( + self, + call_number: int, + content: str = None, + has_tool_calls: bool = False, + tool_call_count: int = 0, + duration: float = None + ): + """ + Log an assistant response from the AI model. + + Called after receiving a response from the API. + Captures what the model said and whether it wants to use tools. + """ + await self._send_event("response", { + "call_number": call_number, # Which API call this response is from + "content": content, # What the model said (text response) + "has_tool_calls": has_tool_calls, # Did model request tool execution? 
+ "tool_call_count": tool_call_count, # How many tools does it want to call? + "duration": duration # How long the API call took (seconds) + }) + + async def log_tool_call( + self, + call_number: int, + tool_index: int, + tool_name: str, + parameters: Dict[str, Any], + tool_call_id: str = None + ): + """ + Log a tool call (before executing the tool). + + Captures which tool is being called and with what parameters. + This happens BEFORE the tool runs, so no results yet. + """ + await self._send_event("tool_call", { + "call_number": call_number, # Which API call requested this tool + "tool_index": tool_index, # Which tool in the sequence (if multiple) + "tool_name": tool_name, # Name of tool (e.g., "web_search", "web_extract") + "parameters": parameters, # Arguments passed to the tool (e.g., {"query": "Python", "limit": 5}) + "tool_call_id": tool_call_id # Unique ID to link call with result + }) + + async def log_tool_result( + self, + call_number: int, + tool_index: int, + tool_name: str, + result: str = None, + error: str = None, + duration: float = None, + tool_call_id: str = None, + raw_result: str = None # NEW: Full untruncated result for verification + ): + """ + Log a tool result (output from tool execution). + + Captures both a truncated preview (for UI display) and the full raw result + (for verification and debugging). This is especially important for web tools + where you want to see what was scraped vs what the LLM processed. + + Args: + call_number: Which API call this tool was part of + tool_index: Which tool in the sequence (1st, 2nd, etc.) 
+ tool_name: Name of the tool that was executed + result: Tool output (will be truncated to 1000 chars for preview) + error: Error message if tool failed + duration: How long the tool took to execute (seconds) + tool_call_id: Unique ID linking this result to the tool call + raw_result: NEW - Full untruncated result for verification/debugging + """ + await self._send_event("tool_result", { + "call_number": call_number, + "tool_index": tool_index, + "tool_name": tool_name, + "result": result[:1000] if result else None, # Truncated preview (1000 chars max) + "raw_result": raw_result, # NEW: Full result - can be 100KB+ for web scraping + "error": error, + "duration": duration, + "tool_call_id": tool_call_id + }) + + async def log_error( + self, + error_message: str, + call_number: int = None + ): + """ + Log an error that occurred during agent execution. + + Captures exceptions, API failures, or other issues. + """ + await self._send_event("error", { + "error_message": error_message, # Description of what went wrong + "call_number": call_number # Which API call caused the error (if applicable) + }) + + async def log_complete( + self, + final_response: str = None, + total_calls: int = None, + completed: bool = True + ): + """ + Log session completion (final event before disconnecting). + + Marks the end of the agent's execution and provides summary info. + """ + await self._send_event("complete", { + "final_response": final_response[:500] if final_response else None, # Truncated summary of final answer + "total_calls": total_calls, # How many API calls were made total + "completed": completed # Did it complete successfully? (true/false) + }) + + +# Synchronous wrapper for convenience +class SyncWebSocketLogger: + """ + Synchronous wrapper around WebSocketLogger. + + For use in synchronous code - creates an event loop internally. 
+ """ + + def __init__(self, session_id: str, server_url: str = "ws://localhost:8000/ws", enabled: bool = True): + self.logger = WebSocketLogger(session_id, server_url, enabled) + self.loop = None + + def connect(self): + """Connect to server (synchronous).""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.loop.run_until_complete(self.logger.connect()) + + def disconnect(self): + """Disconnect from server (synchronous).""" + if self.loop: + self.loop.run_until_complete(self.logger.disconnect()) + self.loop.close() + + def _run_async(self, coro): + """ + Run an async coroutine synchronously. + + Bridge between sync code (agent) and async code (WebSocket). + Uses event loop to execute async operations in sync context. + """ + if self.loop and self.loop.is_running(): + # Already in event loop, just await + asyncio.create_task(coro) + else: + # Run in current loop + if self.loop: + self.loop.run_until_complete(coro) + + def log_query(self, query: str, model: str = None, toolsets: list = None): + self._run_async(self.logger.log_query(query, model, toolsets)) + + def log_api_call(self, call_number: int, message_count: int = None, has_tools: bool = None): + self._run_async(self.logger.log_api_call(call_number, message_count, has_tools)) + + def log_response(self, call_number: int, content: str = None, has_tool_calls: bool = False, + tool_call_count: int = 0, duration: float = None): + self._run_async(self.logger.log_response(call_number, content, has_tool_calls, + tool_call_count, duration)) + + def log_tool_call(self, call_number: int, tool_index: int, tool_name: str, + parameters: Dict[str, Any], tool_call_id: str = None): + self._run_async(self.logger.log_tool_call(call_number, tool_index, tool_name, + parameters, tool_call_id)) + + def log_tool_result(self, call_number: int, tool_index: int, tool_name: str, + result: str = None, error: str = None, duration: float = None, + tool_call_id: str = None, raw_result: str = None): + 
self._run_async(self.logger.log_tool_result(call_number, tool_index, tool_name, + result, error, duration, tool_call_id, raw_result)) + + def log_error(self, error_message: str, call_number: int = None): + self._run_async(self.logger.log_error(error_message, call_number)) + + def log_complete(self, final_response: str = None, total_calls: int = None, completed: bool = True): + self._run_async(self.logger.log_complete(final_response, total_calls, completed)) + diff --git a/mock_web_tools.py b/mock_web_tools.py new file mode 100644 index 0000000000..712382c768 --- /dev/null +++ b/mock_web_tools.py @@ -0,0 +1,244 @@ +""" +Mock Web Tools for Testing WebSocket Reconnection + +This module provides mock implementations of web_search and web_extract +that simulate long-running operations without making real API calls. + +Perfect for testing WebSocket timeout/reconnection behavior without: +- Wasting API credits +- Waiting for real web crawling +- Network dependencies +""" + +import time +import json +from typing import List + + +def mock_web_search(query: str, delay: int = 2) -> str: + """ + Mock web search that returns fake results after a delay. + + Args: + query: Search query (ignored, just for API compatibility) + delay: Seconds to sleep (default: 2s) + + Returns: + JSON string with fake search results + """ + print(f"๐Ÿ” [MOCK] Searching for: '{query}' (will take {delay}s)...") + time.sleep(delay) + + result = { + "success": True, + "data": { + "web": [ + { + "url": "https://example.com/article1", + "title": "Mock Article 1 - Water Utilities", + "description": "This is a mock search result for testing purposes. Real data would appear here.", + "category": None + }, + { + "url": "https://example.com/article2", + "title": "Mock Article 2 - AI Data Centers", + "description": "Another mock result. 
This simulates web_search without making real API calls.", + "category": None + }, + { + "url": "https://example.com/article3", + "title": "Mock Article 3 - Investment Opportunities", + "description": "Third mock result for testing. Query was: " + query, + "category": None + } + ] + } + } + + print(f"โœ… [MOCK] Search completed with {len(result['data']['web'])} results") + return json.dumps(result, indent=2) + + +def mock_web_extract(urls: List[str], delay: int = 60) -> str: + """ + Mock web extraction that simulates long-running crawl. + + This is perfect for testing WebSocket timeout/reconnection because: + - Default 60s delay triggers the ~30s WebSocket timeout + - No actual web requests made + - No API credits consumed + - Predictable, reproducible behavior + + Args: + urls: List of URLs to "extract" (ignored) + delay: Seconds to sleep (default: 60s to trigger timeout) + + Returns: + JSON string with fake extraction results + """ + print(f"๐ŸŒ [MOCK] Extracting {len(urls)} URLs (will take {delay}s)...") + print(f"๐Ÿ“Š [MOCK] This will test WebSocket reconnection (timeout at ~30s)") + + # Simulate long-running operation + # Show progress so user knows it's working + for i in range(delay): + if i % 10 == 0 and i > 0: + print(f" โฑ๏ธ [MOCK] {i}/{delay}s elapsed...") + time.sleep(1) + + # Generate fake but realistic-looking content + result = { + "success": True, + "data": [] + } + + for idx, url in enumerate(urls, 1): + result["data"].append({ + "url": url, + "title": f"Mock Extracted Content {idx}", + "content": f"# Mock Content from {url}\n\n" + f"This is simulated extracted content for testing purposes. " + f"In a real scenario, this would contain the full text from the webpage. 
" + f"\n\n## Key Points\n" + f"- Mock point 1 about water utilities\n" + f"- Mock point 2 about AI data centers\n" + f"- Mock point 3 about investment opportunities\n" + f"\n\nThis content took {delay} seconds to 'extract', which is long enough " + f"to trigger WebSocket timeout and test reconnection logic." + * 10, # Make it longer to simulate real extraction + "extracted_at": "2025-10-10T14:00:00Z" + }) + + json_result = json.dumps(result, indent=2) + size_kb = len(json_result) / 1024 + + print(f"โœ… [MOCK] Extraction completed: {len(urls)} URLs, {size_kb:.1f} KB") + return json_result + + +def mock_web_crawl(start_url: str, max_pages: int = 10, delay: int = 30) -> str: + """ + Mock web crawling that simulates multi-page crawl. + + Args: + start_url: Starting URL (ignored) + max_pages: Max pages to crawl (just affects result count) + delay: Seconds to sleep (default: 30s) + + Returns: + JSON string with fake crawl results + """ + print(f"๐Ÿ•ท๏ธ [MOCK] Crawling from: {start_url} (max {max_pages} pages, {delay}s)...") + time.sleep(delay) + + result = { + "success": True, + "data": { + "start_url": start_url, + "pages_crawled": min(max_pages, 5), + "pages": [] + } + } + + for i in range(min(max_pages, 5)): + result["data"]["pages"].append({ + "url": f"{start_url}/page{i+1}", + "title": f"Mock Page {i+1}", + "content": f"Mock content from page {i+1}. " * 50 + }) + + print(f"โœ… [MOCK] Crawl completed: {len(result['data']['pages'])} pages") + return json.dumps(result, indent=2) + + +# Tool definitions for the agent (same format as real tools) +MOCK_WEB_TOOLS = [ + { + "name": "web_search", + "description": "[MOCK] Search the web for information. Returns fake results after 2s delay. 
Perfect for quick tests.", + "input_schema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The search query" + }, + "delay": { + "type": "integer", + "description": "Seconds to delay (default: 2)", + "default": 2 + } + }, + "required": ["query"] + } + }, + { + "name": "web_extract", + "description": "[MOCK] Extract content from URLs. Simulates 60s delay to test WebSocket timeout/reconnection. Returns fake content without making real requests. PERFECT FOR TESTING!", + "input_schema": { + "type": "object", + "properties": { + "urls": { + "type": "array", + "items": {"type": "string"}, + "description": "List of URLs to extract" + }, + "delay": { + "type": "integer", + "description": "Seconds to delay (default: 60 to trigger timeout)", + "default": 60 + } + }, + "required": ["urls"] + } + }, + { + "name": "web_crawl", + "description": "[MOCK] Crawl website starting from URL. Returns fake results after 30s delay.", + "input_schema": { + "type": "object", + "properties": { + "start_url": { + "type": "string", + "description": "Starting URL for crawl" + }, + "max_pages": { + "type": "integer", + "description": "Max pages to crawl (default: 10)", + "default": 10 + }, + "delay": { + "type": "integer", + "description": "Seconds to delay (default: 30)", + "default": 30 + } + }, + "required": ["start_url"] + } + } +] + + +# Map function names to implementations +MOCK_TOOL_FUNCTIONS = { + "web_search": mock_web_search, + "web_extract": mock_web_extract, + "web_crawl": mock_web_crawl +} + + +if __name__ == "__main__": + # Demo/test the mock tools + print("Testing Mock Web Tools") + print("=" * 60) + + print("\n1. Mock web_search (2s delay):") + result = mock_web_search("test query", delay=2) + print(f"Result length: {len(result)} chars\n") + + print("\n2. 
Mock web_extract (5s delay for demo - normally 60s):") + result = mock_web_extract(["https://example.com"], delay=5) + print(f"Result length: {len(result)} chars\n") + + print("\nโœ… All mock tools working!") + diff --git a/requirements.txt b/requirements.txt index 1a12b5845e..71131779dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,14 @@ firecrawl-py openai -fal-client \ No newline at end of file +fal-client +python-dotenv +fire +httpx +yt-dlp +streamlit +fastapi +uvicorn +websockets +PySide6>=6.6.0 +websocket-client>=1.7.0 +requests>=2.31.0 diff --git a/run_agent.py b/run_agent.py index 5fabfe078a..a7a2f1e959 100644 --- a/run_agent.py +++ b/run_agent.py @@ -24,6 +24,8 @@ import json import logging import os import time +import uuid +import asyncio from typing import List, Dict, Any, Optional from openai import OpenAI import fire @@ -31,6 +33,19 @@ from datetime import datetime # Import our tool system from model_tools import get_tool_definitions, handle_function_call, check_toolset_requirements +from mock_web_tools import MOCK_TOOL_FUNCTIONS, MOCK_WEB_TOOLS + +# Import WebSocket connection pool (optional dependency) +# Use synchronous API to avoid event loop management in agent layer +try: + from api_endpoint.websocket_connection_pool import connect_sync, send_event_sync, is_connected + WEBSOCKET_LOGGER_AVAILABLE = True +except ImportError: + WEBSOCKET_LOGGER_AVAILABLE = False + connect_sync = None + send_event_sync = None + is_connected = None + print("โš ๏ธ WebSocket logger not available (missing websockets package)") class AIAgent: @@ -51,7 +66,11 @@ class AIAgent: enabled_toolsets: List[str] = None, disabled_toolsets: List[str] = None, save_trajectories: bool = False, - verbose_logging: bool = False + verbose_logging: bool = False, + enable_websocket_logging: bool = False, + websocket_server: str = "ws://localhost:8000/ws", + mock_web_tools: bool = False, + mock_delay: int = 60 ): """ Initialize the AI Agent. 
@@ -66,12 +85,21 @@ class AIAgent: disabled_toolsets (List[str]): Disable tools from these toolsets (optional) save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False) verbose_logging (bool): Enable verbose logging for debugging (default: False) + enable_websocket_logging (bool): Enable real-time WebSocket logging (default: False) + websocket_server (str): WebSocket server URL (default: ws://localhost:8000/ws) + mock_web_tools (bool): Use mock web tools for testing (no API calls, configurable delays) (default: False) + mock_delay (int): Delay in seconds for mock web_extract to test timeout (default: 60) """ self.model = model self.max_iterations = max_iterations self.tool_delay = tool_delay self.save_trajectories = save_trajectories self.verbose_logging = verbose_logging + self.enable_websocket_logging = enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE + self.websocket_server = websocket_server + self.mock_web_tools = mock_web_tools + self.mock_delay = mock_delay + # Note: We use global ws_pool instead of per-instance connection # Store toolset filtering options self.enabled_toolsets = enabled_toolsets @@ -145,6 +173,11 @@ class AIAgent: # Show trajectory saving status if self.save_trajectories: print("๐Ÿ“ Trajectory saving enabled") + + # Show mock tools status + if self.mock_web_tools: + print(f"๐Ÿงช MOCK MODE ENABLED - Web tools will use fake data (delay: {self.mock_delay}s)") + print(f" Perfect for testing WebSocket reconnection without API costs!") def _format_tools_for_system_message(self) -> str: """ @@ -320,11 +353,38 @@ class AIAgent: except Exception as e: print(f"โš ๏ธ Failed to save trajectory: {e}") + def _init_websocket_connection(self, session_id: str): + """ + Initialize WebSocket connection pool if enabled. + + Connects to logging server using persistent connection pool. + Connection is shared across all agent runs - no per-run overhead! 
+ + Uses synchronous API - no event loop management needed in agent layer. + """ + if self.enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE and connect_sync: + try: + # Connect to server (idempotent - safe if already connected) + # API layer handles all event loop management internally + connect_sync(self.websocket_server) + + # Send session_start event for this specific session + send_event_sync("session_start", session_id, { + "session_id": session_id, + "start_time": datetime.now().isoformat() + }) + + print(f"๐Ÿ“ก WebSocket logging enabled (session: {session_id[:8]}...)") + except Exception as e: + print(f"โš ๏ธ Failed to initialize WebSocket connection: {e}") + self.enable_websocket_logging = False + def run_conversation( self, user_message: str, system_message: str = None, - conversation_history: List[Dict[str, Any]] = None + conversation_history: List[Dict[str, Any]] = None, + session_id: str = None ) -> Dict[str, Any]: """ Run a complete conversation with tool calling until completion. 
@@ -333,10 +393,37 @@ class AIAgent: user_message (str): The user's message/question system_message (str): Custom system message (optional) conversation_history (List[Dict]): Previous conversation messages (optional) + session_id (str): Optional session ID (generated if not provided) Returns: Dict: Complete conversation result with final response and message history """ + # ============================================================ + # WEBSOCKET LOGGING: Session Initialization + # ============================================================ + # Generate unique session ID for this agent execution (or use provided one) + # This ID will be used to link all events together in the log file + if session_id is None: + session_id = str(uuid.uuid4()) + + # Initialize WebSocket logger if enabled (via --enable_websocket_logging flag) + # Uses synchronous API - no event loop management in agent layer + if self.enable_websocket_logging: + try: + # Connect to logging server and log initial query + # All event loop management is handled inside the API layer + self._init_websocket_connection(session_id) + send_event_sync("query", session_id, { + "query": user_message, + "model": self.model, + "toolsets": self.enabled_toolsets + }) + except Exception as e: + print(f"โš ๏ธ WebSocket logging initialization failed: {e}") + import traceback + if self.verbose_logging: + traceback.print_exc() + # Initialize conversation messages = conversation_history or [] @@ -356,6 +443,22 @@ class AIAgent: api_call_count += 1 print(f"\n๐Ÿ”„ Making API call #{api_call_count}...") + # ============================================================ + # WEBSOCKET LOGGING: API Call Start + # ============================================================ + # Log that we're about to make an API call to the model + # Captures: which call number, how many messages, whether tools available + if self.enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE and send_event_sync: + try: + send_event_sync("api_call", 
session_id, { + "call_number": api_call_count, + "message_count": len(messages), + "has_tools": bool(self.tools) + }) + except Exception as e: + if self.verbose_logging: + print(f"โš ๏ธ WebSocket logging error: {e}") + # Log request details if verbose if self.verbose_logging: logging.debug(f"API Request - Model: {self.model}, Messages: {len(messages)}, Tools: {len(self.tools) if self.tools else 0}") @@ -374,10 +477,31 @@ class AIAgent: tools=self.tools if self.tools else None, timeout=60.0 # Add explicit timeout ) + + print(f"๐Ÿ”ง Response: {response}") api_duration = time.time() - api_start_time print(f"โฑ๏ธ API call completed in {api_duration:.2f}s") + # ============================================================ + # WEBSOCKET LOGGING: API Response + # ============================================================ + # Log the response we got back from the AI model + # Captures: what the model said, whether it wants tools, how long it took + if self.enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE and send_event_sync: + try: + assistant_msg = response.choices[0].message + send_event_sync("response", session_id, { + "call_number": api_call_count, + "content": assistant_msg.content if hasattr(assistant_msg, 'content') else None, + "has_tool_calls": hasattr(assistant_msg, 'tool_calls') and bool(assistant_msg.tool_calls), + "tool_call_count": len(assistant_msg.tool_calls) if hasattr(assistant_msg, 'tool_calls') and assistant_msg.tool_calls else 0, + "duration": api_duration + }) + except Exception as e: + if self.verbose_logging: + print(f"โš ๏ธ WebSocket logging error: {e}") + if self.verbose_logging: logging.debug(f"API Response received - Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}") @@ -399,10 +523,12 @@ class AIAgent: # Handle assistant response if assistant_message.content: - print(f"๐Ÿค– Assistant: {assistant_message.content[:100]}{'...' 
if len(assistant_message.content) > 100 else ''}") + print(f"๐Ÿค– Assistant: {assistant_message.content}") # Check for tool calls if assistant_message.tool_calls: + + print(f"๐Ÿ”ง Tool calls: {assistant_message.tool_calls}") print(f"๐Ÿ”ง Processing {len(assistant_message.tool_calls)} tool call(s)...") if self.verbose_logging: @@ -438,10 +564,37 @@ class AIAgent: print(f" ๐Ÿ“ž Tool {i}: {function_name}({list(function_args.keys())})") + # ============================================================ + # WEBSOCKET LOGGING: Tool Call (Before Execution) + # ============================================================ + # Log which tool we're about to execute and with what parameters + # This happens BEFORE tool runs, so we know what was requested + if self.enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE and send_event_sync: + try: + send_event_sync("tool_call", session_id, { + "call_number": api_call_count, + "tool_index": i, + "tool_name": function_name, + "parameters": function_args, # E.g., {"query": "Python", "limit": 5} + "tool_call_id": tool_call.id + }) + except Exception as e: + if self.verbose_logging: + print(f"โš ๏ธ WebSocket logging error: {e}") + tool_start_time = time.time() - # Execute the tool - function_result = handle_function_call(function_name, function_args) + # Execute the tool (mock or real based on configuration) + if self.mock_web_tools and function_name in MOCK_TOOL_FUNCTIONS: + # Use mock implementation (no API calls, configurable delay) + mock_function = MOCK_TOOL_FUNCTIONS[function_name] + # Inject mock_delay for web_extract if not provided + if function_name == "web_extract" and "delay" not in function_args: + function_args["delay"] = self.mock_delay + function_result = mock_function(**function_args) + else: + # Use real tool implementation + function_result = handle_function_call(function_name, function_args) tool_duration = time.time() - tool_start_time result_preview = function_result[:200] if len(function_result) > 200 else 
function_result @@ -459,6 +612,36 @@ class AIAgent: print(f" โœ… Tool {i} completed in {tool_duration:.2f}s") + # ============================================================ + # WEBSOCKET LOGGING: Tool Result (After Execution) + # ============================================================ + # Log the result we got back from the tool + # IMPORTANT: Logs BOTH truncated preview AND full raw result + # + # Why both? + # - result: Truncated to 1000 chars for quick preview in UI + # - raw_result: FULL untruncated output for verification + # + # This is crucial for web tools where you want to see: + # - What the scraper actually returned (raw_result) + # - What the LLM processed it into (compare against raw) + # - Verify the LLM isn't losing important information + if self.enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE and send_event_sync: + try: + send_event_sync("tool_result", session_id, { + "call_number": api_call_count, + "tool_index": i, + "tool_name": function_name, + "result": function_result[:1000] if function_result else None, # Truncated preview + "raw_result": function_result, # Full untruncated result (can be 100KB+) + "error": None, + "duration": tool_duration, + "tool_call_id": tool_call.id + }) + except Exception as e: + if self.verbose_logging: + print(f"โš ๏ธ WebSocket logging error: {e}") + # Delay between tool calls if self.tool_delay > 0 and i < len(assistant_message.tool_calls): time.sleep(self.tool_delay) @@ -483,6 +666,21 @@ class AIAgent: error_msg = f"Error during API call #{api_call_count}: {str(e)}" print(f"โŒ {error_msg}") + # ============================================================ + # WEBSOCKET LOGGING: Error Event + # ============================================================ + # Log any errors that occur during API calls or tool execution + # Helps track failures and debug issues + if self.enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE and send_event_sync: + try: + send_event_sync("error", session_id, { + 
"error_message": error_msg, + "call_number": api_call_count + }) + except Exception as ws_error: + if self.verbose_logging: + print(f"โš ๏ธ WebSocket logging error: {ws_error}") + if self.verbose_logging: logging.exception("Detailed error information:") @@ -509,14 +707,37 @@ class AIAgent: # Save trajectory if enabled self._save_trajectory(messages, user_message, completed) + # ============================================================ + # WEBSOCKET LOGGING: Session Complete + # ============================================================ + # Log final completion event for this session + # Note: WebSocket connection stays open for future runs (persistent pool) + # Uses synchronous API - no event loop management in agent layer + if self.enable_websocket_logging and WEBSOCKET_LOGGER_AVAILABLE and send_event_sync: + try: + # Log completion with summary information + # API layer handles event loop management internally + send_event_sync("complete", session_id, { + "final_response": final_response, # What the agent finally answered + "total_calls": api_call_count, # How many API calls were made + "completed": completed # Did it finish successfully? + }) + # Connection persists automatically - agent has no control over lifecycle + except Exception as e: + if self.verbose_logging: + print(f"โš ๏ธ WebSocket logging error: {e}") + import traceback + traceback.print_exc() + return { "final_response": final_response, "messages": messages, "api_calls": api_call_count, - "completed": completed + "completed": completed, + "session_id": session_id if self.enable_websocket_logging else None } - def chat(self, message: str) -> str: + def chat(self, message: str) -> str: # After we connect the UI we can put whatever we want here """ Simple chat interface that returns just the final response. 
@@ -532,7 +753,7 @@ class AIAgent: def main( query: str = None, - model: str = "claude-opus-4-20250514", + model: str = "claude-sonnet-4-5-20250929", api_key: str = None, base_url: str = "https://api.anthropic.com/v1/", max_turns: int = 10, @@ -540,7 +761,11 @@ def main( disabled_toolsets: str = None, list_tools: bool = False, save_trajectories: bool = False, - verbose: bool = False + verbose: bool = False, + enable_websocket_logging: bool = False, + websocket_server: str = "ws://localhost:8000/ws", + mock_web_tools: bool = False, + mock_delay: int = 60 ): """ Main function for running the agent directly. @@ -558,9 +783,24 @@ def main( list_tools (bool): Just list available tools and exit save_trajectories (bool): Save conversation trajectories to JSONL files. Defaults to False. verbose (bool): Enable verbose logging for debugging. Defaults to False. + enable_websocket_logging (bool): Enable real-time WebSocket logging. Defaults to False. + websocket_server (str): WebSocket server URL. Defaults to ws://localhost:8000/ws. + mock_web_tools (bool): Use mock web tools for testing (no API calls, configurable delays). Defaults to False. + mock_delay (int): Delay in seconds for mock web_extract; the default is long enough to trigger the WebSocket timeout. Defaults to 60. Toolset Examples: - "research": Web search, extract, crawl + vision tools + + Mock Tools (Testing): + Use --mock_web_tools to test WebSocket reconnection without API calls: + - web_search: Returns fake results after 2s + - web_extract: Returns fake content after --mock_delay seconds (default 60s, tests timeout) + - web_crawl: Returns fake pages after 30s + + WebSocket Logging: + 1. Start logging server: python api_endpoint/logging_server.py + 2. Run agent with --enable_websocket_logging flag + 3. 
View logs in real time at http://localhost:8000 """ print("🤖 AI Agent with Tool Calling") print("=" * 50) @@ -665,6 +905,11 @@ def main( print(f" - Successful conversations → trajectory_samples.jsonl") print(f" - Failed conversations → failed_trajectories.jsonl") + if enable_websocket_logging: + print(f"📡 WebSocket logging: ENABLED") + print(f" - Server: {websocket_server}") + print(f" - Make sure logging server is running: python api_endpoint/logging_server.py") + # Initialize agent with provided parameters try: agent = AIAgent( @@ -675,7 +920,11 @@ def main( enabled_toolsets=enabled_toolsets_list, disabled_toolsets=disabled_toolsets_list, save_trajectories=save_trajectories, - verbose_logging=verbose + verbose_logging=verbose, + enable_websocket_logging=enable_websocket_logging, + websocket_server=websocket_server, + mock_web_tools=mock_web_tools, + mock_delay=mock_delay ) except RuntimeError as e: print(f"❌ Failed to initialize agent: {e}") @@ -689,6 +938,9 @@ def main( ) else: user_query = query + + # There needs to be a multi-turn conversation here + # Hermes Agent needs to be multi-turn to be useful print(f"\n📝 User Query: {user_query}") print("\n" + "=" * 50) @@ -713,3 +965,12 @@ def main( if __name__ == "__main__": fire.Fire(main) + + +# Order of operations: +# First track the ways in which information flows through the agent in real time +# Create a FastAPI endpoint that can listen for the log events over WebSockets +# Create the UI on top of that endpoint and now you have a pretty UI. 
CHECKPOINT 1 +# Now that you have better visualization write out the chat interface and allow it to be controlled through the UI as well as the main program +# Now decide how the information flows through the agent you may need to do some trial and error to get this part right +# Implement multiturn conversation now and then CHECKPOINT 2 is now done with multiturn conversations \ No newline at end of file diff --git a/terminal_tool.py b/terminal_tool.py index e01d7a6175..5d8d002871 100644 --- a/terminal_tool.py +++ b/terminal_tool.py @@ -22,8 +22,8 @@ Usage: import json import os from typing import Optional, Dict, Any -from hecate import run_tool_with_lifecycle_management -from morphcloud._llm import ToolCall +# from hecate import run_tool_with_lifecycle_management +# from morphcloud._llm import ToolCall # Detailed description for the terminal tool based on Hermes Terminal system prompt TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure, persistent Linux VM environment with full interactive application support. @@ -129,27 +129,30 @@ def terminal_tool( tool_input["idle_threshold"] = idle_threshold if timeout is not None: tool_input["timeout"] = timeout + + # THIS IS BROKEN FOR NOW ~!!!!!!! 
- tool_call = ToolCall( - name="run_command", - input=tool_input - ) + # tool_call = ToolCall( + # name="run_command", + # input=tool_input + # ) - # Execute with lifecycle management - result = run_tool_with_lifecycle_management(tool_call) + # # Execute with lifecycle management + # result = run_tool_with_lifecycle_management(tool_call) + - # Format the result with all possible fields - # Map hecate's "stdout" to "output" for compatibility - formatted_result = { - "output": result.get("stdout", result.get("output", "")), - "screen": result.get("screen", ""), - "session_id": result.get("session_id"), - "exit_code": result.get("returncode", result.get("exit_code", -1)), - "error": result.get("error"), - "status": "active" if result.get("session_id") else "ended" - } + # # Format the result with all possible fields + # # Map hecate's "stdout" to "output" for compatibility + # formatted_result = { + # "output": result.get("stdout", result.get("output", "")), + # "screen": result.get("screen", ""), + # "session_id": result.get("session_id"), + # "exit_code": result.get("returncode", result.get("exit_code", -1)), + # "error": result.get("error"), + # "status": "active" if result.get("session_id") else "ended" + # } - return json.dumps(formatted_result) + return json.dumps({}) except Exception as e: return json.dumps({ diff --git a/test_mock_mode.sh b/test_mock_mode.sh new file mode 100755 index 0000000000..2357470bbe --- /dev/null +++ b/test_mock_mode.sh @@ -0,0 +1,122 @@ +#!/bin/bash +# +# Test Script for Mock Web Tools & WebSocket Reconnection +# +# This script tests: +# 1. Mock web tools (no API calls, fake data) +# 2. WebSocket timeout/reconnection during long operations +# 3. Complete logging capture +# +# Perfect for development/testing without wasting API credits! 
+ +set -e + +cd "$(dirname "$0")" + +echo "==========================================" +echo "๐Ÿงช Mock Mode Test Script" +echo "==========================================" +echo "" + +# Check if logging server is running +if ! curl -s http://localhost:8000/health > /dev/null 2>&1; then + echo "โš ๏ธ Logging server not detected!" + echo " Starting logging server in background..." + python api_endpoint/logging_server.py & + SERVER_PID=$! + echo " Server PID: $SERVER_PID" + sleep 3 +else + echo "โœ… Logging server already running" + SERVER_PID="" +fi + +echo "" +echo "๐Ÿ“‹ Test Configuration:" +echo " - Mock web tools: ENABLED" +echo " - Mock delay: 60 seconds (triggers WebSocket timeout)" +echo " - WebSocket logging: ENABLED" +echo " - Expected behavior: Connection timeout + auto-reconnect" +echo "" +echo "๐Ÿ”„ Running agent with mock mode..." +echo " (This will take ~60 seconds to test reconnection)" +echo "" + +# Run agent with mock mode +python run_agent.py \ + --enabled_toolsets web \ + --enable_websocket_logging \ + --mock_web_tools \ + --mock_delay 60 \ + --query "Find publicly traded water companies benefiting from AI data centers" + +echo "" +echo "==========================================" +echo "โœ… Test Complete!" 
+echo "==========================================" +echo "" + +# Find most recent log file +LATEST_LOG=$(ls -t api_endpoint/logs/realtime/session_*.json 2>/dev/null | head -1) + +if [ -n "$LATEST_LOG" ]; then + echo "๐Ÿ“Š Log Analysis:" + echo " File: $LATEST_LOG" + echo "" + + # Count events + echo " Event Counts:" + python3 -c " +import json +import sys + +with open('$LATEST_LOG') as f: + data = json.load(f) + events = data.get('events', []) + + # Count by type + counts = {} + for e in events: + etype = e.get('type', 'unknown') + counts[etype] = counts.get(etype, 0) + 1 + + for etype, count in sorted(counts.items()): + print(f' - {etype}: {count}') + + # Check completeness + has_complete = any(e.get('type') == 'complete' for e in events) + print() + if has_complete: + print(' โœ… Session completed successfully!') + else: + print(' โš ๏ธ Session incomplete (may have been interrupted)') + + # Check for reconnections + tool_results = [e for e in events if e.get('type') == 'tool_result'] + tool_calls = [e for e in events if e.get('type') == 'tool_call'] + + if len(tool_results) == len(tool_calls): + print(' โœ… All tool calls have results (no missing events)') + else: + print(f' โš ๏ธ Tool calls: {len(tool_calls)}, Results: {len(tool_results)}') +" +else + echo "โš ๏ธ No log files found" +fi + +# Cleanup +if [ -n "$SERVER_PID" ]; then + echo "" + echo "๐Ÿ›‘ Stopping logging server (PID: $SERVER_PID)..." + kill $SERVER_PID 2>/dev/null || true +fi + +echo "" +echo "๐Ÿ’ก Key Observations to Look For:" +echo " 1. '[MOCK]' prefix on tool execution messages" +echo " 2. '๐Ÿ”„ Reconnecting to logging server' after long tool" +echo " 3. 'โœ… Reconnected successfully!' confirmation" +echo " 4. Complete log file with all events captured" +echo "" +echo "๐ŸŽ‰ Mock mode test completed!" 
+ diff --git a/test_parallel_execution.py b/test_parallel_execution.py new file mode 100644 index 0000000000..93751cac09 --- /dev/null +++ b/test_parallel_execution.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +""" +Test Parallel Execution with Persistent WebSocket Connection Pool + +This script demonstrates that multiple agent runs can execute in parallel, +all sharing a single WebSocket connection for logging. + +Benefits: +- No connection overhead (single persistent connection) +- No timeout issues (connection stays alive) +- True parallel execution (multiple sessions simultaneously) +""" + +import asyncio +from run_agent import AIAgent +import time + + +async def run_agent_query(query: str, agent_name: str, mock_delay: int = 10): + """ + Run a single agent query with logging. + + Args: + query: Query to send to agent + agent_name: Name for logging purposes + mock_delay: Delay for mock tools (seconds) + """ + print(f"🚀 [{agent_name}] Starting query: '{query[:40]}...'") + start_time = time.time() + + try: + agent = AIAgent( + model="claude-sonnet-4-5-20250929", + max_iterations=5, + enabled_toolsets=["web"], + enable_websocket_logging=True, + websocket_server="ws://localhost:8000/ws", + mock_web_tools=True, # Use mock tools for fast testing + mock_delay=mock_delay + ) + + result = await asyncio.to_thread(agent.run_conversation, query) # run_conversation() is synchronous; run it in a worker thread (Python 3.9+) so it can be awaited + + duration = time.time() - start_time + print(f"✅ [{agent_name}] Completed in {duration:.1f}s - {result['api_calls']} API calls") + + return { + "agent": agent_name, + "query": query, + "success": True, + "duration": duration, + "api_calls": result['api_calls'], + "session_id": result.get('session_id') + } + + except Exception as e: + duration = time.time() - start_time + print(f"❌ [{agent_name}] Failed in {duration:.1f}s: {e}") + return { + "agent": agent_name, + "query": query, + "success": False, + "error": str(e), + "duration": duration + } + + +async def test_sequential(): + """ + Test 1: Sequential execution (baseline). 
+ + Runs 3 queries one after another. This shows how long it takes + without parallelization. + """ + print("\n" + "="*60) + print("TEST 1: Sequential Execution (Baseline)") + print("="*60) + + start_time = time.time() + + results = [] + for i in range(3): + result = await run_agent_query( + query=f"Find information about water companies #{i+1}", + agent_name=f"Agent{i+1}", + mock_delay=5 # Short delay for quick test + ) + results.append(result) + + total_time = time.time() - start_time + + print(f"\n📊 Sequential Results:") + print(f" Total time: {total_time:.1f}s") + print(f" Successful: {sum(1 for r in results if r['success'])}/3") + print(f" Average per query: {total_time/3:.1f}s") + + return results + + +async def test_parallel(): + """ + Test 2: Parallel execution. + + Runs 3 queries simultaneously using asyncio.gather(). + All queries share the same WebSocket connection for logging. + """ + print("\n" + "="*60) + print("TEST 2: Parallel Execution (Shared Connection)") + print("="*60) + + start_time = time.time() + + # Run all queries in parallel! + results = await asyncio.gather( + run_agent_query( + query="Find publicly traded water utility companies", + agent_name="Agent1", + mock_delay=5 + ), + run_agent_query( + query="Find energy infrastructure companies", + agent_name="Agent2", + mock_delay=5 + ), + run_agent_query( + query="Find AI data center operators", + agent_name="Agent3", + mock_delay=5 + ) + ) + + total_time = time.time() - start_time + + print(f"\n📊 Parallel Results:") + print(f" Total time: {total_time:.1f}s") + print(f" Successful: {sum(1 for r in results if r['success'])}/3") + print(f" Speedup: ~{(sum(r['duration'] for r in results) / total_time):.1f}x") + print(f" Sessions logged: {[(r.get('session_id') or 'N/A')[:8] for r in results]}") # session_id is None when WebSocket logging is unavailable + + return results + + +async def test_high_concurrency(): + """ + Test 3: High concurrency (stress test). + + Runs 10 queries simultaneously to test connection pool under load. 
+ """ + print("\n" + "="*60) + print("TEST 3: High Concurrency (10 Parallel Agents)") + print("="*60) + + start_time = time.time() + + tasks = [ + run_agent_query( + query=f"Test query #{i+1}", + agent_name=f"Agent{i+1}", + mock_delay=3 # Very short for stress test + ) + for i in range(10) + ] + + results = await asyncio.gather(*tasks) + + total_time = time.time() - start_time + successful = sum(1 for r in results if r['success']) + + print(f"\n📊 High Concurrency Results:") + print(f" Total time: {total_time:.1f}s") + print(f" Successful: {successful}/10") + print(f" Failed: {10 - successful}/10") + print(f" Queries per second: {10 / total_time:.2f}") + + return results + + +async def main(): + """Run all tests.""" + print("\n🧪 WebSocket Connection Pool - Parallel Execution Tests") + print("="*60) + print("\nPREREQUISITE: Make sure logging server is running:") + print(" python api_endpoint/logging_server.py") + print("\nPress Ctrl+C to stop at any time\n") + + await asyncio.sleep(2) # Give user time to read + + try: + # Test 1: Sequential (baseline) + seq_results = await test_sequential() + + # Test 2: Parallel (main test) + par_results = await test_parallel() + + # Test 3: High concurrency + stress_results = await test_high_concurrency() + + # Summary + print("\n" + "="*60) + print("SUMMARY") + print("="*60) + print(f"\n✅ All tests completed!") + print(f"\nKey Findings:") + print(f" • Sequential (3 queries): {sum(r['duration'] for r in seq_results):.1f}s total") + print(f" • Parallel (3 queries): {max(r['duration'] for r in par_results):.1f}s total") + print(f" • Speedup: ~{sum(r['duration'] for r in seq_results) / max(r['duration'] for r in par_results):.1f}x") + print(f" • High concurrency (10 queries): {sum(1 for r in stress_results if r['success'])}/10 succeeded") + print(f"\n💡 All queries used the same persistent WebSocket connection!") + print(f" No connection overhead, no timeouts, true parallelization.") + + except KeyboardInterrupt: + print("\n\n⚠️ Tests interrupted by 
user") + except Exception as e: + print(f"\n\nโŒ Tests failed: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + print("\n" + "="*60) + print("SETUP CHECK") + print("="*60) + + # Check if logging server is running + import socket + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(('localhost', 8000)) + sock.close() + + if result == 0: + print("โœ… Logging server is running on port 8000") + else: + print("โš ๏ธ Logging server not detected on port 8000") + print(" Start it with: python api_endpoint/logging_server.py") + print("\nContinuing anyway (tests will fail gracefully)...") + except Exception as e: + print(f"โš ๏ธ Could not check server status: {e}") + + # Run tests + asyncio.run(main()) + diff --git a/test_ui_flow.py b/test_ui_flow.py new file mode 100644 index 0000000000..2715c4fa99 --- /dev/null +++ b/test_ui_flow.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +Test script to verify UI flow works correctly. + +This tests: +1. API server is running +2. WebSocket connection works +3. Agent can be started via API +4. 
Events are broadcast properly +""" + +import requests +import json +import time +import websocket +import threading + +API_URL = "http://localhost:8000" +WS_URL = "ws://localhost:8000/ws" + +def test_api_server(): + """Test if API server is running.""" + print("🔍 Testing API server...") + try: + response = requests.get(f"{API_URL}/", timeout=5) + if response.status_code == 200: + data = response.json() + print(f"✅ API server is running: {data.get('service')}") + print(f" Active connections: {data.get('active_connections')}") + return True + else: + print(f"❌ API server returned: {response.status_code}") + return False + except Exception as e: + print(f"❌ API server not accessible: {e}") + return False + +def test_tools_endpoint(): + """Test if tools endpoint works.""" + print("\n🔍 Testing tools endpoint...") + try: + response = requests.get(f"{API_URL}/tools", timeout=5) + if response.status_code == 200: + data = response.json() + toolsets = data.get("toolsets", []) + print(f"✅ Tools endpoint works - {len(toolsets)} toolsets available") + for ts in toolsets[:3]: + print(f" • {ts.get('name')} ({ts.get('tool_count')} tools)") + return True + else: + print(f"❌ Tools endpoint failed: {response.status_code}") + return False + except Exception as e: + print(f"❌ Tools endpoint error: {e}") + return False + +def test_websocket(): + """Test WebSocket connection.""" + print("\n🔍 Testing WebSocket connection...") + + connected = threading.Event() + message_received = threading.Event() + messages = [] + + def on_open(ws): + print("✅ WebSocket connected") + connected.set() + + def on_message(ws, message): + data = json.loads(message) + messages.append(data) + message_received.set() + print(f"📨 Received: {data.get('event_type', 'unknown')}") + + def on_error(ws, error): + print(f"❌ WebSocket error: {error}") + + def on_close(ws, close_status_code, close_msg): + print(f"🔌 WebSocket closed: {close_status_code}") + + ws = websocket.WebSocketApp( +
WS_URL, + on_open=on_open, + on_message=on_message, + on_error=on_error, + on_close=on_close + ) + + # Run WebSocket in background + ws_thread = threading.Thread(target=lambda: ws.run_forever(), daemon=True) + ws_thread.start() + + # Wait for connection + if connected.wait(timeout=5): + print("✅ WebSocket connection established") + ws.close() + return True + else: + print("❌ WebSocket connection timeout") + ws.close() + return False + +def test_agent_run(): + """Test running agent via API.""" + print("\n🔍 Testing agent run via API (mock mode)...") + + # Start listening for events first + events = [] + ws_connected = threading.Event() + session_complete = threading.Event() + + def on_message(ws, message): + data = json.loads(message) + events.append(data) + event_type = data.get("event_type") + print(f" 📨 Event: {event_type}") + + if event_type == "complete": + session_complete.set() + + def on_open(ws): + ws_connected.set() + + # Connect WebSocket + ws = websocket.WebSocketApp( + WS_URL, + on_open=on_open, + on_message=on_message + ) + + ws_thread = threading.Thread(target=lambda: ws.run_forever(), daemon=True) + ws_thread.start() + + # Wait for WebSocket connection + if not ws_connected.wait(timeout=5): + print("❌ WebSocket didn't connect") + ws.close() + return False + + print("✅ WebSocket connected, starting agent...") + + # Submit agent run + payload = { + "query": "Test query for UI flow verification", + "model": "claude-sonnet-4-5-20250929", + "base_url": "https://api.anthropic.com/v1/", + "enabled_toolsets": ["web"], + "max_turns": 5, + "mock_web_tools": True, # Use mock mode to avoid API costs + "mock_delay": 2, # Fast for testing + "verbose": False + } + + try: + response = requests.post(f"{API_URL}/agent/run", json=payload, timeout=10) + + if response.status_code == 200: + result = response.json() + session_id = result.get("session_id") + print(f"✅ Agent started: {session_id[:8]}...") + + # Wait for completion (or timeout) + print("⏳
Waiting for agent to complete (up to 30s)...") + if session_complete.wait(timeout=30): + print(f"✅ Agent completed! Received {len(events)} events:") + + # Count event types + event_counts = {} + for evt in events: + evt_type = evt.get("event_type", "unknown") + event_counts[evt_type] = event_counts.get(evt_type, 0) + 1 + + for evt_type, count in event_counts.items(): + print(f" • {evt_type}: {count}") + + # Check we got expected events + expected_events = ["query", "api_call", "response", "complete"] + missing = [e for e in expected_events if e not in event_counts] + + if missing: + print(f"⚠️  Missing expected events: {missing}") + else: + print("✅ All expected event types received!") + + ws.close() + return True + else: + print(f"⚠️  Timeout waiting for completion. Got {len(events)} events so far.") + ws.close() + return False + + else: + print(f"❌ Agent start failed: {response.status_code}") + print(f" Response: {response.text}") + ws.close() + return False + + except Exception as e: + print(f"❌ Agent run error: {e}") + import traceback + traceback.print_exc() + ws.close() + return False + +def main(): + """Run all tests.""" + print("=" * 60) + print("🧪 Hermes Agent UI Flow Test") + print("=" * 60) + print("\nThis will test the complete flow:") + print(" 1. API server connectivity") + print(" 2. Tools endpoint") + print(" 3. WebSocket connection") + print(" 4. Agent execution via API (mock mode)") + print(" 5.
Event streaming to UI") + print("\n" + "=" * 60) + + results = [] + + # Test 1: API server + results.append(("API Server", test_api_server())) + + # Test 2: Tools endpoint + results.append(("Tools Endpoint", test_tools_endpoint())) + + # Test 3: WebSocket + results.append(("WebSocket Connection", test_websocket())) + + # Test 4: Agent run + results.append(("Agent Execution + Events", test_agent_run())) + + # Summary + print("\n" + "=" * 60) + print("📊 TEST SUMMARY") + print("=" * 60) + + for test_name, passed in results: + status = "✅ PASS" if passed else "❌ FAIL" + print(f"{status} - {test_name}") + + all_passed = all(r[1] for r in results) + + print("\n" + "=" * 60) + if all_passed: + print("🎉 ALL TESTS PASSED!") + print("\n✅ The UI flow is working correctly!") + print(" You can now use the UI to:") + print(" • Submit queries") + print(" • View real-time events") + print(" • See tool executions") + print(" • Get final responses") + else: + print("❌ SOME TESTS FAILED") + print("\nMake sure:") + print(" 1. API server is running: python api_endpoint/logging_server.py") + print(" 2. ANTHROPIC_API_KEY is set in environment") + print(" 3. All dependencies are installed: pip install -r requirements.txt") + print("=" * 60) + + return 0 if all_passed else 1 + +if __name__ == "__main__": + exit(main()) + diff --git a/ui/hermes_ui.py b/ui/hermes_ui.py new file mode 100644 index 0000000000..6285f2a6aa --- /dev/null +++ b/ui/hermes_ui.py @@ -0,0 +1,620 @@ +#!/usr/bin/env python3 +""" +Hermes Agent - PySide6 Frontend + +A modern desktop UI for the Hermes AI Agent with real-time event streaming.
+ +Features: +- Query input with multi-line support +- Tool/toolset selection +- Model and API configuration +- Real-time event display via WebSocket +- Beautiful, responsive UI with dark theme +- Session history + +Usage: + python hermes_ui.py +""" + +import sys +import json +import signal +import asyncio +import requests +from datetime import datetime +from typing import Dict, Any, List, Optional + +from PySide6.QtWidgets import ( + QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, + QTextEdit, QPushButton, QLabel, QLineEdit, QComboBox, QCheckBox, + QGroupBox, QScrollArea, QSplitter, QListWidget, QListWidgetItem, + QTextBrowser, QTabWidget, QSpinBox, QMessageBox, QProgressBar +) +from PySide6.QtCore import ( + Qt, Signal, Slot, QThread, QObject, QTimer +) +from PySide6.QtGui import ( + QFont, QColor, QPalette, QTextCursor, QTextCharFormat +) + +# WebSocket imports +import websocket +import threading + + +class WebSocketClient(QObject): + """ + WebSocket client for receiving real-time agent events. + + Runs in a separate thread and emits Qt signals when events arrive. 
+ """ + + # Signals for event communication + event_received = Signal(dict) # Emits parsed event data + connected = Signal() + disconnected = Signal() + error = Signal(str) + + def __init__(self, url: str = "ws://localhost:8000/ws"): + super().__init__() + self.url = url + self.ws = None + self.running = False + self.thread = None + + def connect(self): + """Start WebSocket connection in background thread.""" + if self.running: + return + + self.running = True + self.thread = threading.Thread(target=self._run, daemon=True) + self.thread.start() + + def disconnect(self): + """Stop WebSocket connection.""" + self.running = False + if self.ws: + try: + self.ws.close() + except Exception as e: + print(f"Error closing WebSocket: {e}") + + def _run(self): + """WebSocket event loop (runs in background thread).""" + try: + self.ws = websocket.WebSocketApp( + self.url, + on_open=self._on_open, + on_message=self._on_message, + on_error=self._on_error, + on_close=self._on_close + ) + # Blocks until the socket closes; reconnects happen through a later connect() call + self.ws.run_forever(ping_interval=30, ping_timeout=10) + except Exception as e: + self.error.emit(f"WebSocket error: {str(e)}") + finally: + self.running = False # Let a later connect() start a fresh thread + + def _on_open(self, ws): + """Called when WebSocket connection is established.""" + print("🔌 WebSocket connected") + self.connected.emit() + + def _on_message(self, ws, message): + """Called when a message is received from the server.""" + try: + data = json.loads(message) + self.event_received.emit(data) + except json.JSONDecodeError as e: + print(f"❌ Failed to parse WebSocket message: {e}") + + def _on_error(self, ws, error): + """Called when an error occurs.""" + print(f"❌ WebSocket error: {error}") + self.error.emit(str(error)) + + def _on_close(self, ws, close_status_code, close_msg): + """Called when WebSocket connection is closed.""" + print(f"🔌 WebSocket disconnected: {close_status_code} - {close_msg}") + self.disconnected.emit() + + +class EventDisplayWidget(QWidget): + """ + Widget for displaying real-time
agent events in a formatted view. + + Shows events in chronological order with color coding and formatting. + """ + + def __init__(self): + super().__init__() + self.init_ui() + self.current_session = None + + def init_ui(self): + """Initialize the UI components.""" + layout = QVBoxLayout() + + # Header + header = QLabel("📡 Real-time Event Stream") + header.setFont(QFont("Arial", 12, QFont.Bold)) + layout.addWidget(header) + + # Event display (rich text browser) + self.event_display = QTextBrowser() + self.event_display.setOpenExternalLinks(False) + self.event_display.setFont(QFont("Monaco", 10)) + layout.addWidget(self.event_display) + + # Clear button + clear_btn = QPushButton("🗑️ Clear Events") + clear_btn.clicked.connect(self.clear_events) + layout.addWidget(clear_btn) + + self.setLayout(layout) + + def clear_events(self): + """Clear all displayed events.""" + self.event_display.clear() + self.current_session = None + + def add_event(self, event: Dict[str, Any]): + """ + Add an event to the display with formatting.
+ + Args: + event: Event data from WebSocket + """ + event_type = event.get("event_type", "unknown") + session_id = event.get("session_id", "") + data = event.get("data", {}) + timestamp = event.get("timestamp", datetime.now().isoformat()) + + # Track session changes + if self.current_session != session_id: + self.current_session = session_id + self.event_display.append(f"\n{'='*80}") + self.event_display.append(f"🆕 New Session: {session_id[:8]}...") + self.event_display.append(f"{'='*80}\n") + + # Format based on event type + if event_type == "query": + query = data.get("query", "") + self.event_display.append(f"📝 QUERY") + self.event_display.append(f" {query}") + self.event_display.append(f" Model: {data.get('model', 'N/A')}") + self.event_display.append(f" Toolsets: {', '.join(data.get('toolsets', []) or ['all'])}") + + elif event_type == "api_call": + call_num = data.get("call_number", 0) + self.event_display.append(f"\n🔄 API CALL #{call_num}") + self.event_display.append(f" Messages: {data.get('message_count', 0)}") + + elif event_type == "response": + content = (data.get("content") or "")[:200] # content may be None when the model only calls tools + self.event_display.append(f"🤖 RESPONSE") + if content: + self.event_display.append(f" {content}...") + self.event_display.append(f" Tool calls: {data.get('tool_call_count', 0)}") + self.event_display.append(f" Duration: {data.get('duration', 0):.2f}s") + + elif event_type == "tool_call": + tool_name = data.get("tool_name", "unknown") + params = data.get("parameters", {}) + self.event_display.append(f"🔧 TOOL CALL: {tool_name}") + self.event_display.append(f" Parameters: {json.dumps(params, indent=2)[:100]}...") + + elif event_type == "tool_result": + tool_name = data.get("tool_name", "unknown") + result = str(data.get("result") or "")[:200] # result may be None or a non-string value + duration = data.get("duration", 0) + error = data.get("error") + + if error: + self.event_display.append(f"❌ TOOL ERROR: {tool_name}") + self.event_display.append(f" {error}") + else: + self.event_display.append(f"✅ TOOL
RESULT: {tool_name}") + self.event_display.append(f" Duration: {duration:.2f}s") + if result: + self.event_display.append(f" Result preview: {result}...") + + elif event_type == "complete": + final_response = (data.get("final_response") or "")[:300] # may be None if the run failed + total_calls = data.get("total_calls", 0) + completed = data.get("completed", False) + + status_icon = "🎉" if completed else "⚠️" + self.event_display.append(f"\n{status_icon} SESSION COMPLETE") + self.event_display.append(f" Total API calls: {total_calls}") + self.event_display.append(f" Status: {'Success' if completed else 'Failed/Incomplete'}") + if final_response: + self.event_display.append(f" Final response: {final_response}...") + self.event_display.append(f"\n{'='*80}\n") + + elif event_type == "error": + error_msg = data.get("error_message", "Unknown error") + self.event_display.append(f"❌ ERROR") + self.event_display.append(f" {error_msg}") + + else: + # Unknown event type + self.event_display.append(f"⚠️ {event_type.upper()}") + self.event_display.append(f" {json.dumps(data, indent=2)[:200]}...") + + self.event_display.append("") # Blank line + + # Auto-scroll to bottom + cursor = self.event_display.textCursor() + cursor.movePosition(QTextCursor.End) + self.event_display.setTextCursor(cursor) + + +class HermesMainWindow(QMainWindow): + """ + Main window for Hermes Agent UI.
+ + Provides interface for: + - Submitting queries + - Configuring agent settings + - Viewing real-time events + - Managing sessions + """ + + def __init__(self): + super().__init__() + self.api_base_url = "http://localhost:8000" + self.ws_client = None + self.current_session_id = None + self.available_toolsets = [] + + self.init_ui() + self.setup_websocket() + self.load_available_tools() + + def init_ui(self): + """Initialize the user interface.""" + self.setWindowTitle("Hermes Agent - AI Assistant UI") + self.setGeometry(100, 100, 1400, 900) + + # Central widget + central_widget = QWidget() + self.setCentralWidget(central_widget) + + # Main layout (horizontal split) + main_layout = QHBoxLayout() + + # Left panel: Controls + left_panel = self.create_control_panel() + + # Right panel: Event display + right_panel = self.create_event_panel() + + # Splitter for resizable panels + splitter = QSplitter(Qt.Horizontal) + splitter.addWidget(left_panel) + splitter.addWidget(right_panel) + splitter.setStretchFactor(0, 1) # Control panel + splitter.setStretchFactor(1, 2) # Event panel (larger) + + main_layout.addWidget(splitter) + central_widget.setLayout(main_layout) + + # Status bar + self.statusBar().showMessage("Ready") + + def create_control_panel(self) -> QWidget: + """Create the left control panel.""" + panel = QWidget() + layout = QVBoxLayout() + + # Title + title = QLabel("🤖 Hermes Agent Control") + title.setFont(QFont("Arial", 14, QFont.Bold)) + title.setAlignment(Qt.AlignCenter) + layout.addWidget(title) + + # Query input group + query_group = QGroupBox("Query Input") + query_layout = QVBoxLayout() + + self.query_input = QTextEdit() + self.query_input.setPlaceholderText("Enter your query here...") + self.query_input.setMaximumHeight(150) + query_layout.addWidget(self.query_input) + + self.submit_btn = QPushButton("🚀 Submit Query") + self.submit_btn.setFont(QFont("Arial", 11, QFont.Bold)) + self.submit_btn.setStyleSheet("QPushButton { background-color:
#4CAF50; color: white; padding: 10px; }") + self.submit_btn.clicked.connect(self.submit_query) + query_layout.addWidget(self.submit_btn) + + query_group.setLayout(query_layout) + layout.addWidget(query_group) + + # Model configuration group + model_group = QGroupBox("Model Configuration") + model_layout = QVBoxLayout() + + # Model selection + model_layout.addWidget(QLabel("Model:")) + self.model_combo = QComboBox() + self.model_combo.addItems([ + "claude-sonnet-4-5-20250929", + "claude-opus-4-20250514", + "gpt-4", + "gpt-4-turbo" + ]) + model_layout.addWidget(self.model_combo) + + # API Base URL + model_layout.addWidget(QLabel("API Base URL:")) + self.base_url_input = QLineEdit("https://api.anthropic.com/v1/") + model_layout.addWidget(self.base_url_input) + + # Max turns + model_layout.addWidget(QLabel("Max Turns:")) + self.max_turns_spin = QSpinBox() + self.max_turns_spin.setMinimum(1) + self.max_turns_spin.setMaximum(50) + self.max_turns_spin.setValue(10) + model_layout.addWidget(self.max_turns_spin) + + model_group.setLayout(model_layout) + layout.addWidget(model_group) + + # Tools configuration group + tools_group = QGroupBox("Tools & Toolsets") + tools_layout = QVBoxLayout() + + tools_layout.addWidget(QLabel("Select Toolsets:")) + self.toolsets_list = QListWidget() + self.toolsets_list.setSelectionMode(QListWidget.MultiSelection) + self.toolsets_list.setMaximumHeight(150) + tools_layout.addWidget(self.toolsets_list) + + tools_group.setLayout(tools_layout) + layout.addWidget(tools_group) + + # Options group + options_group = QGroupBox("Options") + options_layout = QVBoxLayout() + + self.mock_mode_checkbox = QCheckBox("Mock Web Tools (Testing)") + options_layout.addWidget(self.mock_mode_checkbox) + + self.verbose_checkbox = QCheckBox("Verbose Logging") + options_layout.addWidget(self.verbose_checkbox) + + options_layout.addWidget(QLabel("Mock Delay (seconds):")) + self.mock_delay_spin = QSpinBox() + self.mock_delay_spin.setMinimum(1) + 
self.mock_delay_spin.setMaximum(300) + self.mock_delay_spin.setValue(60) + options_layout.addWidget(self.mock_delay_spin) + + options_group.setLayout(options_layout) + layout.addWidget(options_group) + + # Connection status + self.connection_status = QLabel("🔴 Disconnected") + self.connection_status.setAlignment(Qt.AlignCenter) + self.connection_status.setStyleSheet("QLabel { padding: 5px; background-color: #F44336; color: white; border-radius: 3px; }") + layout.addWidget(self.connection_status) + + # Add stretch to push everything to top + layout.addStretch() + + panel.setLayout(layout) + return panel + + def create_event_panel(self) -> QWidget: + """Create the right event display panel.""" + panel = QWidget() + layout = QVBoxLayout() + + # Event display widget + self.event_widget = EventDisplayWidget() + layout.addWidget(self.event_widget) + + panel.setLayout(layout) + return panel + + def setup_websocket(self): + """Setup WebSocket connection for real-time events.""" + self.ws_client = WebSocketClient("ws://localhost:8000/ws") + + # Connect signals + self.ws_client.connected.connect(self.on_ws_connected) + self.ws_client.disconnected.connect(self.on_ws_disconnected) + self.ws_client.error.connect(self.on_ws_error) + self.ws_client.event_received.connect(self.on_event_received) + + # Start connection + self.ws_client.connect() + + @Slot() + def on_ws_connected(self): + """Called when WebSocket connection is established.""" + self.connection_status.setText("🟢 Connected") + self.connection_status.setStyleSheet("QLabel { padding: 5px; background-color: #4CAF50; color: white; border-radius: 3px; }") + self.statusBar().showMessage("WebSocket connected") + + @Slot() + def on_ws_disconnected(self): + """Called when WebSocket connection is lost.""" + self.connection_status.setText("🔴 Disconnected") + self.connection_status.setStyleSheet("QLabel { padding: 5px; background-color: #F44336; color: white; border-radius: 3px; }") +
self.statusBar().showMessage("WebSocket disconnected - attempting reconnect...") + + # Attempt reconnect after 5 seconds + QTimer.singleShot(5000, self.ws_client.connect) + + @Slot(str) + def on_ws_error(self, error: str): + """Called when WebSocket error occurs.""" + self.statusBar().showMessage(f"WebSocket error: {error}") + + @Slot(dict) + def on_event_received(self, event: Dict[str, Any]): + """ + Called when an event is received from WebSocket. + + Args: + event: Event data from server + """ + self.event_widget.add_event(event) + + # Update status for specific events + event_type = event.get("event_type") + if event_type == "query": + self.statusBar().showMessage("Query received - agent processing...") + elif event_type == "complete": + self.statusBar().showMessage("Agent completed!") + self.submit_btn.setEnabled(True) + + def load_available_tools(self): + """Load available toolsets from the API.""" + try: + response = requests.get(f"{self.api_base_url}/tools", timeout=5) + if response.status_code == 200: + data = response.json() + toolsets = data.get("toolsets", []) + + self.available_toolsets = toolsets + self.toolsets_list.clear() + + for toolset in toolsets: + name = toolset.get("name", "") + description = toolset.get("description", "") + tool_count = toolset.get("tool_count", 0) + + item_text = f"{name} ({tool_count} tools) - {description}" + item = QListWidgetItem(item_text) + item.setData(Qt.UserRole, name) # Store toolset name + self.toolsets_list.addItem(item) + + self.statusBar().showMessage(f"Loaded {len(toolsets)} toolsets") + else: + self.statusBar().showMessage("Failed to load toolsets from API") + + except requests.exceptions.RequestException as e: + self.statusBar().showMessage(f"Error loading toolsets: {str(e)}") + # Add some default toolsets + default_toolsets = ["web", "vision", "terminal", "research"] + for ts in default_toolsets: + item = QListWidgetItem(f"{ts} (default)") + item.setData(Qt.UserRole, ts) + self.toolsets_list.addItem(item) 
+ + @Slot() + def submit_query(self): + """Submit query to the agent API.""" + query = self.query_input.toPlainText().strip() + + if not query: + QMessageBox.warning(self, "No Query", "Please enter a query first!") + return + + # Get selected toolsets + selected_toolsets = [] + for i in range(self.toolsets_list.count()): + item = self.toolsets_list.item(i) + if item.isSelected(): + toolset_name = item.data(Qt.UserRole) + selected_toolsets.append(toolset_name) + + # Build request payload + payload = { + "query": query, + "model": self.model_combo.currentText(), + "base_url": self.base_url_input.text(), + "max_turns": self.max_turns_spin.value(), + "enabled_toolsets": selected_toolsets if selected_toolsets else None, + "mock_web_tools": self.mock_mode_checkbox.isChecked(), + "mock_delay": self.mock_delay_spin.value(), + "verbose": self.verbose_checkbox.isChecked() + } + + # Disable submit button during execution + self.submit_btn.setEnabled(False) + self.submit_btn.setText("⏳ Running...") + self.statusBar().showMessage("Submitting query to agent...") + + # Submit to API + try: + response = requests.post( + f"{self.api_base_url}/agent/run", + json=payload, + timeout=10 + ) + + if response.status_code == 200: + result = response.json() + session_id = result.get("session_id", "") + self.current_session_id = session_id + + self.statusBar().showMessage(f"Agent started!
Session: {session_id[:8]}...") + + # Clear event display for new session + # (or keep history - user preference) + # self.event_widget.clear_events() + + else: + QMessageBox.warning( + self, + "API Error", + f"Failed to start agent: {response.status_code}\n{response.text}" + ) + self.submit_btn.setEnabled(True) + self.submit_btn.setText("🚀 Submit Query") + + except requests.exceptions.RequestException as e: + QMessageBox.critical( + self, + "Connection Error", + f"Failed to connect to API server:\n{str(e)}\n\nMake sure the server is running:\npython api_endpoint/logging_server.py" + ) + self.submit_btn.setEnabled(True) + self.submit_btn.setText("🚀 Submit Query") + + # Restore the button label after a short delay; the button itself is re-enabled on the "complete" event + QTimer.singleShot(2000, lambda: self.submit_btn.setText("🚀 Submit Query")) + + def closeEvent(self, event): + """Handle window close event.""" + if self.ws_client: + self.ws_client.disconnect() + event.accept() + + +def main(): + """Main entry point for the application.""" + app = QApplication(sys.argv) + + # Set application metadata + app.setApplicationName("Hermes Agent") + app.setOrganizationName("Hermes") + app.setApplicationVersion("1.0.0") + + # Apply dark theme (optional) + # Uncomment to enable dark mode + # app.setStyle("Fusion") + # palette = QPalette() + # palette.setColor(QPalette.Window, QColor(53, 53, 53)) + # palette.setColor(QPalette.WindowText, Qt.white) + # app.setPalette(palette) + + # Create and show main window + window = HermesMainWindow() + window.show() + + # Start event loop + sys.exit(app.exec()) + + +if __name__ == "__main__": + main() + diff --git a/ui/start_hermes_ui.sh b/ui/start_hermes_ui.sh new file mode 100755 index 0000000000..e7458f3316 --- /dev/null +++ b/ui/start_hermes_ui.sh @@ -0,0 +1,115 @@ +#!/bin/bash +# Hermes Agent UI Launcher +# +# This script starts both the API server and UI application. +# It will run them in the background and provide a clean shutdown.
+ +set -e + +# Colors for output +GREEN='\033[0;32m' +BLUE='\033[0;34m' +RED='\033[0;31m' +NC='\033[0m' # No Color + +echo -e "${BLUE}🚀 Hermes Agent UI Launcher${NC}" +echo "================================" +echo "" + +# Check if Python is available +if ! command -v python3 &> /dev/null; then + echo -e "${RED}❌ Python 3 not found. Please install Python 3.${NC}" + exit 1 +fi + +# Check if virtual environment exists +if [ -d "../../env" ]; then + echo -e "${GREEN}✓ Activating virtual environment${NC}" + source ../../env/bin/activate +else + echo -e "${BLUE}ℹ No virtual environment found, using system Python${NC}" +fi + +# Check dependencies +echo -e "${BLUE}Checking dependencies...${NC}" +python3 -c "import PySide6" 2>/dev/null || { + echo -e "${RED}❌ PySide6 not installed${NC}" + echo -e "${BLUE}Installing dependencies...${NC}" + pip install -r ../requirements.txt +} + +# Check for API keys +if [ -z "$ANTHROPIC_API_KEY" ]; then + echo -e "${RED}⚠️  Warning: ANTHROPIC_API_KEY not set${NC}" + echo " Set it with: export ANTHROPIC_API_KEY='your-key'" + echo "" +fi + +# Function to cleanup on exit +cleanup() { + echo "" + echo -e "${BLUE}🛑 Shutting down Hermes Agent...${NC}" + if [ ! -z "$SERVER_PID" ]; then + kill $SERVER_PID 2>/dev/null || true + echo -e "${GREEN}✓ API Server stopped${NC}" + fi + if [ ! -z "$UI_PID" ]; then + kill $UI_PID 2>/dev/null || true + echo -e "${GREEN}✓ UI Application stopped${NC}" + fi + echo -e "${GREEN}✓ Cleanup complete${NC}" + exit 0 +} + +# Set up trap for cleanup +trap cleanup SIGINT SIGTERM EXIT + +# Start API server in background +echo -e "${BLUE}Starting API Server...${NC}" +cd ../api_endpoint +python3 logging_server.py > /tmp/hermes_server.log 2>&1 & +SERVER_PID=$! +cd ../ui + +# Wait for server to start +echo -e "${BLUE}Waiting for server to start...${NC}" +sleep 3 + +# Check if server is running +if ! kill -0 $SERVER_PID 2>/dev/null; then + echo -e "${RED}❌ Server failed to start.
Check /tmp/hermes_server.log${NC}" + tail -20 /tmp/hermes_server.log + exit 1 +fi + +# Check if server is responding +if curl -s http://localhost:8000/ > /dev/null; then + echo -e "${GREEN}✓ API Server running on http://localhost:8000${NC}" +else + echo -e "${RED}❌ Server not responding. Check /tmp/hermes_server.log${NC}" + exit 1 +fi + +# Start UI application +echo -e "${BLUE}Starting UI Application...${NC}" +python3 hermes_ui.py & +UI_PID=$! + +echo "" +echo -e "${GREEN}================================${NC}" +echo -e "${GREEN}✓ Hermes Agent UI is running!${NC}" +echo -e "${GREEN}================================${NC}" +echo "" +echo -e "${BLUE}📊 Component Status:${NC}" +echo -e " API Server: http://localhost:8000 (PID: $SERVER_PID)" +echo -e " UI App: Running (PID: $UI_PID)" +echo -e " Server Log: /tmp/hermes_server.log" +echo "" +echo -e "${BLUE}Press Ctrl+C to stop all services${NC}" +echo "" + +# Wait for UI to exit +wait $UI_PID + +# Cleanup will be triggered by trap + diff --git a/ui/test_ui_flow.py b/ui/test_ui_flow.py new file mode 100644 index 0000000000..2715c4fa99 --- /dev/null +++ b/ui/test_ui_flow.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +Test script to verify UI flow works correctly. + +This tests: +1. API server is running +2. WebSocket connection works +3. Agent can be started via API +4.
Events are broadcast properly +""" + +import requests +import json +import time +import websocket +import threading + +API_URL = "http://localhost:8000" +WS_URL = "ws://localhost:8000/ws" + +def test_api_server(): + """Test if API server is running.""" + print("🔍 Testing API server...") + try: + response = requests.get(f"{API_URL}/", timeout=5) + if response.status_code == 200: + data = response.json() + print(f"✅ API server is running: {data.get('service')}") + print(f" Active connections: {data.get('active_connections')}") + return True + else: + print(f"❌ API server returned: {response.status_code}") + return False + except Exception as e: + print(f"❌ API server not accessible: {e}") + return False + +def test_tools_endpoint(): + """Test if tools endpoint works.""" + print("\n🔍 Testing tools endpoint...") + try: + response = requests.get(f"{API_URL}/tools", timeout=5) + if response.status_code == 200: + data = response.json() + toolsets = data.get("toolsets", []) + print(f"✅ Tools endpoint works - {len(toolsets)} toolsets available") + for ts in toolsets[:3]: + print(f" • {ts.get('name')} ({ts.get('tool_count')} tools)") + return True + else: + print(f"❌ Tools endpoint failed: {response.status_code}") + return False + except Exception as e: + print(f"❌ Tools endpoint error: {e}") + return False + +def test_websocket(): + """Test WebSocket connection.""" + print("\n🔍 Testing WebSocket connection...") + + connected = threading.Event() + message_received = threading.Event() + messages = [] + + def on_open(ws): + print("✅ WebSocket connected") + connected.set() + + def on_message(ws, message): + data = json.loads(message) + messages.append(data) + message_received.set() + print(f"📨 Received: {data.get('event_type', 'unknown')}") + + def on_error(ws, error): + print(f"❌ WebSocket error: {error}") + + def on_close(ws, close_status_code, close_msg): + print(f"🔌 WebSocket closed: {close_status_code}") + + ws = websocket.WebSocketApp( +
WS_URL, + on_open=on_open, + on_message=on_message, + on_error=on_error, + on_close=on_close + ) + + # Run WebSocket in background + ws_thread = threading.Thread(target=lambda: ws.run_forever(), daemon=True) + ws_thread.start() + + # Wait for connection + if connected.wait(timeout=5): + print("✅ WebSocket connection established") + ws.close() + return True + else: + print("❌ WebSocket connection timeout") + ws.close() + return False + +def test_agent_run(): + """Test running agent via API.""" + print("\n🔍 Testing agent run via API (mock mode)...") + + # Start listening for events first + events = [] + ws_connected = threading.Event() + session_complete = threading.Event() + + def on_message(ws, message): + data = json.loads(message) + events.append(data) + event_type = data.get("event_type") + print(f" 📨 Event: {event_type}") + + if event_type == "complete": + session_complete.set() + + def on_open(ws): + ws_connected.set() + + # Connect WebSocket + ws = websocket.WebSocketApp( + WS_URL, + on_open=on_open, + on_message=on_message + ) + + ws_thread = threading.Thread(target=lambda: ws.run_forever(), daemon=True) + ws_thread.start() + + # Wait for WebSocket connection + if not ws_connected.wait(timeout=5): + print("❌ WebSocket didn't connect") + ws.close() + return False + + print("✅ WebSocket connected, starting agent...") + + # Submit agent run + payload = { + "query": "Test query for UI flow verification", + "model": "claude-sonnet-4-5-20250929", + "base_url": "https://api.anthropic.com/v1/", + "enabled_toolsets": ["web"], + "max_turns": 5, + "mock_web_tools": True, # Use mock mode to avoid API costs + "mock_delay": 2, # Fast for testing + "verbose": False + } + + try: + response = requests.post(f"{API_URL}/agent/run", json=payload, timeout=10) + + if response.status_code == 200: + result = response.json() + session_id = result.get("session_id") + print(f"✅ Agent started: {session_id[:8]}...") + + # Wait for completion (or timeout) + print("⏳
Waiting for agent to complete (up to 30s)...") + if session_complete.wait(timeout=30): + print(f"โœ… Agent completed! Received {len(events)} events:") + + # Count event types + event_counts = {} + for evt in events: + evt_type = evt.get("event_type", "unknown") + event_counts[evt_type] = event_counts.get(evt_type, 0) + 1 + + for evt_type, count in event_counts.items(): + print(f" โ€ข {evt_type}: {count}") + + # Check we got expected events + expected_events = ["query", "api_call", "response", "complete"] + missing = [e for e in expected_events if e not in event_counts] + + if missing: + print(f"โš ๏ธ Missing expected events: {missing}") + else: + print("โœ… All expected event types received!") + + ws.close() + return True + else: + print(f"โš ๏ธ Timeout waiting for completion. Got {len(events)} events so far.") + ws.close() + return False + + else: + print(f"โŒ Agent start failed: {response.status_code}") + print(f" Response: {response.text}") + ws.close() + return False + + except Exception as e: + print(f"โŒ Agent run error: {e}") + import traceback + traceback.print_exc() + ws.close() + return False + +def main(): + """Run all tests.""" + print("=" * 60) + print("๐Ÿงช Hermes Agent UI Flow Test") + print("=" * 60) + print("\nThis will test the complete flow:") + print(" 1. API server connectivity") + print(" 2. Tools endpoint") + print(" 3. WebSocket connection") + print(" 4. Agent execution via API (mock mode)") + print(" 5. 
Event streaming to UI") + print("\n" + "=" * 60) + + results = [] + + # Test 1: API server + results.append(("API Server", test_api_server())) + + # Test 2: Tools endpoint + results.append(("Tools Endpoint", test_tools_endpoint())) + + # Test 3: WebSocket + results.append(("WebSocket Connection", test_websocket())) + + # Test 4: Agent run + results.append(("Agent Execution + Events", test_agent_run())) + + # Summary + print("\n" + "=" * 60) + print("๐Ÿ“Š TEST SUMMARY") + print("=" * 60) + + for test_name, passed in results: + status = "โœ… PASS" if passed else "โŒ FAIL" + print(f"{status} - {test_name}") + + all_passed = all(r[1] for r in results) + + print("\n" + "=" * 60) + if all_passed: + print("๐ŸŽ‰ ALL TESTS PASSED!") + print("\nโœ… The UI flow is working correctly!") + print(" You can now use the UI to:") + print(" โ€ข Submit queries") + print(" โ€ข View real-time events") + print(" โ€ข See tool executions") + print(" โ€ข Get final responses") + else: + print("โŒ SOME TESTS FAILED") + print("\nMake sure:") + print(" 1. API server is running: python api_endpoint/logging_server.py") + print(" 2. ANTHROPIC_API_KEY is set in environment") + print(" 3. All dependencies are installed: pip install -r requirements.txt") + print("=" * 60) + + return 0 if all_passed else 1 + +if __name__ == "__main__": + exit(main()) + diff --git a/web_tools.py b/web_tools.py index 706eb1ff16..4c6a8ad7a2 100644 --- a/web_tools.py +++ b/web_tools.py @@ -193,6 +193,8 @@ Create a markdown summary that captures all key information in a well-organized, temperature=0.1, # Low temperature for consistent extraction max_tokens=4000 # Generous limit for comprehensive processing ) + + print("Response within tool call to see the error: ", response) # Get the markdown response directly processed_content = response.choices[0].message.content.strip()