Merge remote-tracking branch 'origin/main' into hermes/hermes-1f7bfa9e

# Conflicts:
#	cron/scheduler.py
#	tools/send_message_tool.py
Teknium 2026-04-11 19:23:02 -07:00
commit e7fc6450fc
99 changed files with 9609 additions and 1075 deletions

View file

@@ -40,11 +40,18 @@ def reset_current_session_key(token: contextvars.Token[str]) -> None:
def get_current_session_key(default: str = "default") -> str:
"""Return the active session key, preferring context-local state."""
"""Return the active session key, preferring context-local state.
Resolution order:
1. approval-specific contextvars (set by gateway before agent.run)
2. session_context contextvars (set by _set_session_env)
3. os.environ fallback (CLI, cron, tests)
"""
session_key = _approval_session_key.get()
if session_key:
return session_key
return os.getenv("HERMES_SESSION_KEY", default)
from gateway.session_context import get_session_env
return get_session_env("HERMES_SESSION_KEY", default)
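
For context, a hedged sketch of how the gateway side would populate the first
level of that resolution order (the ``agent.run(prompt)`` call is an assumed
entrypoint name; only the contextvar set/reset pattern is shown):

    # Hypothetical gateway-side usage: pin the approval session key for the
    # duration of one agent run, then restore the previous value.
    token = _approval_session_key.set("telegram:-1001234567890")
    try:
        agent.run(prompt)
    finally:
        _approval_session_key.reset(token)
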
# Sensitive write targets that should trigger approval even when referenced
# via shell expansions like $HOME or $HERMES_HOME.

View file

@@ -21,6 +21,7 @@ into the user's project directory.
import hashlib
import logging
import os
import re
import shutil
import subprocess
from pathlib import Path
@@ -64,23 +65,72 @@ _GIT_TIMEOUT: int = max(10, min(60, int(os.getenv("HERMES_CHECKPOINT_TIMEOUT", "
# Max files to snapshot — skip huge directories to avoid slowdowns.
_MAX_FILES = 50_000
# Valid git commit hash pattern: 4-64 hex chars (short or full SHA-1/SHA-256).
_COMMIT_HASH_RE = re.compile(r'^[0-9a-fA-F]{4,64}$')
# ---------------------------------------------------------------------------
# Input validation helpers
# ---------------------------------------------------------------------------
def _validate_commit_hash(commit_hash: str) -> Optional[str]:
"""Validate a commit hash to prevent git argument injection.
Returns an error string if invalid, None if valid.
Values starting with '-' would be interpreted as git flags
(e.g., '--patch', '-p') instead of revision specifiers.
"""
if not commit_hash or not commit_hash.strip():
return "Empty commit hash"
if commit_hash.startswith("-"):
return f"Invalid commit hash (must not start with '-'): {commit_hash!r}"
if not _COMMIT_HASH_RE.match(commit_hash):
return f"Invalid commit hash (expected 4-64 hex characters): {commit_hash!r}"
return None
def _validate_file_path(file_path: str, working_dir: str) -> Optional[str]:
"""Validate a file path to prevent path traversal outside the working directory.
Returns an error string if invalid, None if valid.
"""
if not file_path or not file_path.strip():
return "Empty file path"
# Reject absolute paths — restore targets must be relative to the workdir
if os.path.isabs(file_path):
return f"File path must be relative, got absolute path: {file_path!r}"
# Resolve and check containment within working_dir
abs_workdir = _normalize_path(working_dir)
resolved = (abs_workdir / file_path).resolve()
try:
resolved.relative_to(abs_workdir)
except ValueError:
return f"File path escapes the working directory via traversal: {file_path!r}"
return None
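
Illustrative examples of the two validators' contract (an error string on
rejection, ``None`` on success); all inputs here are hypothetical:

    assert _validate_commit_hash("deadbeef") is None            # short hex SHA: accepted
    assert _validate_commit_hash("--patch") is not None         # would parse as a git flag
    assert _validate_commit_hash("HEAD~1") is not None          # not plain hex: rejected
    assert _validate_file_path("src/app.py", "/work") is None   # stays inside the workdir
    assert _validate_file_path("../etc/passwd", "/work") is not None  # traversal escape
    assert _validate_file_path("/etc/passwd", "/work") is not None    # absolute: rejected
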
# ---------------------------------------------------------------------------
# Shadow repo helpers
# ---------------------------------------------------------------------------
def _normalize_path(path_value: str) -> Path:
"""Return a canonical absolute path for checkpoint operations."""
return Path(path_value).expanduser().resolve()
def _shadow_repo_path(working_dir: str) -> Path:
"""Deterministic shadow repo path: sha256(abs_path)[:16]."""
abs_path = str(Path(working_dir).resolve())
abs_path = str(_normalize_path(working_dir))
dir_hash = hashlib.sha256(abs_path.encode()).hexdigest()[:16]
return CHECKPOINT_BASE / dir_hash
def _git_env(shadow_repo: Path, working_dir: str) -> dict:
"""Build env dict that redirects git to the shadow repo."""
normalized_working_dir = _normalize_path(working_dir)
env = os.environ.copy()
env["GIT_DIR"] = str(shadow_repo)
env["GIT_WORK_TREE"] = str(Path(working_dir).resolve())
env["GIT_WORK_TREE"] = str(normalized_working_dir)
env.pop("GIT_INDEX_FILE", None)
env.pop("GIT_NAMESPACE", None)
env.pop("GIT_ALTERNATE_OBJECT_DIRECTORIES", None)
@@ -100,7 +150,17 @@ def _run_git(
exits while preserving the normal ``ok = (returncode == 0)`` contract.
Example: ``git diff --cached --quiet`` returns 1 when changes exist.
"""
env = _git_env(shadow_repo, working_dir)
normalized_working_dir = _normalize_path(working_dir)
if not normalized_working_dir.exists():
msg = f"working directory not found: {normalized_working_dir}"
logger.error("Git command skipped: %s (%s)", " ".join(["git"] + list(args)), msg)
return False, "", msg
if not normalized_working_dir.is_dir():
msg = f"working directory is not a directory: {normalized_working_dir}"
logger.error("Git command skipped: %s (%s)", " ".join(["git"] + list(args)), msg)
return False, "", msg
env = _git_env(shadow_repo, str(normalized_working_dir))
cmd = ["git"] + list(args)
allowed_returncodes = allowed_returncodes or set()
try:
@@ -110,7 +170,7 @@ def _run_git(
text=True,
timeout=timeout,
env=env,
cwd=str(Path(working_dir).resolve()),
cwd=str(normalized_working_dir),
)
ok = result.returncode == 0
stdout = result.stdout.strip()
@@ -125,9 +185,14 @@ def _run_git(
msg = f"git timed out after {timeout}s: {' '.join(cmd)}"
logger.error(msg, exc_info=True)
return False, "", msg
except FileNotFoundError:
logger.error("Git executable not found: %s", " ".join(cmd), exc_info=True)
return False, "", "git not found"
except FileNotFoundError as exc:
missing_target = getattr(exc, "filename", None)
if missing_target == "git":
logger.error("Git executable not found: %s", " ".join(cmd), exc_info=True)
return False, "", "git not found"
msg = f"working directory not found: {normalized_working_dir}"
logger.error("Git command failed before execution: %s (%s)", " ".join(cmd), msg, exc_info=True)
return False, "", msg
except Exception as exc:
logger.error("Unexpected git error running %s: %s", " ".join(cmd), exc, exc_info=True)
return False, "", str(exc)
@@ -154,7 +219,7 @@ def _init_shadow_repo(shadow_repo: Path, working_dir: str) -> Optional[str]:
)
(shadow_repo / "HERMES_WORKDIR").write_text(
str(Path(working_dir).resolve()) + "\n", encoding="utf-8"
str(_normalize_path(working_dir)) + "\n", encoding="utf-8"
)
logger.debug("Initialised checkpoint repo at %s for %s", shadow_repo, working_dir)
@@ -229,7 +294,7 @@ class CheckpointManager:
if not self._git_available:
return False
abs_dir = str(Path(working_dir).resolve())
abs_dir = str(_normalize_path(working_dir))
# Skip root, home, and other overly broad directories
if abs_dir in ("/", str(Path.home())):
@@ -254,7 +319,7 @@
Returns a list of dicts with keys: hash, short_hash, timestamp, reason,
files_changed, insertions, deletions. Most recent first.
"""
abs_dir = str(Path(working_dir).resolve())
abs_dir = str(_normalize_path(working_dir))
shadow = _shadow_repo_path(abs_dir)
if not (shadow / "HEAD").exists():
@@ -311,7 +376,12 @@
Returns dict with success, diff text, and stat summary.
"""
abs_dir = str(Path(working_dir).resolve())
# Validate commit_hash to prevent git argument injection
hash_err = _validate_commit_hash(commit_hash)
if hash_err:
return {"success": False, "error": hash_err}
abs_dir = str(_normalize_path(working_dir))
shadow = _shadow_repo_path(abs_dir)
if not (shadow / "HEAD").exists():
@@ -364,7 +434,19 @@
Returns dict with success/error info.
"""
abs_dir = str(Path(working_dir).resolve())
# Validate commit_hash to prevent git argument injection
hash_err = _validate_commit_hash(commit_hash)
if hash_err:
return {"success": False, "error": hash_err}
abs_dir = str(_normalize_path(working_dir))
# Validate file_path to prevent path traversal outside the working dir
if file_path:
path_err = _validate_file_path(file_path, abs_dir)
if path_err:
return {"success": False, "error": path_err}
shadow = _shadow_repo_path(abs_dir)
if not (shadow / "HEAD").exists():
@@ -413,7 +495,7 @@ class CheckpointManager:
(directory containing .git, pyproject.toml, package.json, etc.).
Falls back to the file's parent directory.
"""
path = Path(file_path).resolve()
path = _normalize_path(file_path)
if path.is_dir():
candidate = path
else:

View file

@@ -301,7 +301,7 @@ def _call(tool_name, args):
# ---------------------------------------------------------------------------
# Terminal parameters that must not be used from ephemeral sandbox scripts
_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty", "notify_on_complete", "watch_patterns"}
_TERMINAL_BLOCKED_PARAMS = {"background", "pty", "notify_on_complete", "watch_patterns"}
def _rpc_server_loop(

View file

@@ -456,7 +456,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
},
"deliver": {
"type": "string",
"description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, weixin, matrix, mattermost, homeassistant, dingtalk, feishu, wecom, email, sms, bluebubbles, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'"
"description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, weixin, matrix, mattermost, homeassistant, dingtalk, feishu, wecom, wecom_callback, email, sms, bluebubbles, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'"
},
"skills": {
"type": "array",

View file

@@ -23,6 +23,19 @@ from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
# Thread-local activity callback. The agent sets this before a tool call so
# long-running _wait_for_process loops can report liveness to the gateway.
_activity_callback_local = threading.local()
def set_activity_callback(cb: Callable[[str], None] | None) -> None:
"""Register a callback that _wait_for_process fires periodically."""
_activity_callback_local.callback = cb
def _get_activity_callback() -> Callable[[str], None] | None:
return getattr(_activity_callback_local, "callback", None)
def get_sandbox_dir() -> Path:
"""Return the host-side root for all sandbox storage (Docker workspaces,
@@ -370,6 +383,10 @@ class BaseEnvironment(ABC):
"""Poll-based wait with interrupt checking and stdout draining.
Shared across all backends unless overridden.
Fires the activity callback (if registered via ``set_activity_callback``)
every 10s while the process is running so the gateway's inactivity timeout
doesn't kill long-running commands.
"""
output_chunks: list[str] = []
@@ -388,6 +405,8 @@
drain_thread = threading.Thread(target=_drain, daemon=True)
drain_thread.start()
deadline = time.monotonic() + timeout
_last_activity_touch = time.monotonic()
_ACTIVITY_INTERVAL = 10.0 # seconds between activity touches
while proc.poll() is None:
if is_interrupted():
@@ -408,6 +427,17 @@
else timeout_msg.lstrip(),
"returncode": 124,
}
# Periodic activity touch so the gateway knows we're alive
_now = time.monotonic()
if _now - _last_activity_touch >= _ACTIVITY_INTERVAL:
_last_activity_touch = _now
_cb = _get_activity_callback()
if _cb:
try:
_elapsed = int(_now - (deadline - timeout))
_cb(f"terminal command running ({_elapsed}s elapsed)")
except Exception:
pass
time.sleep(0.2)
drain_thread.join(timeout=5)
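
A minimal usage sketch for the new callback hooks, assuming a caller on the
gateway side (``touch_session`` and ``env.execute`` are hypothetical names;
only the register/unregister pattern is from this commit):

    def touch_session(note: str) -> None:
        # e.g. reset the gateway's inactivity timer and log liveness
        logger.debug("activity: %s", note)

    set_activity_callback(touch_session)
    try:
        result = env.execute("make test", timeout=3600)
    finally:
        set_activity_callback(None)  # always unregister after the tool call
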

View file

@@ -15,7 +15,13 @@ from tools.environments.base import (
BaseEnvironment,
_ThreadedProcessHandle,
)
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
from tools.environments.file_sync import (
FileSyncManager,
iter_sync_files,
quoted_mkdir_command,
quoted_rm_command,
unique_parent_dirs,
)
logger = logging.getLogger(__name__)
@@ -150,11 +156,9 @@ class DaytonaEnvironment(BaseEnvironment):
if not files:
return
# Pre-create all unique parent directories in one shell call
parents = sorted({str(Path(remote).parent) for _, remote in files})
parents = unique_parent_dirs(files)
if parents:
mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(p) for p in parents)
self._sandbox.process.exec(mkdir_cmd)
self._sandbox.process.exec(quoted_mkdir_command(parents))
uploads = [
FileUpload(source=host_path, destination=remote_path)

View file

@@ -10,6 +10,7 @@ import logging
import os
import shlex
import time
from pathlib import Path
from typing import Callable
from tools.environments.base import _file_mtime_key
@@ -60,6 +61,16 @@ def quoted_rm_command(remote_paths: list[str]) -> str:
return "rm -f " + " ".join(shlex.quote(p) for p in remote_paths)
def quoted_mkdir_command(dirs: list[str]) -> str:
"""Build a shell ``mkdir -p`` command for a batch of directories."""
return "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)
def unique_parent_dirs(files: list[tuple[str, str]]) -> list[str]:
"""Extract sorted unique parent directories from (host, remote) pairs."""
return sorted({str(Path(remote).parent) for _, remote in files})
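
Worked example of the two helpers on hypothetical (host, remote) pairs:

    files = [("/tmp/a", "/root/.hermes/skills/a.txt"),
             ("/tmp/b", "/root/.hermes/cache/b.txt")]
    unique_parent_dirs(files)
    # -> ['/root/.hermes/cache', '/root/.hermes/skills']
    quoted_mkdir_command(unique_parent_dirs(files))
    # -> 'mkdir -p /root/.hermes/cache /root/.hermes/skills'
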
class FileSyncManager:
"""Tracks local file changes and syncs to a remote environment.

View file

@@ -5,8 +5,11 @@ wrapper, while preserving Hermes' persistent snapshot behavior across sessions.
"""
import asyncio
import base64
import io
import logging
import shlex
import tarfile
import threading
from pathlib import Path
from typing import Any, Optional
@@ -18,7 +21,13 @@ from tools.environments.base import (
_load_json_store,
_save_json_store,
)
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
from tools.environments.file_sync import (
FileSyncManager,
iter_sync_files,
quoted_mkdir_command,
quoted_rm_command,
unique_parent_dirs,
)
logger = logging.getLogger(__name__)
@@ -259,26 +268,84 @@ class ModalEnvironment(BaseEnvironment):
get_files_fn=lambda: iter_sync_files("/root/.hermes"),
upload_fn=self._modal_upload,
delete_fn=self._modal_delete,
bulk_upload_fn=self._modal_bulk_upload,
)
self._sync_manager.sync(force=True)
self.init_session()
def _modal_upload(self, host_path: str, remote_path: str) -> None:
"""Upload a single file via base64-over-exec."""
import base64
"""Upload a single file via base64 piped through stdin."""
content = Path(host_path).read_bytes()
b64 = base64.b64encode(content).decode("ascii")
container_dir = str(Path(remote_path).parent)
cmd = (
f"mkdir -p {shlex.quote(container_dir)} && "
f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
f"base64 -d > {shlex.quote(remote_path)}"
)
async def _write():
proc = await self._sandbox.exec.aio("bash", "-c", cmd)
offset = 0
chunk_size = self._STDIN_CHUNK_SIZE
while offset < len(b64):
proc.stdin.write(b64[offset:offset + chunk_size])
await proc.stdin.drain.aio()
offset += chunk_size
proc.stdin.write_eof()
await proc.stdin.drain.aio()
await proc.wait.aio()
self._worker.run_coroutine(_write(), timeout=15)
self._worker.run_coroutine(_write(), timeout=30)
# Modal SDK stdin buffer limit (legacy server path). The command-router
# path allows 16 MB, but we must stay under the smaller 2 MB cap for
# compatibility. Chunks are written below this threshold and flushed
# individually via drain().
_STDIN_CHUNK_SIZE = 1 * 1024 * 1024 # 1 MB — safe for both transport paths
def _modal_bulk_upload(self, files: list[tuple[str, str]]) -> None:
"""Upload many files via tar archive piped through stdin.
Builds a gzipped tar archive in memory and streams it into a
``base64 -d | tar xzf -`` pipeline via the process's stdin,
avoiding the Modal SDK's 64 KB ``ARG_MAX_BYTES`` exec-arg limit.
"""
if not files:
return
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
for host_path, remote_path in files:
tar.add(host_path, arcname=remote_path.lstrip("/"))
payload = base64.b64encode(buf.getvalue()).decode("ascii")
parents = unique_parent_dirs(files)
mkdir_part = quoted_mkdir_command(parents)
cmd = f"{mkdir_part} && base64 -d | tar xzf - -C /"
async def _bulk():
proc = await self._sandbox.exec.aio("bash", "-c", cmd)
# Stream payload through stdin in chunks to stay under the
# SDK's per-write buffer limit (2 MB legacy / 16 MB router).
offset = 0
chunk_size = self._STDIN_CHUNK_SIZE
while offset < len(payload):
proc.stdin.write(payload[offset:offset + chunk_size])
await proc.stdin.drain.aio()
offset += chunk_size
proc.stdin.write_eof()
await proc.stdin.drain.aio()
exit_code = await proc.wait.aio()
if exit_code != 0:
stderr_text = await proc.stderr.read.aio()
raise RuntimeError(
f"Modal bulk upload failed (exit {exit_code}): {stderr_text}"
)
self._worker.run_coroutine(_bulk(), timeout=120)
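
The chunking pattern in isolation, as a hedged sketch (it assumes the Modal
exec handle shown above, with ``stdin.write``/``drain.aio``/``write_eof``):

    async def _stream_stdin(proc, payload: str, chunk_size: int) -> None:
        offset = 0
        while offset < len(payload):
            proc.stdin.write(payload[offset:offset + chunk_size])
            await proc.stdin.drain.aio()  # flush each chunk under the buffer cap
            offset += chunk_size
        proc.stdin.write_eof()
        await proc.stdin.drain.aio()
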
def _modal_delete(self, remote_paths: list[str]) -> None:
"""Batch-delete remote files via exec."""

View file

@@ -1,6 +1,7 @@
"""SSH remote execution environment with ControlMaster connection persistence."""
import logging
import os
import shlex
import shutil
import subprocess
@@ -8,7 +9,13 @@ import tempfile
from pathlib import Path
from tools.environments.base import BaseEnvironment, _popen_bash
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
from tools.environments.file_sync import (
FileSyncManager,
iter_sync_files,
quoted_mkdir_command,
quoted_rm_command,
unique_parent_dirs,
)
logger = logging.getLogger(__name__)
@@ -50,6 +57,7 @@ class SSHEnvironment(BaseEnvironment):
get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
upload_fn=self._scp_upload,
delete_fn=self._ssh_delete,
bulk_upload_fn=self._ssh_bulk_upload,
)
self._sync_manager.sync(force=True)
@@ -107,9 +115,8 @@
"""Create base ~/.hermes directory tree on remote in one SSH call."""
base = f"{self._remote_home}/.hermes"
dirs = [base, f"{base}/skills", f"{base}/credentials", f"{base}/cache"]
mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)
cmd = self._build_ssh_command()
cmd.append(mkdir_cmd)
cmd.append(quoted_mkdir_command(dirs))
subprocess.run(cmd, capture_output=True, text=True, timeout=10)
# _get_sync_files provided via iter_sync_files in FileSyncManager init
@@ -131,6 +138,84 @@
if result.returncode != 0:
raise RuntimeError(f"scp failed: {result.stderr.strip()}")
def _ssh_bulk_upload(self, files: list[tuple[str, str]]) -> None:
"""Upload many files in a single tar-over-SSH stream.
Pipes ``tar c`` on the local side through an SSH connection to
``tar x`` on the remote, transferring all files in one TCP stream
instead of spawning a subprocess per file. Directory creation is
batched into a single ``mkdir -p`` call beforehand.
Typical improvement: a ~580-file sync drops from O(N) scp round-trips
to a single streaming transfer.
"""
if not files:
return
parents = unique_parent_dirs(files)
if parents:
cmd = self._build_ssh_command()
cmd.append(quoted_mkdir_command(parents))
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"remote mkdir failed: {result.stderr.strip()}")
# Symlink staging avoids fragile GNU tar --transform rules.
with tempfile.TemporaryDirectory(prefix="hermes-ssh-bulk-") as staging:
for host_path, remote_path in files:
staged = os.path.join(staging, remote_path.lstrip("/"))
os.makedirs(os.path.dirname(staged), exist_ok=True)
os.symlink(os.path.abspath(host_path), staged)
tar_cmd = ["tar", "-chf", "-", "-C", staging, "."]
ssh_cmd = self._build_ssh_command()
ssh_cmd.append("tar xf - -C /")
tar_proc = subprocess.Popen(
tar_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
try:
ssh_proc = subprocess.Popen(
ssh_cmd, stdin=tar_proc.stdout, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except Exception:
tar_proc.kill()
tar_proc.wait()
raise
# Allow tar_proc to receive SIGPIPE if ssh_proc exits early
tar_proc.stdout.close()
try:
_, ssh_stderr = ssh_proc.communicate(timeout=120)
# Use communicate() instead of wait() to drain stderr and
# avoid deadlock if tar produces more than PIPE_BUF of errors.
tar_stderr_raw = b""
if tar_proc.poll() is None:
_, tar_stderr_raw = tar_proc.communicate(timeout=10)
else:
tar_stderr_raw = tar_proc.stderr.read() if tar_proc.stderr else b""
except subprocess.TimeoutExpired:
tar_proc.kill()
ssh_proc.kill()
tar_proc.wait()
ssh_proc.wait()
raise RuntimeError("SSH bulk upload timed out")
if tar_proc.returncode != 0:
raise RuntimeError(
f"tar create failed (rc={tar_proc.returncode}): "
f"{tar_stderr_raw.decode(errors='replace').strip()}"
)
if ssh_proc.returncode != 0:
raise RuntimeError(
f"tar extract over SSH failed (rc={ssh_proc.returncode}): "
f"{ssh_stderr.decode(errors='replace').strip()}"
)
logger.debug("SSH: bulk-uploaded %d file(s) via tar pipe", len(files))
def _ssh_delete(self, remote_paths: list[str]) -> None:
"""Batch-delete remote files in one SSH call."""
cmd = self._build_ssh_command()

View file

@@ -1137,7 +1137,6 @@ def terminal_tool(
task_id: Optional[str] = None,
force: bool = False,
workdir: Optional[str] = None,
check_interval: Optional[int] = None,
pty: bool = False,
notify_on_complete: bool = False,
watch_patterns: Optional[List[str]] = None,
@@ -1152,7 +1151,6 @@
task_id: Unique identifier for environment isolation (optional)
force: If True, skip dangerous command check (use after user confirms)
workdir: Working directory for this command (optional, uses session cwd if not set)
check_interval: Seconds between auto-checks for background processes (gateway only, min 30)
pty: If True, use pseudo-terminal for interactive CLI tools (local backend only)
notify_on_complete: If True and background=True, auto-notify the agent when the process exits
watch_patterns: List of strings to watch for in background output; triggers notification on match
@@ -1424,7 +1422,7 @@
# turn. CLI mode uses the completion_queue directly.
from gateway.session_context import get_session_env as _gse
_gw_platform = _gse("HERMES_SESSION_PLATFORM", "")
if _gw_platform and not check_interval:
if _gw_platform:
_gw_chat_id = _gse("HERMES_SESSION_CHAT_ID", "")
_gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "")
_gw_user_id = _gse("HERMES_SESSION_USER_ID", "")
@@ -1452,39 +1450,6 @@
proc_session.watch_patterns = list(watch_patterns)
result_data["watch_patterns"] = proc_session.watch_patterns
# Register check_interval watcher (gateway picks this up after agent run)
if check_interval and background:
effective_interval = max(30, check_interval)
if check_interval < 30:
result_data["check_interval_note"] = (
f"Requested {check_interval}s raised to minimum 30s"
)
from gateway.session_context import get_session_env as _gse2
watcher_platform = _gse2("HERMES_SESSION_PLATFORM", "")
watcher_chat_id = _gse2("HERMES_SESSION_CHAT_ID", "")
watcher_thread_id = _gse2("HERMES_SESSION_THREAD_ID", "")
watcher_user_id = _gse2("HERMES_SESSION_USER_ID", "")
watcher_user_name = _gse2("HERMES_SESSION_USER_NAME", "")
# Store on session for checkpoint persistence
proc_session.watcher_platform = watcher_platform
proc_session.watcher_chat_id = watcher_chat_id
proc_session.watcher_user_id = watcher_user_id
proc_session.watcher_user_name = watcher_user_name
proc_session.watcher_thread_id = watcher_thread_id
proc_session.watcher_interval = effective_interval
process_registry.pending_watchers.append({
"session_id": proc_session.id,
"check_interval": effective_interval,
"session_key": session_key,
"platform": watcher_platform,
"chat_id": watcher_chat_id,
"user_id": watcher_user_id,
"user_name": watcher_user_name,
"thread_id": watcher_thread_id,
})
return json.dumps(result_data, ensure_ascii=False)
except Exception as e:
return json.dumps({
@@ -1767,11 +1732,6 @@ TERMINAL_SCHEMA = {
"type": "string",
"description": "Working directory for this command (absolute path). Defaults to the session working directory."
},
"check_interval": {
"type": "integer",
"description": "Seconds between automatic status checks for background processes (gateway/messaging only, minimum 30). When set, I'll proactively report progress.",
"minimum": 30
},
"pty": {
"type": "boolean",
"description": "Run in pseudo-terminal (PTY) mode for interactive CLI tools like Codex, Claude Code, or Python REPL. Only works with local and SSH backends. Default: false.",
@@ -1800,7 +1760,6 @@ def _handle_terminal(args, **kw):
timeout=args.get("timeout"),
task_id=kw.get("task_id"),
workdir=args.get("workdir"),
check_interval=args.get("check_interval"),
pty=args.get("pty", False),
notify_on_complete=args.get("notify_on_complete", False),
watch_patterns=args.get("watch_patterns"),

View file

@@ -46,11 +46,11 @@ class TodoStore:
"""
if not merge:
# Replace mode: new list entirely
self._items = [self._validate(t) for t in todos]
self._items = [self._validate(t) for t in self._dedupe_by_id(todos)]
else:
# Merge mode: update existing items by id, append new ones
existing = {item["id"]: item for item in self._items}
for t in todos:
for t in self._dedupe_by_id(todos):
item_id = str(t.get("id", "")).strip()
if not item_id:
continue # Can't merge without an id
@@ -143,6 +143,15 @@
return {"id": item_id, "content": content, "status": status}
@staticmethod
def _dedupe_by_id(todos: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Collapse duplicate ids, keeping the last occurrence in its position."""
last_index: Dict[str, int] = {}
for i, item in enumerate(todos):
item_id = str(item.get("id", "")).strip() or "?"
last_index[item_id] = i
return [todos[i] for i in sorted(last_index.values())]
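
A worked example of the dedupe rule (the last occurrence of an id wins and
keeps its later position):

    todos = [{"id": "1", "content": "old", "status": "pending"},
             {"id": "2", "content": "b", "status": "pending"},
             {"id": "1", "content": "new", "status": "done"}]
    TodoStore._dedupe_by_id(todos)
    # -> [{"id": "2", ...}, {"id": "1", "content": "new", "status": "done"}]
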
def todo_tool(
todos: Optional[List[Dict[str, Any]]] = None,

View file

@@ -24,6 +24,7 @@ Defense against context-window overflow operates at three levels:
import logging
import os
import shlex
import uuid
from tools.budget_config import (
@@ -79,7 +80,7 @@ def _write_to_sandbox(content: str, remote_path: str, env) -> bool:
marker = _heredoc_marker(content)
storage_dir = os.path.dirname(remote_path)
cmd = (
f"mkdir -p {storage_dir} && cat > {remote_path} << '{marker}'\n"
f"mkdir -p {shlex.quote(storage_dir)} && cat > {shlex.quote(remote_path)} << '{marker}'\n"
f"{content}\n"
f"{marker}"
)
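
Why the added quoting matters, as an illustrative sketch with a hypothetical
hostile path (only the quoting behavior is being demonstrated):

    import shlex
    remote_path = "/root/.hermes/out put; rm -rf ~"  # hypothetical
    f"cat > {remote_path}"               # unquoted: the shell splits and executes it
    f"cat > {shlex.quote(remote_path)}"  # -> cat > '/root/.hermes/out put; rm -rf ~'
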