mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
refactor: remove 24 confirmed dead functions — 432 lines of unused code
Each function was verified to have exactly 1 reference in the entire codebase (its own definition). Zero calls, zero imports, zero string references anywhere including tests. Removed by category: Superseded wrappers (replaced by newer implementations): - agent/anthropic_adapter.py: run_hermes_oauth_login, refresh_hermes_oauth_token - hermes_cli/callbacks.py: sudo_password_callback (superseded by CLI method) - hermes_cli/setup.py: _set_model_provider, _sync_model_from_disk - tools/file_tools.py: get_file_tools (superseded by registry.register) - tools/cronjob_tools.py: get_cronjob_tool_definitions (same) - tools/terminal_tool.py: _check_dangerous_command (_check_all_guards used) Dead private helpers (lost their callers during refactors): - agent/anthropic_adapter.py: _convert_user_content_part_to_anthropic - agent/display.py: honcho_session_line, write_tty - hermes_cli/providers.py: _build_labels (+ dead _labels_cache var) - hermes_cli/tools_config.py: _prompt_yes_no - hermes_cli/models.py: _extract_model_ids - hermes_cli/uninstall.py: log_error - gateway/platforms/feishu.py: _is_loop_ready - tools/file_operations.py: _read_image (64-line method) - tools/process_registry.py: cleanup_expired - tools/skill_manager_tool.py: check_skill_manage_requirements Dead class methods (zero callers): - run_agent.py: _is_anthropic_url (logic duplicated inline at L618) - run_agent.py: _classify_empty_content_response (68-line method, never wired) - cli.py: reset_conversation (callers all use new_session directly) - cli.py: _clear_current_input (added but never wired in) Other: - gateway/delivery.py: build_delivery_context_for_tool - tools/browser_tool.py: get_active_browser_sessions
This commit is contained in:
parent
69c753c19b
commit
ca0459d109
19 changed files with 1 additions and 432 deletions
|
|
@ -706,29 +706,6 @@ def run_hermes_oauth_login_pure() -> Optional[Dict[str, Any]]:
|
|||
}
|
||||
|
||||
|
||||
def run_hermes_oauth_login() -> Optional[str]:
    """Run the Hermes-native OAuth PKCE flow for a Claude Pro/Max subscription.

    Drives the pure flow (opens a browser to claude.ai, prompts for the
    authorization code, exchanges it for tokens), then persists the tokens
    to both the Hermes store (~/.hermes/.anthropic_oauth.json) and the
    Claude Code credential store.

    Returns:
        The access token string on success, None on failure.
    """
    result = run_hermes_oauth_login_pure()
    if not result:
        return None

    # Persist to both credential stores so either client can reuse them.
    tokens = (result["access_token"], result["refresh_token"], result["expires_at_ms"])
    _save_hermes_oauth_credentials(*tokens)
    _write_claude_code_credentials(*tokens)

    print("Authentication successful!")
    return tokens[0]
|
||||
|
||||
|
||||
def _save_hermes_oauth_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None:
|
||||
"""Save OAuth credentials to ~/.hermes/.anthropic_oauth.json."""
|
||||
data = {
|
||||
|
|
@ -756,38 +733,6 @@ def read_hermes_oauth_credentials() -> Optional[Dict[str, Any]]:
|
|||
return None
|
||||
|
||||
|
||||
def refresh_hermes_oauth_token() -> Optional[str]:
    """Refresh the Hermes-managed OAuth token using the stored refresh token.

    Reads the saved credentials, exchanges the refresh token for a new
    token set, and writes the result back to both credential stores.

    Returns the new access token, or None if refresh fails.
    """
    creds = read_hermes_oauth_credentials()
    stored_refresh = (creds or {}).get("refreshToken")
    if not stored_refresh:
        return None

    try:
        refreshed = refresh_anthropic_oauth_pure(
            stored_refresh,
            use_json=True,
        )
        tokens = (
            refreshed["access_token"],
            refreshed["refresh_token"],
            refreshed["expires_at_ms"],
        )
        # Keep both stores in sync with the fresh token set.
        _save_hermes_oauth_credentials(*tokens)
        _write_claude_code_credentials(*tokens)
        logger.debug("Successfully refreshed Hermes OAuth token")
        return tokens[0]
    except Exception as e:
        # Best-effort: a failed refresh is logged, never raised.
        logger.debug("Failed to refresh Hermes OAuth token: %s", e)

    return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Message / tool / response format conversion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -857,35 +802,6 @@ def _convert_openai_image_part_to_anthropic(part: Dict[str, Any]) -> Optional[Di
|
|||
return None
|
||||
|
||||
|
||||
def _convert_user_content_part_to_anthropic(part: Any) -> Optional[Dict[str, Any]]:
|
||||
if isinstance(part, dict):
|
||||
ptype = part.get("type")
|
||||
if ptype == "text":
|
||||
block = {"type": "text", "text": part.get("text", "")}
|
||||
if isinstance(part.get("cache_control"), dict):
|
||||
block["cache_control"] = dict(part["cache_control"])
|
||||
return block
|
||||
if ptype == "image_url":
|
||||
return _convert_openai_image_part_to_anthropic(part)
|
||||
if ptype == "image" and part.get("source"):
|
||||
return dict(part)
|
||||
if ptype == "image" and part.get("data"):
|
||||
media_type = part.get("mimeType") or part.get("media_type") or "image/png"
|
||||
return {
|
||||
"type": "image",
|
||||
"source": {
|
||||
"type": "base64",
|
||||
"media_type": media_type,
|
||||
"data": part.get("data", ""),
|
||||
},
|
||||
}
|
||||
if ptype == "tool_result":
|
||||
return dict(part)
|
||||
elif part is not None:
|
||||
return {"type": "text", "text": str(part)}
|
||||
return None
|
||||
|
||||
|
||||
def convert_tools_to_anthropic(tools: List[Dict]) -> List[Dict]:
|
||||
"""Convert OpenAI tool definitions to Anthropic format."""
|
||||
if not tools:
|
||||
|
|
|
|||
|
|
@ -986,24 +986,6 @@ def _osc8_link(url: str, text: str) -> str:
|
|||
return f"\033]8;;{url}\033\\{text}\033]8;;\033\\"
|
||||
|
||||
|
||||
def honcho_session_line(workspace: str, session_name: str) -> str:
    """Render the one-line `Honcho session: <name>` indicator.

    The session name is colored sky-blue and wrapped in an OSC-8
    hyperlink pointing at the session's web URL.
    """
    target = honcho_session_url(workspace, session_name)
    colored_name = f"{_SKY_BLUE}{session_name}{_ANSI_RESET}"
    return f"{_DIM}Honcho session:{_ANSI_RESET} {_osc8_link(target, colored_name)}"
|
||||
|
||||
|
||||
def write_tty(text: str) -> None:
    """Write directly to /dev/tty, bypassing stdout capture.

    Falls back to (flushed) stdout when no controlling terminal can be
    opened.
    """
    try:
        fd = os.open("/dev/tty", os.O_WRONLY)
        try:
            os.write(fd, text.encode("utf-8"))
        finally:
            # Fix: close the descriptor even when the write itself raises,
            # so a failing tty write no longer leaks the fd. A write-time
            # OSError still falls through to the stdout fallback below.
            os.close(fd)
    except OSError:
        sys.stdout.write(text)
        sys.stdout.flush()
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Context pressure display (CLI user-facing warnings)
|
||||
# =========================================================================
|
||||
|
|
|
|||
15
cli.py
15
cli.py
|
|
@ -3536,13 +3536,6 @@ class HermesCLI:
|
|||
_cprint(f" Original session: {parent_session_id}")
|
||||
_cprint(f" Branch session: {new_session_id}")
|
||||
|
||||
def reset_conversation(self):
    """Reset the conversation by starting a new session.

    This is a genuine session boundary, so the memory provider is shut
    down (flushing state for the old conversation) before delegating to
    new_session().
    """
    agent = getattr(self, 'agent', None)
    if agent:
        agent.shutdown_memory_provider(self.conversation_history)
    self.new_session()
|
||||
|
||||
def save_conversation(self):
|
||||
"""Save the current conversation to a file."""
|
||||
if not self.conversation_history:
|
||||
|
|
@ -6290,14 +6283,6 @@ class HermesCLI:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
def _clear_current_input(self) -> None:
|
||||
if getattr(self, "_app", None):
|
||||
try:
|
||||
self._app.current_buffer.text = ""
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def chat(self, message, images: list = None) -> Optional[str]:
|
||||
"""
|
||||
Send a message to the agent and get a response.
|
||||
|
|
|
|||
|
|
@ -314,38 +314,4 @@ def parse_deliver_spec(
|
|||
return deliver
|
||||
|
||||
|
||||
def build_delivery_context_for_tool(
    config: GatewayConfig,
    origin: Optional[SessionSource] = None
) -> Dict[str, Any]:
    """
    Build context for the unified cronjob tool to understand delivery options.

    This is passed to the tool so it can validate and explain delivery targets.
    """
    # The two targets that always exist: back-to-origin and local files.
    options: Dict[str, Any] = {
        "origin": {
            "description": "Back to where this job was created",
            "available": origin is not None,
        },
        "local": {
            "description": "Save to local files only",
            "available": True,
        },
    }

    # One entry per connected platform, including its home channel (if any).
    for platform in config.get_connected_platforms():
        home = config.get_home_channel(platform)
        options[platform.value] = {
            "description": f"{platform.value.title()} home channel",
            "available": True,
            "home_channel": home.to_dict() if home else None,
        }

    return {
        "origin": origin.to_dict() if origin else None,
        "options": options,
        "always_log_local": config.always_log_local,
    }
|
||||
|
||||
|
|
|
|||
|
|
@ -387,10 +387,6 @@ def _coerce_required_int(value: Any, default: int, min_value: int = 0) -> int:
|
|||
return default if parsed is None else parsed
|
||||
|
||||
|
||||
def _is_loop_ready(loop: Optional[asyncio.AbstractEventLoop]) -> bool:
|
||||
return loop is not None and not bool(getattr(loop, "is_closed", lambda: False)())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Post payload builders and parsers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -63,47 +63,6 @@ def clarify_callback(cli, question, choices):
|
|||
)
|
||||
|
||||
|
||||
def sudo_password_callback(cli) -> str:
    """Prompt for sudo password through the TUI.

    Publishes a response queue on the CLI (which renders the password
    input area), then blocks — polling once a second so the UI can keep
    redrawing — until the user answers or the 45s deadline passes.

    Returns the entered password, or "" on skip/timeout.
    """
    timeout = 45
    response_queue = queue.Queue()

    def refresh_ui() -> None:
        # Nudge prompt-toolkit to redraw (input area / countdown).
        if hasattr(cli, "_app") and cli._app:
            cli._app.invalidate()

    def teardown() -> None:
        # Remove the password prompt state and repaint.
        cli._sudo_state = None
        cli._sudo_deadline = 0
        refresh_ui()

    cli._sudo_state = {"response_queue": response_queue}
    cli._sudo_deadline = _time.monotonic() + timeout
    refresh_ui()

    while True:
        try:
            result = response_queue.get(timeout=1)
        except queue.Empty:
            if cli._sudo_deadline - _time.monotonic() <= 0:
                break
            refresh_ui()
            continue
        teardown()
        if result:
            cprint(f"\n{_DIM} ✓ Password received (cached for session){_RST}")
        else:
            cprint(f"\n{_DIM} ⏭ Skipped{_RST}")
        return result

    teardown()
    cprint(f"\n{_DIM} ⏱ Timeout — continuing without sudo{_RST}")
    return ""
|
||||
|
||||
|
||||
def prompt_for_secret(cli, var_name: str, prompt: str, metadata=None) -> dict:
|
||||
"""Prompt for a secret value through the TUI (e.g. API keys for skills).
|
||||
|
||||
|
|
|
|||
|
|
@ -1131,10 +1131,6 @@ def _payload_items(payload: Any) -> list[dict[str, Any]]:
|
|||
return []
|
||||
|
||||
|
||||
def _extract_model_ids(payload: Any) -> list[str]:
    """Collect every non-empty "id" field from the payload's item list."""
    ids: list[str] = []
    for item in _payload_items(payload):
        model_id = item.get("id", "")
        if model_id:
            ids.append(model_id)
    return ids
|
||||
|
||||
|
||||
def copilot_default_headers() -> dict[str, str]:
|
||||
"""Standard headers for Copilot API requests.
|
||||
|
||||
|
|
|
|||
|
|
@ -344,18 +344,6 @@ def get_label(provider_id: str) -> str:
|
|||
return canonical
|
||||
|
||||
|
||||
# Build LABELS dict for backward compat
|
||||
def _build_labels() -> Dict[str, str]:
    """Compose the provider-label map: overlay labels, then overrides on top."""
    labels = {pid: get_label(pid) for pid in HERMES_OVERLAYS}
    # Explicit overrides always win over computed labels.
    labels.update(_LABEL_OVERRIDES)
    return labels
|
||||
|
||||
# Lazy-built on first access
|
||||
_labels_cache: Optional[Dict[str, str]] = None
|
||||
|
||||
# For direct import compat, expose as module-level dict
|
||||
# Built on demand by get_label() calls
|
||||
LABELS: Dict[str, str] = {
|
||||
|
|
|
|||
|
|
@ -42,18 +42,6 @@ def _model_config_dict(config: Dict[str, Any]) -> Dict[str, Any]:
|
|||
return {}
|
||||
|
||||
|
||||
def _set_model_provider(
    config: Dict[str, Any], provider_id: str, base_url: str = ""
) -> None:
    """Point config["model"] at *provider_id*, optionally pinning a base URL.

    An empty base_url clears any previously stored URL override; a
    non-empty one is stored with its trailing slash stripped.
    """
    model_cfg = _model_config_dict(config)
    model_cfg["provider"] = provider_id
    if not base_url:
        model_cfg.pop("base_url", None)
    else:
        model_cfg["base_url"] = base_url.rstrip("/")
    config["model"] = model_cfg
|
||||
|
||||
|
||||
def _set_default_model(config: Dict[str, Any], model_name: str) -> None:
|
||||
if not model_name:
|
||||
return
|
||||
|
|
@ -326,16 +314,6 @@ def _setup_provider_model_selection(config, provider_id, current_model, prompt_c
|
|||
config["model"] = model_cfg
|
||||
|
||||
|
||||
def _sync_model_from_disk(config: Dict[str, Any]) -> None:
    """Merge the on-disk model setting into *config*, in place.

    A dict on disk is merged key-by-key into the model config; a bare
    non-blank string is treated as the default model name. Any other
    shape is ignored.
    """
    disk_model = load_config().get("model")
    if isinstance(disk_model, dict):
        model_cfg = _model_config_dict(config)
        model_cfg.update(disk_model)
        config["model"] = model_cfg
        return
    if isinstance(disk_model, str):
        name = disk_model.strip()
        if name:
            _set_default_model(config, name)
|
||||
|
||||
|
||||
# Import config helpers
|
||||
from hermes_cli.config import (
|
||||
get_hermes_home,
|
||||
|
|
|
|||
|
|
@ -61,22 +61,6 @@ def _prompt(question: str, default: str = None, password: bool = False) -> str:
|
|||
print()
|
||||
return default or ""
|
||||
|
||||
def _prompt_yes_no(question: str, default: bool = True) -> bool:
    """Ask a yes/no question on stdin.

    Empty input, Ctrl-C, or EOF returns *default*; otherwise re-prompts
    until the reply is recognizably yes or no.
    """
    default_str = "Y/n" if default else "y/N"
    verdicts = {'y': True, 'yes': True, 'n': False, 'no': False}
    while True:
        try:
            reply = input(color(f"{question} [{default_str}]: ", Colors.YELLOW)).strip().lower()
        except (KeyboardInterrupt, EOFError):
            print()
            return default
        if not reply:
            return default
        if reply in verdicts:
            return verdicts[reply]
|
||||
|
||||
|
||||
# ─── Toolset Registry ─────────────────────────────────────────────────────────
|
||||
|
||||
# Toolsets shown in the configurator, grouped for display.
|
||||
|
|
|
|||
|
|
@ -23,10 +23,6 @@ def log_success(msg: str):
|
|||
def log_warn(msg: str):
|
||||
print(f"{color('⚠', Colors.YELLOW)} {msg}")
|
||||
|
||||
def log_error(msg: str):
    """Print *msg* prefixed with a red ✗ marker."""
    marker = color('✗', Colors.RED)
    print(f"{marker} {msg}")
|
||||
|
||||
|
||||
def get_project_root() -> Path:
    """Get the project installation directory.

    This module lives one directory below the project root.
    """
    here = Path(__file__)
    return here.parent.parent.resolve()
|
||||
|
|
|
|||
72
run_agent.py
72
run_agent.py
|
|
@ -1505,10 +1505,6 @@ class AIAgent:
|
|||
"""Return True when the base URL targets OpenRouter."""
|
||||
return "openrouter" in self._base_url_lower
|
||||
|
||||
def _is_anthropic_url(self) -> bool:
|
||||
"""Return True when the base URL targets Anthropic (native or /anthropic proxy path)."""
|
||||
return "api.anthropic.com" in self._base_url_lower or self._base_url_lower.rstrip("/").endswith("/anthropic")
|
||||
|
||||
def _max_tokens_param(self, value: int) -> dict:
|
||||
"""Return the correct max tokens kwarg for the current provider.
|
||||
|
||||
|
|
@ -1694,74 +1690,6 @@ class AIAgent:
|
|||
|
||||
return None
|
||||
|
||||
def _classify_empty_content_response(
    self,
    assistant_message,
    *,
    finish_reason: Optional[str],
    approx_tokens: int,
    api_messages: List[Dict[str, Any]],
    conversation_history: Optional[List[Dict[str, Any]]],
) -> Dict[str, Any]:
    """Classify think-only/empty responses so we can retry, compress, or salvage.

    We intentionally do NOT short-circuit all structured-reasoning responses.
    Prior discussion/PR history shows some models recover on retry. Instead we:
    - compress immediately when the pattern looks like implicit context pressure
    - salvage reasoning early when the same reasoning-only payload repeats
    - otherwise preserve the normal retry path
    """
    reasoning_text = self._extract_reasoning(assistant_message)
    has_structured_reasoning = bool(
        getattr(assistant_message, "reasoning", None)
        or getattr(assistant_message, "reasoning_content", None)
        or getattr(assistant_message, "reasoning_details", None)
    )
    content = getattr(assistant_message, "content", None) or ""
    stripped_content = self._strip_think_blocks(content).strip()

    # Fingerprint the response; seeing the same fingerprint twice in a row
    # means the model is stuck repeating the same empty payload.
    signature = (
        content,
        reasoning_text or "",
        has_structured_reasoning,
        finish_reason or "",
    )
    repeated_signature = signature == getattr(self, "_last_empty_content_signature", None)

    compressor = getattr(self, "context_compressor", None)
    ctx_len = getattr(compressor, "context_length", 0) or 0
    threshold_tokens = getattr(compressor, "threshold_tokens", 0) or 0
    # "Large" = past ~40% of the context window (or the compressor's own
    # threshold), or simply a very long message list.
    is_large_session = bool(
        (ctx_len and approx_tokens >= max(int(ctx_len * 0.4), threshold_tokens))
        or len(api_messages) > 80
    )
    is_local_custom = is_local_endpoint(getattr(self, "base_url", "") or "")
    is_resumed = bool(conversation_history)

    context_pressure_signals = (
        finish_reason == "length"
        or bool(getattr(compressor, "_context_probed", False))
        or is_large_session
        or is_resumed
    )
    should_compress = bool(
        self.compression_enabled
        and is_local_custom
        and context_pressure_signals
        and not stripped_content
    )

    # Record the fingerprint only after the repeat check above.
    self._last_empty_content_signature = signature
    return {
        "reasoning_text": reasoning_text,
        "has_structured_reasoning": has_structured_reasoning,
        "repeated_signature": repeated_signature,
        "should_compress": should_compress,
        "is_local_custom": is_local_custom,
        "is_large_session": is_large_session,
        "is_resumed": is_resumed,
    }
|
||||
|
||||
def _cleanup_task_resources(self, task_id: str) -> None:
|
||||
"""Clean up VM and browser resources for a given task."""
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -2024,16 +2024,6 @@ def cleanup_all_browsers() -> None:
|
|||
cleanup_browser(task_id)
|
||||
|
||||
|
||||
def get_active_browser_sessions() -> Dict[str, Dict[str, str]]:
    """
    Get information about active browser sessions.

    Returns:
        Dict mapping task_id to session info (session_name, bb_session_id, cdp_url)
    """
    # Snapshot under the lock so callers can iterate without racing cleanup.
    with _cleanup_lock:
        snapshot = _active_sessions.copy()
    return snapshot
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Requirements Check
|
||||
|
|
|
|||
|
|
@ -501,11 +501,6 @@ def check_cronjob_requirements() -> bool:
|
|||
)
|
||||
|
||||
|
||||
def get_cronjob_tool_definitions():
    """Return the tool definitions for cronjob management (one schema)."""
    definitions = [CRONJOB_SCHEMA]
    return definitions
|
||||
|
||||
|
||||
# --- Registry ---
|
||||
from tools.registry import registry
|
||||
|
||||
|
|
|
|||
|
|
@ -555,75 +555,6 @@ class ShellFileOperations(FileOperations):
|
|||
hint=hint
|
||||
)
|
||||
|
||||
# Images larger than this are too expensive to inline as base64 in the
# conversation context. Return metadata only and suggest vision_analyze.
MAX_IMAGE_BYTES = 512 * 1024  # 512 KB

def _read_image(self, path: str) -> ReadResult:
    """Read an image file, returning base64 content.

    Oversized images come back as metadata-only with a hint; encode
    failures come back as an error result.
    """
    arg = self._escape_shell_arg(path)

    # File size via `wc -c` (POSIX; works on Linux + macOS).
    size_result = self._exec(f"wc -c < {arg} 2>/dev/null")
    try:
        file_size = int(size_result.stdout.strip())
    except ValueError:
        file_size = 0

    if file_size > self.MAX_IMAGE_BYTES:
        # Too big to inline: metadata only, steer toward vision_analyze.
        return ReadResult(
            is_image=True,
            is_binary=True,
            file_size=file_size,
            hint=(
                f"Image is too large to inline ({file_size:,} bytes). "
                "Use vision_analyze to inspect the image, or reference it by path."
            ),
        )

    # Base64-encode, stripping newlines with tr for portability: GNU
    # base64 supports -w 0 but macOS base64 does not; both wrap by
    # default, so tr works across all backends.
    encoded = self._exec(f"base64 {arg} 2>/dev/null | tr -d '\\n'", timeout=30)
    if encoded.exit_code != 0:
        return ReadResult(
            is_image=True,
            is_binary=True,
            file_size=file_size,
            error=f"Failed to read image: {encoded.stdout}"
        )

    # Dimensions are best-effort (requires ImageMagick's `identify`).
    dimensions = None
    if self._has_command('identify'):
        dim_result = self._exec(f"identify -format '%wx%h' {arg} 2>/dev/null")
        if dim_result.exit_code == 0:
            dimensions = dim_result.stdout.strip()

    # MIME type from the file extension; unknown extensions fall back
    # to application/octet-stream.
    mime_map = {
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.bmp': 'image/bmp',
        '.ico': 'image/x-icon',
    }
    extension = os.path.splitext(path)[1].lower()

    return ReadResult(
        is_image=True,
        is_binary=True,
        file_size=file_size,
        base64_content=encoded.stdout,
        mime_type=mime_map.get(extension, 'application/octet-stream'),
        dimensions=dimensions
    )
|
||||
|
||||
def _suggest_similar_files(self, path: str) -> ReadResult:
|
||||
"""Suggest similar files when the requested file is not found."""
|
||||
# Get directory and filename
|
||||
|
|
|
|||
|
|
@ -713,11 +713,6 @@ FILE_TOOLS = [
|
|||
]
|
||||
|
||||
|
||||
def get_file_tools():
    """Return the list of file tool definitions.

    Hands back the module-level FILE_TOOLS list itself; callers should
    treat it as read-only.
    """
    tools = FILE_TOOLS
    return tools
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schemas + Registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -714,11 +714,6 @@ class ProcessRegistry:
|
|||
oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at)
|
||||
del self._finished[oldest_id]
|
||||
|
||||
def cleanup_expired(self):
    """Public, lock-holding entry point to prune expired finished sessions."""
    with self._lock:
        self._prune_if_needed()
|
||||
|
||||
# ----- Checkpoint (crash recovery) -----
|
||||
|
||||
def _write_checkpoint(self):
|
||||
|
|
|
|||
|
|
@ -92,11 +92,6 @@ VALID_NAME_RE = re.compile(r'^[a-z0-9][a-z0-9._-]*$')
|
|||
ALLOWED_SUBDIRS = {"references", "templates", "scripts", "assets"}
|
||||
|
||||
|
||||
def check_skill_manage_requirements() -> bool:
    """Report skill-management availability.

    There are no external requirements, so this is always True.
    """
    return True
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Validation helpers
|
||||
# =============================================================================
|
||||
|
|
|
|||
|
|
@ -142,12 +142,6 @@ from tools.approval import (
|
|||
)
|
||||
|
||||
|
||||
def _check_dangerous_command(command: str, env_type: str) -> dict:
    """Delegate to the consolidated approval module, passing the CLI callback."""
    return _check_dangerous_command_impl(
        command,
        env_type,
        approval_callback=_approval_callback,
    )
|
||||
|
||||
|
||||
def _check_all_guards(command: str, env_type: str) -> dict:
|
||||
"""Delegate to consolidated guard (tirith + dangerous cmd) with CLI callback."""
|
||||
return _check_all_guards_impl(command, env_type,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue