refactor: enhance API interaction and message handling in AIAgent

- Introduced new methods in run_agent.py for building API keyword arguments and normalizing assistant messages from API responses.
- Added functionality for compressing conversation context and managing session state in SQLite.
- Improved tool call execution handling, including enhanced logging and error management.
- Updated path handling in multiple platform files to utilize pathlib for better compatibility and readability.
This commit is contained in:
teknium1 2026-02-21 04:17:27 -08:00
parent 7ee7221af1
commit ecb430effe
6 changed files with 351 additions and 401 deletions

View file

@ -17,7 +17,8 @@ from typing import Dict, List, Optional, Any, Callable, Awaitable, Tuple
from enum import Enum
import sys
sys.path.insert(0, str(__file__).rsplit("/", 3)[0])
from pathlib import Path as _Path
sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.session import SessionSource

View file

@ -27,7 +27,8 @@ except ImportError:
commands = None
import sys
sys.path.insert(0, str(__file__).rsplit("/", 3)[0])
from pathlib import Path as _Path
sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (

View file

@ -24,7 +24,8 @@ except ImportError:
AsyncWebClient = Any
import sys
sys.path.insert(0, str(__file__).rsplit("/", 3)[0])
from pathlib import Path as _Path
sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (

View file

@ -30,7 +30,8 @@ except ImportError:
ContextTypes = Any
import sys
sys.path.insert(0, str(__file__).rsplit("/", 3)[0])
from pathlib import Path as _Path
sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (

View file

@ -25,7 +25,7 @@ from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
import sys
sys.path.insert(0, str(__file__).rsplit("/", 3)[0])
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (

View file

@ -2284,7 +2284,337 @@ class AIAgent:
self._cached_system_prompt = None
if self._memory_store:
self._memory_store.load_from_disk()
def _build_api_kwargs(self, api_messages: list) -> dict:
"""Build the keyword arguments dict for the chat completions API call."""
provider_preferences = {}
if self.providers_allowed:
provider_preferences["only"] = self.providers_allowed
if self.providers_ignored:
provider_preferences["ignore"] = self.providers_ignored
if self.providers_order:
provider_preferences["order"] = self.providers_order
if self.provider_sort:
provider_preferences["sort"] = self.provider_sort
api_kwargs = {
"model": self.model,
"messages": api_messages,
"tools": self.tools if self.tools else None,
"timeout": 600.0,
}
if self.max_tokens is not None:
api_kwargs["max_tokens"] = self.max_tokens
extra_body = {}
if provider_preferences:
extra_body["provider"] = provider_preferences
if "openrouter" in self.base_url.lower():
if self.reasoning_config is not None:
extra_body["reasoning"] = self.reasoning_config
else:
extra_body["reasoning"] = {
"enabled": True,
"effort": "xhigh"
}
if extra_body:
api_kwargs["extra_body"] = extra_body
return api_kwargs
def _build_assistant_message(self, assistant_message, finish_reason: str) -> dict:
"""Build a normalized assistant message dict from an API response message.
Handles reasoning extraction, reasoning_details, and optional tool_calls
so both the tool-call path and the final-response path share one builder.
"""
reasoning_text = self._extract_reasoning(assistant_message)
if reasoning_text and self.verbose_logging:
preview = reasoning_text[:100] + "..." if len(reasoning_text) > 100 else reasoning_text
logging.debug(f"Captured reasoning ({len(reasoning_text)} chars): {preview}")
msg = {
"role": "assistant",
"content": assistant_message.content or "",
"reasoning": reasoning_text,
"finish_reason": finish_reason,
}
if hasattr(assistant_message, 'reasoning_details') and assistant_message.reasoning_details:
msg["reasoning_details"] = [
{"type": d.get("type"), "text": d.get("text"), "signature": d.get("signature")}
for d in assistant_message.reasoning_details
if isinstance(d, dict)
]
if assistant_message.tool_calls:
msg["tool_calls"] = [
{
"id": tool_call.id,
"type": tool_call.type,
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments
}
}
for tool_call in assistant_message.tool_calls
]
return msg
def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None) -> tuple:
"""Compress conversation context and split the session in SQLite.
Returns:
(compressed_messages, new_system_prompt) tuple
"""
compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens)
todo_snapshot = self._todo_store.format_for_injection()
if todo_snapshot:
compressed.append({"role": "user", "content": todo_snapshot})
self._invalidate_system_prompt()
new_system_prompt = self._build_system_prompt(system_message)
self._cached_system_prompt = new_system_prompt
if self._session_db:
try:
self._session_db.end_session(self.session_id, "compression")
old_session_id = self.session_id
self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or "cli",
model=self.model,
parent_session_id=old_session_id,
)
self._session_db.update_system_prompt(self.session_id, new_system_prompt)
except Exception as e:
logger.debug("Session DB compression split failed: %s", e)
return compressed, new_system_prompt
def _execute_tool_calls(self, assistant_message, messages: list, effective_task_id: str) -> None:
    """Execute tool calls from the assistant message and append results to messages.

    Each call's output is appended as a ``{"role": "tool", ...}`` entry so the
    message sequence stays valid for the next API round-trip.

    Args:
        assistant_message: API response message whose ``tool_calls`` are run in order.
        messages: Conversation history, mutated in place with tool results.
        effective_task_id: Task id forwarded to ``handle_function_call`` for generic tools.

    Side effects: prints progress (format depends on ``quiet_mode``), fires
    ``tool_progress_callback`` when set, and honors ``_interrupt_requested``
    by skipping remaining calls with placeholder results.
    """
    for i, tool_call in enumerate(assistant_message.tool_calls, 1):
        function_name = tool_call.function.name
        # Parse arguments -- should always succeed since they were validated
        # upstream; fall back to empty args rather than crash the loop.
        try:
            function_args = json.loads(tool_call.function.arguments)
        except json.JSONDecodeError as e:
            logging.warning(f"Unexpected JSON error after validation: {e}")
            function_args = {}
        # Preview the tool call (verbose mode; quiet mode prints per-branch below).
        if not self.quiet_mode:
            args_str = json.dumps(function_args, ensure_ascii=False)
            args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
            print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())}) - {args_preview}")
        # Fire the progress callback if registered (used by messaging platforms).
        if self.tool_progress_callback:
            try:
                preview = _build_tool_preview(function_name, function_args)
                self.tool_progress_callback(function_name, preview)
            except Exception as cb_err:
                logging.debug(f"Tool progress callback error: {cb_err}")
        tool_start_time = time.time()
        # Todo tool -- handled directly (needs the agent's TodoStore instance).
        if function_name == "todo":
            from tools.todo_tool import todo_tool as _todo_tool
            function_result = _todo_tool(
                todos=function_args.get("todos"),
                merge=function_args.get("merge", False),
                store=self._todo_store,
            )
            tool_duration = time.time() - tool_start_time
            if self.quiet_mode:
                print(f" {self._get_cute_tool_message('todo', function_args, tool_duration)}")
        # Session search -- handled directly (needs the SessionDB instance).
        # NOTE(review): when _session_db is unset this falls through to the
        # generic handler below -- confirm that is intended.
        elif function_name == "session_search" and self._session_db:
            from tools.session_search_tool import session_search as _session_search
            function_result = _session_search(
                query=function_args.get("query", ""),
                role_filter=function_args.get("role_filter"),
                limit=function_args.get("limit", 3),
                db=self._session_db,
            )
            tool_duration = time.time() - tool_start_time
            if self.quiet_mode:
                print(f" {self._get_cute_tool_message('session_search', function_args, tool_duration)}")
        # Memory tool -- handled directly (needs the agent's MemoryStore instance).
        elif function_name == "memory":
            from tools.memory_tool import memory_tool as _memory_tool
            function_result = _memory_tool(
                action=function_args.get("action"),
                target=function_args.get("target", "memory"),
                content=function_args.get("content"),
                old_text=function_args.get("old_text"),
                store=self._memory_store,
            )
            tool_duration = time.time() - tool_start_time
            if self.quiet_mode:
                print(f" {self._get_cute_tool_message('memory', function_args, tool_duration)}")
        # Clarify tool -- delegates to the platform-provided callback.
        elif function_name == "clarify":
            from tools.clarify_tool import clarify_tool as _clarify_tool
            function_result = _clarify_tool(
                question=function_args.get("question", ""),
                choices=function_args.get("choices"),
                callback=self.clarify_callback,
            )
            tool_duration = time.time() - tool_start_time
            if self.quiet_mode:
                print(f" {self._get_cute_tool_message('clarify', function_args, tool_duration)}")
        # Delegate task -- spawns child agent(s) with isolated context.
        elif function_name == "delegate_task":
            from tools.delegate_tool import delegate_task as _delegate_task
            tasks_arg = function_args.get("tasks")
            if tasks_arg and isinstance(tasks_arg, list):
                spinner_label = f"🔀 delegating {len(tasks_arg)} tasks"
            else:
                goal_preview = (function_args.get("goal") or "")[:30]
                spinner_label = f"🔀 {goal_preview}" if goal_preview else "🔀 delegating"
            spinner = None
            if self.quiet_mode:
                face = random.choice(KawaiiSpinner.KAWAII_WAITING)
                spinner = KawaiiSpinner(f"{face} {spinner_label}", spinner_type='dots')
                spinner.start()
            # Store the spinner on self so delegate_tool can update its text.
            self._delegate_spinner = spinner
            try:
                function_result = _delegate_task(
                    goal=function_args.get("goal"),
                    context=function_args.get("context"),
                    toolsets=function_args.get("toolsets"),
                    tasks=tasks_arg,
                    model=function_args.get("model"),
                    max_iterations=function_args.get("max_iterations"),
                    parent_agent=self,
                )
            finally:
                self._delegate_spinner = None
            tool_duration = time.time() - tool_start_time
            cute_msg = self._get_cute_tool_message('delegate_task', function_args, tool_duration)
            if spinner:
                spinner.stop(cute_msg)
            elif self.quiet_mode:
                print(f" {cute_msg}")
        # Generic tools in quiet mode -- animated kawaii spinner while the tool
        # works; the spinner line is replaced by the clean result line.
        elif self.quiet_mode:
            face = random.choice(KawaiiSpinner.KAWAII_WAITING)
            # Emoji per tool name for the spinner label; unknown tools get ''.
            tool_emoji_map = {
                'web_search': '🔍', 'web_extract': '📄', 'web_crawl': '🕸️',
                'terminal': '💻', 'process': '⚙️',
                'read_file': '📖', 'write_file': '✍️', 'patch': '🔧', 'search_files': '🔎',
                'browser_navigate': '🌐', 'browser_snapshot': '📸',
                'browser_click': '👆', 'browser_type': '⌨️',
                'browser_scroll': '📜', 'browser_back': '◀️',
                'browser_press': '⌨️', 'browser_close': '🚪',
                'browser_get_images': '🖼️', 'browser_vision': '👁️',
                'image_generate': '🎨', 'text_to_speech': '🔊',
                'vision_analyze': '👁️', 'mixture_of_agents': '🧠',
                'skills_list': '📚', 'skill_view': '📚',
                'schedule_cronjob': '', 'list_cronjobs': '', 'remove_cronjob': '',
                'send_message': '📨', 'todo': '📋', 'memory': '🧠', 'session_search': '🔍',
                'clarify': '', 'execute_code': '🐍', 'delegate_task': '🔀',
            }
            emoji = tool_emoji_map.get(function_name, '')
            preview = _build_tool_preview(function_name, function_args) or function_name
            if len(preview) > 30:
                preview = preview[:27] + "..."
            spinner = KawaiiSpinner(f"{face} {emoji} {preview}", spinner_type='dots')
            spinner.start()
            try:
                function_result = handle_function_call(function_name, function_args, effective_task_id)
            finally:
                # Stop the spinner even if the tool raised.
                tool_duration = time.time() - tool_start_time
                cute_msg = self._get_cute_tool_message(function_name, function_args, tool_duration)
                spinner.stop(cute_msg)
        # Generic tools in verbose mode -- plain dispatch, no spinner.
        else:
            function_result = handle_function_call(function_name, function_args, effective_task_id)
            tool_duration = time.time() - tool_start_time
        result_preview = function_result[:200] if len(function_result) > 200 else function_result
        if self.verbose_logging:
            logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
            logging.debug(f"Tool result preview: {result_preview}...")
        # Add the tool result to the conversation.
        messages.append({
            "role": "tool",
            "content": function_result,
            "tool_call_id": tool_call.id
        })
        # Preview tool response (only in non-quiet mode).
        if not self.quiet_mode:
            response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
            print(f" ✅ Tool {i} completed in {tool_duration:.2f}s - {response_preview}")
        # Interrupt between tool calls: skip the rest so the agent can respond
        # to the user immediately. Placeholder results keep the sequence valid
        # (assistant tool_calls need matching tool results).
        if self._interrupt_requested and i < len(assistant_message.tool_calls):
            remaining = len(assistant_message.tool_calls) - i
            print(f"{self.log_prefix}⚡ Interrupt: skipping {remaining} remaining tool call(s)")
            for skipped_tc in assistant_message.tool_calls[i:]:
                messages.append({
                    "role": "tool",
                    "content": "[Tool execution skipped - user sent a new message]",
                    "tool_call_id": skipped_tc.id
                })
            break
        # Optional pacing delay between tool calls.
        if self.tool_delay > 0 and i < len(assistant_message.tool_calls):
            time.sleep(self.tool_delay)
def _handle_max_iterations(self, messages: list, api_call_count: int) -> str:
"""Request a summary when max iterations are reached. Returns the final response text."""
print(f"⚠️ Reached maximum iterations ({self.max_iterations}). Requesting summary...")
summary_request = (
"You've reached the maximum number of tool-calling iterations allowed. "
"Please provide a final response summarizing what you've found and accomplished so far, "
"without calling any more tools."
)
messages.append({"role": "user", "content": summary_request})
try:
api_messages = messages.copy()
if self.ephemeral_system_prompt:
api_messages = [{"role": "system", "content": self.ephemeral_system_prompt}] + api_messages
summary_extra_body = {}
if "openrouter" in self.base_url.lower():
if self.reasoning_config is not None:
summary_extra_body["reasoning"] = self.reasoning_config
else:
summary_extra_body["reasoning"] = {
"enabled": True,
"effort": "xhigh"
}
summary_kwargs = {
"model": self.model,
"messages": api_messages,
}
if self.max_tokens is not None:
summary_kwargs["max_tokens"] = self.max_tokens
if summary_extra_body:
summary_kwargs["extra_body"] = summary_extra_body
summary_response = self.client.chat.completions.create(**summary_kwargs)
if summary_response.choices and summary_response.choices[0].message.content:
final_response = summary_response.choices[0].message.content
if "<think>" in final_response:
final_response = re.sub(r'<think>.*?</think>\s*', '', final_response, flags=re.DOTALL).strip()
messages.append({"role": "assistant", "content": final_response})
else:
final_response = "I reached the iteration limit and couldn't generate a summary."
except Exception as e:
logging.warning(f"Failed to get summary response: {e}")
final_response = f"I reached the maximum iterations ({self.max_iterations}) but couldn't summarize. Error: {str(e)}"
return final_response
def run_conversation(
self,
user_message: str,
@ -2443,50 +2773,7 @@ class AIAgent:
while retry_count <= max_retries:
try:
# Build OpenRouter provider preferences if specified
provider_preferences = {}
if self.providers_allowed:
provider_preferences["only"] = self.providers_allowed
if self.providers_ignored:
provider_preferences["ignore"] = self.providers_ignored
if self.providers_order:
provider_preferences["order"] = self.providers_order
if self.provider_sort:
provider_preferences["sort"] = self.provider_sort
# Make API call with tools - increased timeout for long responses
api_kwargs = {
"model": self.model,
"messages": api_messages,
"tools": self.tools if self.tools else None,
"timeout": 600.0 # 10 minute timeout for very long responses
}
# Add max_tokens if configured (overrides model default)
if self.max_tokens is not None:
api_kwargs["max_tokens"] = self.max_tokens
# Add extra_body for OpenRouter (provider preferences + reasoning)
extra_body = {}
# Add provider preferences if specified
if provider_preferences:
extra_body["provider"] = provider_preferences
# Configure reasoning for OpenRouter
# If reasoning_config is explicitly provided, use it (allows disabling/customizing)
# Otherwise, default to xhigh effort for OpenRouter models
if "openrouter" in self.base_url.lower():
if self.reasoning_config is not None:
extra_body["reasoning"] = self.reasoning_config
else:
extra_body["reasoning"] = {
"enabled": True,
"effort": "xhigh"
}
if extra_body:
api_kwargs["extra_body"] = extra_body
api_kwargs = self._build_api_kwargs(api_messages)
if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}:
self._dump_api_request_debug(api_kwargs, reason="preflight")
@ -2717,34 +3004,12 @@ class AIAgent:
if is_context_length_error:
print(f"{self.log_prefix}⚠️ Context length exceeded - attempting compression...")
# Try to compress and retry
original_len = len(messages)
messages = self.context_compressor.compress(messages, current_tokens=approx_tokens)
messages, active_system_prompt = self._compress_context(
messages, system_message, approx_tokens=approx_tokens
)
if len(messages) < original_len:
# Compression was possible -- re-inject todo state
todo_snapshot = self._todo_store.format_for_injection()
if todo_snapshot:
messages.append({"role": "user", "content": todo_snapshot})
# Rebuild system prompt with fresh date/time + memory
self._invalidate_system_prompt()
active_system_prompt = self._build_system_prompt(system_message)
self._cached_system_prompt = active_system_prompt
# Split session in SQLite (close old, open new with parent link)
if self._session_db:
try:
self._session_db.end_session(self.session_id, "compression")
old_session_id = self.session_id
self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or "cli",
model=self.model,
parent_session_id=old_session_id,
)
self._session_db.update_system_prompt(self.session_id, active_system_prompt)
except Exception as e:
logger.debug("Session DB compression split failed: %s", e)
print(f"{self.log_prefix} 🗜️ Compressed {original_len}{len(messages)} messages, retrying...")
continue # Retry with compressed messages
else:
@ -2921,262 +3186,17 @@ class AIAgent:
# Reset retry counter on successful JSON validation
self._invalid_json_retries = 0
# Extract reasoning from response if available
# OpenRouter can return reasoning in multiple formats:
# 1. message.reasoning - direct reasoning field
# 2. message.reasoning_content - alternative field (some providers)
# 3. message.reasoning_details - array with {summary: "..."} objects
reasoning_text = self._extract_reasoning(assistant_message)
if reasoning_text and self.verbose_logging:
preview = reasoning_text[:100] + "..." if len(reasoning_text) > 100 else reasoning_text
logging.debug(f"Captured reasoning ({len(reasoning_text)} chars): {preview}")
# Build assistant message with tool calls
# Content stays as-is; reasoning is stored separately and will be passed
# to the API via reasoning_content field when preparing api_messages
assistant_msg = {
"role": "assistant",
"content": assistant_message.content or "",
"reasoning": reasoning_text,
"finish_reason": finish_reason,
"tool_calls": [
{
"id": tool_call.id,
"type": tool_call.type,
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments
}
}
for tool_call in assistant_message.tool_calls
]
}
# Store reasoning_details for multi-turn reasoning context (OpenRouter)
if hasattr(assistant_message, 'reasoning_details') and assistant_message.reasoning_details:
assistant_msg["reasoning_details"] = [
{"type": d.get("type"), "text": d.get("text"), "signature": d.get("signature")}
for d in assistant_message.reasoning_details
if isinstance(d, dict)
]
assistant_msg = self._build_assistant_message(assistant_message, finish_reason)
messages.append(assistant_msg)
# Execute each tool call
for i, tool_call in enumerate(assistant_message.tool_calls, 1):
function_name = tool_call.function.name
# Parse arguments - should always succeed since we validated above
try:
function_args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError as e:
# This shouldn't happen since we validate and retry above
logging.warning(f"Unexpected JSON error after validation: {e}")
function_args = {}
# Preview tool call - cleaner format for quiet mode
if not self.quiet_mode:
args_str = json.dumps(function_args, ensure_ascii=False)
args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())}) - {args_preview}")
# Fire progress callback if registered (for messaging platforms)
if self.tool_progress_callback:
try:
# Build a short preview of the primary argument
preview = _build_tool_preview(function_name, function_args)
self.tool_progress_callback(function_name, preview)
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
tool_start_time = time.time()
# Todo tool -- handle directly (needs agent's TodoStore instance)
if function_name == "todo":
from tools.todo_tool import todo_tool as _todo_tool
function_result = _todo_tool(
todos=function_args.get("todos"),
merge=function_args.get("merge", False),
store=self._todo_store,
)
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
print(f" {self._get_cute_tool_message('todo', function_args, tool_duration)}")
# Session search -- handle directly (needs SessionDB instance)
elif function_name == "session_search" and self._session_db:
from tools.session_search_tool import session_search as _session_search
function_result = _session_search(
query=function_args.get("query", ""),
role_filter=function_args.get("role_filter"),
limit=function_args.get("limit", 3),
db=self._session_db,
)
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
print(f" {self._get_cute_tool_message('session_search', function_args, tool_duration)}")
# Memory tool -- handle directly (needs agent's MemoryStore instance)
elif function_name == "memory":
from tools.memory_tool import memory_tool as _memory_tool
function_result = _memory_tool(
action=function_args.get("action"),
target=function_args.get("target", "memory"),
content=function_args.get("content"),
old_text=function_args.get("old_text"),
store=self._memory_store,
)
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
print(f" {self._get_cute_tool_message('memory', function_args, tool_duration)}")
# Clarify tool -- delegates to platform-provided callback
elif function_name == "clarify":
from tools.clarify_tool import clarify_tool as _clarify_tool
function_result = _clarify_tool(
question=function_args.get("question", ""),
choices=function_args.get("choices"),
callback=self.clarify_callback,
)
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
print(f" {self._get_cute_tool_message('clarify', function_args, tool_duration)}")
# Delegate task -- spawn child agent(s) with isolated context
elif function_name == "delegate_task":
from tools.delegate_tool import delegate_task as _delegate_task
tasks_arg = function_args.get("tasks")
if tasks_arg and isinstance(tasks_arg, list):
spinner_label = f"🔀 delegating {len(tasks_arg)} tasks"
else:
goal_preview = (function_args.get("goal") or "")[:30]
spinner_label = f"🔀 {goal_preview}" if goal_preview else "🔀 delegating"
spinner = None
if self.quiet_mode:
face = random.choice(KawaiiSpinner.KAWAII_WAITING)
spinner = KawaiiSpinner(f"{face} {spinner_label}", spinner_type='dots')
spinner.start()
# Store spinner on self so delegate_tool can update its text
self._delegate_spinner = spinner
try:
function_result = _delegate_task(
goal=function_args.get("goal"),
context=function_args.get("context"),
toolsets=function_args.get("toolsets"),
tasks=tasks_arg,
model=function_args.get("model"),
max_iterations=function_args.get("max_iterations"),
parent_agent=self,
)
finally:
self._delegate_spinner = None
tool_duration = time.time() - tool_start_time
cute_msg = self._get_cute_tool_message('delegate_task', function_args, tool_duration)
if spinner:
spinner.stop(cute_msg)
elif self.quiet_mode:
print(f" {cute_msg}")
# Execute other tools - with animated kawaii spinner in quiet mode
# The face is "alive" while the tool works, then vanishes
# and is replaced by the clean result line.
elif self.quiet_mode:
face = random.choice(KawaiiSpinner.KAWAII_WAITING)
tool_emoji_map = {
'web_search': '🔍', 'web_extract': '📄', 'web_crawl': '🕸️',
'terminal': '💻', 'process': '⚙️',
'read_file': '📖', 'write_file': '✍️', 'patch': '🔧', 'search_files': '🔎',
'browser_navigate': '🌐', 'browser_snapshot': '📸',
'browser_click': '👆', 'browser_type': '⌨️',
'browser_scroll': '📜', 'browser_back': '◀️',
'browser_press': '⌨️', 'browser_close': '🚪',
'browser_get_images': '🖼️', 'browser_vision': '👁️',
'image_generate': '🎨', 'text_to_speech': '🔊',
'vision_analyze': '👁️', 'mixture_of_agents': '🧠',
'skills_list': '📚', 'skill_view': '📚',
'schedule_cronjob': '', 'list_cronjobs': '', 'remove_cronjob': '',
'send_message': '📨', 'todo': '📋', 'memory': '🧠', 'session_search': '🔍',
'clarify': '', 'execute_code': '🐍', 'delegate_task': '🔀',
}
emoji = tool_emoji_map.get(function_name, '')
preview = _build_tool_preview(function_name, function_args) or function_name
if len(preview) > 30:
preview = preview[:27] + "..."
spinner = KawaiiSpinner(f"{face} {emoji} {preview}", spinner_type='dots')
spinner.start()
try:
function_result = handle_function_call(function_name, function_args, effective_task_id)
finally:
tool_duration = time.time() - tool_start_time
cute_msg = self._get_cute_tool_message(function_name, function_args, tool_duration)
spinner.stop(cute_msg)
else:
function_result = handle_function_call(function_name, function_args, effective_task_id)
tool_duration = time.time() - tool_start_time
result_preview = function_result[:200] if len(function_result) > 200 else function_result
if self.verbose_logging:
logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
logging.debug(f"Tool result preview: {result_preview}...")
# Add tool result to conversation
messages.append({
"role": "tool",
"content": function_result,
"tool_call_id": tool_call.id
})
# Preview tool response (only in non-quiet mode)
if not self.quiet_mode:
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s - {response_preview}")
# Check for interrupt between tool calls - skip remaining
# tools so the agent can respond to the user immediately
if self._interrupt_requested and i < len(assistant_message.tool_calls):
remaining = len(assistant_message.tool_calls) - i
print(f"{self.log_prefix}⚡ Interrupt: skipping {remaining} remaining tool call(s)")
# Add placeholder results for skipped tool calls so the
# message sequence stays valid (assistant tool_calls need matching tool results)
for skipped_tc in assistant_message.tool_calls[i:]:
messages.append({
"role": "tool",
"content": "[Tool execution skipped - user sent a new message]",
"tool_call_id": skipped_tc.id
})
break
# Delay between tool calls
if self.tool_delay > 0 and i < len(assistant_message.tool_calls):
time.sleep(self.tool_delay)
self._execute_tool_calls(assistant_message, messages, effective_task_id)
# Check if context compression is needed before next API call
# Uses actual token count from last API response
if self.compression_enabled and self.context_compressor.should_compress():
messages = self.context_compressor.compress(
messages,
current_tokens=self.context_compressor.last_prompt_tokens
messages, active_system_prompt = self._compress_context(
messages, system_message,
approx_tokens=self.context_compressor.last_prompt_tokens
)
# Re-inject todo state after compression
todo_snapshot = self._todo_store.format_for_injection()
if todo_snapshot:
messages.append({"role": "user", "content": todo_snapshot})
# Rebuild system prompt with fresh date/time + memory
self._invalidate_system_prompt()
active_system_prompt = self._build_system_prompt(system_message)
self._cached_system_prompt = active_system_prompt
# Split session in SQLite (close old, open new with parent link)
if self._session_db:
try:
self._session_db.end_session(self.session_id, "compression")
old_session_id = self.session_id
self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or "cli",
model=self.model,
parent_session_id=old_session_id,
)
self._session_db.update_system_prompt(self.session_id, active_system_prompt)
except Exception as e:
logger.debug("Session DB compression split failed: %s", e)
# Save session log incrementally (so progress is visible even if interrupted)
self._session_messages = messages
@ -3242,28 +3262,7 @@ class AIAgent:
if hasattr(self, '_empty_content_retries'):
self._empty_content_retries = 0
# Extract reasoning from response if available
reasoning_text = self._extract_reasoning(assistant_message)
if reasoning_text and self.verbose_logging:
preview = reasoning_text[:100] + "..." if len(reasoning_text) > 100 else reasoning_text
logging.debug(f"Captured final reasoning ({len(reasoning_text)} chars): {preview}")
# Build final assistant message
final_msg = {
"role": "assistant",
"content": final_response,
"reasoning": reasoning_text,
"finish_reason": finish_reason,
}
# Store reasoning_details for multi-turn reasoning context (OpenRouter)
if hasattr(assistant_message, 'reasoning_details') and assistant_message.reasoning_details:
final_msg["reasoning_details"] = [
{"type": d.get("type"), "text": d.get("text"), "signature": d.get("signature")}
for d in assistant_message.reasoning_details
if isinstance(d, dict)
]
final_msg = self._build_assistant_message(assistant_message, finish_reason)
messages.append(final_msg)
@ -3289,61 +3288,8 @@ class AIAgent:
final_response = f"I apologize, but I encountered repeated errors: {error_msg}"
break
# Handle max iterations reached - ask model to summarize what it found
if api_call_count >= self.max_iterations and final_response is None:
print(f"⚠️ Reached maximum iterations ({self.max_iterations}). Requesting summary...")
# Inject a user message asking for a summary
summary_request = (
"You've reached the maximum number of tool-calling iterations allowed. "
"Please provide a final response summarizing what you've found and accomplished so far, "
"without calling any more tools."
)
messages.append({"role": "user", "content": summary_request})
# Make one final API call WITHOUT tools to force a text response
try:
api_messages = messages.copy()
if self.ephemeral_system_prompt:
api_messages = [{"role": "system", "content": self.ephemeral_system_prompt}] + api_messages
# Build extra_body for summary call (same reasoning config as main loop)
summary_extra_body = {}
if "openrouter" in self.base_url.lower():
if self.reasoning_config is not None:
summary_extra_body["reasoning"] = self.reasoning_config
else:
summary_extra_body["reasoning"] = {
"enabled": True,
"effort": "xhigh"
}
summary_kwargs = {
"model": self.model,
"messages": api_messages,
# No tools parameter - forces text response
}
if self.max_tokens is not None:
summary_kwargs["max_tokens"] = self.max_tokens
if summary_extra_body:
summary_kwargs["extra_body"] = summary_extra_body
summary_response = self.client.chat.completions.create(**summary_kwargs)
if summary_response.choices and summary_response.choices[0].message.content:
final_response = summary_response.choices[0].message.content
# Strip think blocks from final response
if "<think>" in final_response:
final_response = re.sub(r'<think>.*?</think>\s*', '', final_response, flags=re.DOTALL).strip()
# Add to messages for session continuity
messages.append({"role": "assistant", "content": final_response})
else:
final_response = "I reached the iteration limit and couldn't generate a summary."
except Exception as e:
logging.warning(f"Failed to get summary response: {e}")
final_response = f"I reached the maximum iterations ({self.max_iterations}) but couldn't summarize. Error: {str(e)}"
final_response = self._handle_max_iterations(messages, api_call_count)
# Determine if conversation completed successfully
completed = final_response is not None and api_call_count < self.max_iterations