hermes-agent/tools/memory_tool.py
Teknium 3dfb357001 feat(cross-platform): psutil for PID/process management + Windows footgun checker
## Why

Hermes supports Linux, macOS, and native Windows, but the codebase grew up
POSIX-first and has accumulated patterns that silently break (or worse,
silently kill!) on Windows:

- `os.kill(pid, 0)` as a liveness probe — on Windows this maps to
  CTRL_C_EVENT and broadcasts Ctrl+C to the target's entire console
  process group (bpo-14484, open since 2012).
- `os.killpg` — doesn't exist on Windows at all (AttributeError).
- `os.setsid` / `os.getuid` / `os.geteuid` — same.
- `signal.SIGKILL` / `signal.SIGHUP` / `signal.SIGUSR1` — module-attr
  errors at runtime on Windows.
- `open(path)` / `open(path, "r")` without explicit encoding= — inherits
  the platform default, which is cp1252/mbcs on Windows (UTF-8 on POSIX),
  causing mojibake round-tripping between hosts.
- `wmic` — removed from Windows 10 21H1+.

This commit does three things:

1. Makes `psutil` a core dependency and migrates critical callsites to it.
2. Adds a grep-based CI gate (`scripts/check-windows-footguns.py`) that
   blocks new instances of any of the above patterns.
3. Fixes every existing instance in the codebase so the baseline is clean.

## What changed

### 1. psutil as a core dependency (pyproject.toml)

Added `psutil>=5.9.0,<8` to core deps. psutil is the canonical
cross-platform answer for "is this PID alive" and "kill this process
tree" — its `pid_exists()` uses `OpenProcess + GetExitCodeProcess` on
Windows (NOT a signal call), and its `Process.children(recursive=True)`
+ `.kill()` combo replaces `os.killpg()` portably.

### 2. `gateway/status.py::_pid_exists`

Rewrote to call `psutil.pid_exists()` first, falling back to the
hand-rolled ctypes `OpenProcess + WaitForSingleObject` dance on Windows
(and `os.kill(pid, 0)` on POSIX) only if psutil is somehow missing —
e.g. during the scaffold phase of a fresh install before pip finishes.

### 3. `os.killpg` migration to psutil (7 callsites, 5 files)

- `tools/code_execution_tool.py`
- `tools/process_registry.py`
- `tools/tts_tool.py`
- `tools/environments/local.py` (3 sites kept as-is, suppressed with
  `# windows-footgun: ok` — the pgid semantics psutil can't replicate,
  and the calls are already Windows-guarded at the outer branch)
- `gateway/platforms/whatsapp.py`

### 4. `scripts/check-windows-footguns.py` (NEW, 500 lines)

Grep-based checker with 11 rules covering every Windows cross-platform
footgun we've hit so far:

1. `os.kill(pid, 0)` — the silent killer
2. `os.setsid` without guard
3. `os.killpg` (recommends psutil)
4. `os.getuid` / `os.geteuid` / `os.getgid`
5. `os.fork`
6. `signal.SIGKILL`
7. `signal.SIGHUP/SIGUSR1/SIGUSR2/SIGALRM/SIGCHLD/SIGPIPE/SIGQUIT`
8. `subprocess` shebang script invocation
9. `wmic` without `shutil.which` guard
10. Hardcoded `~/Desktop` (OneDrive trap)
11. `asyncio.add_signal_handler` without try/except
12. `open()` without `encoding=` on text mode

Features:
- Triple-quoted-docstring aware (won't flag prose inside docstrings)
- Trailing-comment aware (won't flag mentions in `# os.kill(pid, 0)` comments)
- Guard-hint aware (skips lines with `hasattr(os, ...)`,
  `shutil.which(...)`, `if platform.system() != 'Windows'`, etc.)
- Inline suppression with `# windows-footgun: ok — <reason>`
- `--list` to print all rules with fixes
- `--all` / `--diff <ref>` / staged-files (default) modes
- Scans 380 files in under 2 seconds

### 5. CI integration

A GitHub Actions workflow that runs the checker on every PR and push is
staged at `/tmp/hermes-stash/windows-footguns.yml` — not included in this
commit because the GH token on the push machine lacks `workflow` scope.
A maintainer with `workflow` permissions should add it as
`.github/workflows/windows-footguns.yml` in a follow-up. Content:

```yaml
name: Windows footgun check
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.11"}
      - run: python scripts/check-windows-footguns.py --all
```

### 6. CONTRIBUTING.md — "Cross-Platform Compatibility" expansion

Expanded from 5 to 16 rules, each with message, example, and fix.
Recommends psutil as the preferred API for PID / process-tree operations.

### 7. Baseline cleanup (91 → 0 findings)

- 14 `open()` sites → added `encoding='utf-8'` (internal logs/caches) or
  `encoding='utf-8-sig'` (user-editable files that Notepad may BOM)
- 23 POSIX-only callsites in systemd helpers, pty_bridge, and plugin
  tool subprocess management → annotated with
  `# windows-footgun: ok — <reason>`
- 7 `os.killpg` sites → migrated to psutil (see §3 above)

## Verification

```
$ python scripts/check-windows-footguns.py --all
✓ No Windows footguns found (380 file(s) scanned).

$ python -c "from gateway.status import _pid_exists; import os
> print('self:', _pid_exists(os.getpid())); print('bogus:', _pid_exists(999999))"
self: True
bogus: False
```

Proof-of-repro that `os.kill(pid, 0)` was actually killing processes
before this fix — see commit `1cbe39914` and bpo-14484. This commit
removes the last hand-rolled ctypes path from the hot liveness-check
path and defers to the best-maintained cross-platform answer.
2026-05-08 12:57:33 -07:00

586 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Memory Tool Module - Persistent Curated Memory
Provides bounded, file-backed memory that persists across sessions. Two stores:
- MEMORY.md: agent's personal notes and observations (environment facts, project
conventions, tool quirks, things learned)
- USER.md: what the agent knows about the user (preferences, communication style,
expectations, workflow habits)
Both are injected into the system prompt as a frozen snapshot at session start.
Mid-session writes update files on disk immediately (durable) but do NOT change
the system prompt -- this preserves the prefix cache for the entire session.
The snapshot refreshes on the next session start.
Entry delimiter: § (section sign). Entries can be multiline.
Character limits (not tokens) because char counts are model-independent.
Design:
- Single `memory` tool with action parameter: add, replace, remove, read
- replace/remove use short unique substring matching (not full text or IDs)
- Behavioral guidance lives in the tool schema description
- Frozen snapshot pattern: system prompt is stable, tool responses show live state
"""
import json
import logging
import os
import re
import tempfile
from contextlib import contextmanager
from pathlib import Path
from hermes_constants import get_hermes_home
from typing import Dict, Any, List, Optional
from utils import atomic_replace
# fcntl is Unix-only; on Windows use msvcrt for file locking
msvcrt = None
try:
import fcntl
except ImportError:
fcntl = None
try:
import msvcrt
except ImportError:
pass
logger = logging.getLogger(__name__)
# Where memory files live — resolved dynamically so profile overrides
# (HERMES_HOME env var changes) are always respected. The old module-level
# constant was cached at import time and could go stale if a profile switch
# happened after the first import.
def get_memory_dir() -> Path:
"""Return the profile-scoped memories directory."""
return get_hermes_home() / "memories"
ENTRY_DELIMITER = "\n§\n"
# ---------------------------------------------------------------------------
# Memory content scanning — lightweight check for injection/exfiltration
# in content that gets injected into the system prompt.
# ---------------------------------------------------------------------------
_MEMORY_THREAT_PATTERNS = [
# Prompt injection
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
(r'you\s+are\s+now\s+', "role_hijack"),
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
(r'system\s+prompt\s+override', "sys_prompt_override"),
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
(r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
# Exfiltration via curl/wget with secrets
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
(r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"),
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass|\.npmrc|\.pypirc)', "read_secrets"),
# Persistence via shell rc
(r'authorized_keys', "ssh_backdoor"),
(r'\$HOME/\.ssh|\~/\.ssh', "ssh_access"),
(r'\$HOME/\.hermes/\.env|\~/\.hermes/\.env', "hermes_env"),
]
# Subset of invisible chars for injection detection
_INVISIBLE_CHARS = {
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
}
def _scan_memory_content(content: str) -> Optional[str]:
"""Scan memory content for injection/exfil patterns. Returns error string if blocked."""
# Check invisible unicode
for char in _INVISIBLE_CHARS:
if char in content:
return f"Blocked: content contains invisible unicode character U+{ord(char):04X} (possible injection)."
# Check threat patterns
for pattern, pid in _MEMORY_THREAT_PATTERNS:
if re.search(pattern, content, re.IGNORECASE):
return f"Blocked: content matches threat pattern '{pid}'. Memory entries are injected into the system prompt and must not contain injection or exfiltration payloads."
return None
class MemoryStore:
"""
Bounded curated memory with file persistence. One instance per AIAgent.
Maintains two parallel states:
- _system_prompt_snapshot: frozen at load time, used for system prompt injection.
Never mutated mid-session. Keeps prefix cache stable.
- memory_entries / user_entries: live state, mutated by tool calls, persisted to disk.
Tool responses always reflect this live state.
"""
def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375):
self.memory_entries: List[str] = []
self.user_entries: List[str] = []
self.memory_char_limit = memory_char_limit
self.user_char_limit = user_char_limit
# Frozen snapshot for system prompt -- set once at load_from_disk()
self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""}
def load_from_disk(self):
"""Load entries from MEMORY.md and USER.md, capture system prompt snapshot."""
mem_dir = get_memory_dir()
mem_dir.mkdir(parents=True, exist_ok=True)
self.memory_entries = self._read_file(mem_dir / "MEMORY.md")
self.user_entries = self._read_file(mem_dir / "USER.md")
# Deduplicate entries (preserves order, keeps first occurrence)
self.memory_entries = list(dict.fromkeys(self.memory_entries))
self.user_entries = list(dict.fromkeys(self.user_entries))
# Capture frozen snapshot for system prompt injection
self._system_prompt_snapshot = {
"memory": self._render_block("memory", self.memory_entries),
"user": self._render_block("user", self.user_entries),
}
@staticmethod
@contextmanager
def _file_lock(path: Path):
"""Acquire an exclusive file lock for read-modify-write safety.
Uses a separate .lock file so the memory file itself can still be
atomically replaced via os.replace().
"""
lock_path = path.with_suffix(path.suffix + ".lock")
lock_path.parent.mkdir(parents=True, exist_ok=True)
if fcntl is None and msvcrt is None:
yield
return
if msvcrt and (not lock_path.exists() or lock_path.stat().st_size == 0):
lock_path.write_text(" ", encoding="utf-8")
fd = open(lock_path, "r+" if msvcrt else "a+", encoding="utf-8")
try:
if fcntl:
fcntl.flock(fd, fcntl.LOCK_EX)
else:
fd.seek(0)
msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1)
yield
finally:
if fcntl:
fcntl.flock(fd, fcntl.LOCK_UN)
elif msvcrt:
try:
fd.seek(0)
msvcrt.locking(fd.fileno(), msvcrt.LK_UNLCK, 1)
except (OSError, IOError):
pass
fd.close()
@staticmethod
def _path_for(target: str) -> Path:
mem_dir = get_memory_dir()
if target == "user":
return mem_dir / "USER.md"
return mem_dir / "MEMORY.md"
def _reload_target(self, target: str):
"""Re-read entries from disk into in-memory state.
Called under file lock to get the latest state before mutating.
"""
fresh = self._read_file(self._path_for(target))
fresh = list(dict.fromkeys(fresh)) # deduplicate
self._set_entries(target, fresh)
def save_to_disk(self, target: str):
"""Persist entries to the appropriate file. Called after every mutation."""
get_memory_dir().mkdir(parents=True, exist_ok=True)
self._write_file(self._path_for(target), self._entries_for(target))
def _entries_for(self, target: str) -> List[str]:
if target == "user":
return self.user_entries
return self.memory_entries
def _set_entries(self, target: str, entries: List[str]):
if target == "user":
self.user_entries = entries
else:
self.memory_entries = entries
def _char_count(self, target: str) -> int:
entries = self._entries_for(target)
if not entries:
return 0
return len(ENTRY_DELIMITER.join(entries))
def _char_limit(self, target: str) -> int:
if target == "user":
return self.user_char_limit
return self.memory_char_limit
def add(self, target: str, content: str) -> Dict[str, Any]:
"""Append a new entry. Returns error if it would exceed the char limit."""
content = content.strip()
if not content:
return {"success": False, "error": "Content cannot be empty."}
# Scan for injection/exfiltration before accepting
scan_error = _scan_memory_content(content)
if scan_error:
return {"success": False, "error": scan_error}
with self._file_lock(self._path_for(target)):
# Re-read from disk under lock to pick up writes from other sessions
self._reload_target(target)
entries = self._entries_for(target)
limit = self._char_limit(target)
# Reject exact duplicates
if content in entries:
return self._success_response(target, "Entry already exists (no duplicate added).")
# Calculate what the new total would be
new_entries = entries + [content]
new_total = len(ENTRY_DELIMITER.join(new_entries))
if new_total > limit:
current = self._char_count(target)
return {
"success": False,
"error": (
f"Memory at {current:,}/{limit:,} chars. "
f"Adding this entry ({len(content)} chars) would exceed the limit. "
f"Replace or remove existing entries first."
),
"current_entries": entries,
"usage": f"{current:,}/{limit:,}",
}
entries.append(content)
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry added.")
def replace(self, target: str, old_text: str, new_content: str) -> Dict[str, Any]:
"""Find entry containing old_text substring, replace it with new_content."""
old_text = old_text.strip()
new_content = new_content.strip()
if not old_text:
return {"success": False, "error": "old_text cannot be empty."}
if not new_content:
return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."}
# Scan replacement content for injection/exfiltration
scan_error = _scan_memory_content(new_content)
if scan_error:
return {"success": False, "error": scan_error}
with self._file_lock(self._path_for(target)):
self._reload_target(target)
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
unique_texts = set(e for _, e in matches)
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
}
# All identical -- safe to replace just the first
idx = matches[0][0]
limit = self._char_limit(target)
# Check that replacement doesn't blow the budget
test_entries = entries.copy()
test_entries[idx] = new_content
new_total = len(ENTRY_DELIMITER.join(test_entries))
if new_total > limit:
return {
"success": False,
"error": (
f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
f"Shorten the new content or remove other entries first."
),
}
entries[idx] = new_content
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry replaced.")
def remove(self, target: str, old_text: str) -> Dict[str, Any]:
"""Remove the entry containing old_text substring."""
old_text = old_text.strip()
if not old_text:
return {"success": False, "error": "old_text cannot be empty."}
with self._file_lock(self._path_for(target)):
self._reload_target(target)
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
unique_texts = set(e for _, e in matches)
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
}
# All identical -- safe to remove just the first
idx = matches[0][0]
entries.pop(idx)
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry removed.")
def format_for_system_prompt(self, target: str) -> Optional[str]:
"""
Return the frozen snapshot for system prompt injection.
This returns the state captured at load_from_disk() time, NOT the live
state. Mid-session writes do not affect this. This keeps the system
prompt stable across all turns, preserving the prefix cache.
Returns None if the snapshot is empty (no entries at load time).
"""
block = self._system_prompt_snapshot.get(target, "")
return block if block else None
# -- Internal helpers --
def _success_response(self, target: str, message: str = None) -> Dict[str, Any]:
entries = self._entries_for(target)
current = self._char_count(target)
limit = self._char_limit(target)
pct = min(100, int((current / limit) * 100)) if limit > 0 else 0
resp = {
"success": True,
"target": target,
"entries": entries,
"usage": f"{pct}% — {current:,}/{limit:,} chars",
"entry_count": len(entries),
}
if message:
resp["message"] = message
return resp
def _render_block(self, target: str, entries: List[str]) -> str:
"""Render a system prompt block with header and usage indicator."""
if not entries:
return ""
limit = self._char_limit(target)
content = ENTRY_DELIMITER.join(entries)
current = len(content)
pct = min(100, int((current / limit) * 100)) if limit > 0 else 0
if target == "user":
header = f"USER PROFILE (who the user is) [{pct}% — {current:,}/{limit:,} chars]"
else:
header = f"MEMORY (your personal notes) [{pct}% — {current:,}/{limit:,} chars]"
separator = "" * 46
return f"{separator}\n{header}\n{separator}\n{content}"
@staticmethod
def _read_file(path: Path) -> List[str]:
"""Read a memory file and split into entries.
No file locking needed: _write_file uses atomic rename, so readers
always see either the previous complete file or the new complete file.
"""
if not path.exists():
return []
try:
raw = path.read_text(encoding="utf-8")
except (OSError, IOError):
return []
if not raw.strip():
return []
# Use ENTRY_DELIMITER for consistency with _write_file. Splitting by "§"
# alone would incorrectly split entries that contain "§" in their content.
entries = [e.strip() for e in raw.split(ENTRY_DELIMITER)]
return [e for e in entries if e]
@staticmethod
def _write_file(path: Path, entries: List[str]):
"""Write entries to a memory file using atomic temp-file + rename.
Previous implementation used open("w") + flock, but "w" truncates the
file *before* the lock is acquired, creating a race window where
concurrent readers see an empty file. Atomic rename avoids this:
readers always see either the old complete file or the new one.
"""
content = ENTRY_DELIMITER.join(entries) if entries else ""
try:
# Write to temp file in same directory (same filesystem for atomic rename)
fd, tmp_path = tempfile.mkstemp(
dir=str(path.parent), suffix=".tmp", prefix=".mem_"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(content)
f.flush()
os.fsync(f.fileno())
atomic_replace(tmp_path, path)
except BaseException:
# Clean up temp file on any failure
try:
os.unlink(tmp_path)
except OSError:
pass
raise
except (OSError, IOError) as e:
raise RuntimeError(f"Failed to write memory file {path}: {e}")
def memory_tool(
action: str,
target: str = "memory",
content: str = None,
old_text: str = None,
store: Optional[MemoryStore] = None,
) -> str:
"""
Single entry point for the memory tool. Dispatches to MemoryStore methods.
Returns JSON string with results.
"""
if store is None:
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
if target not in ("memory", "user"):
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
if action == "add":
if not content:
return tool_error("Content is required for 'add' action.", success=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return tool_error("old_text is required for 'replace' action.", success=False)
if not content:
return tool_error("content is required for 'replace' action.", success=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return tool_error("old_text is required for 'remove' action.", success=False)
result = store.remove(target, old_text)
else:
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
return json.dumps(result, ensure_ascii=False)
def check_memory_requirements() -> bool:
"""Memory tool has no external requirements -- always available."""
return True
# =============================================================================
# OpenAI Function-Calling Schema
# =============================================================================
MEMORY_SCHEMA = {
"name": "memory",
"description": (
"Save durable information to persistent memory that survives across sessions. "
"Memory is injected into future turns, so keep it compact and focused on facts "
"that will still matter later.\n\n"
"WHEN TO SAVE (do this proactively, don't wait to be asked):\n"
"- User corrects you or says 'remember this' / 'don't do that again'\n"
"- User shares a preference, habit, or personal detail (name, role, timezone, coding style)\n"
"- You discover something about the environment (OS, installed tools, project structure)\n"
"- You learn a convention, API quirk, or workflow specific to this user's setup\n"
"- You identify a stable fact that will be useful again in future sessions\n\n"
"PRIORITY: User preferences and corrections > environment facts > procedural knowledge. "
"The most valuable memory prevents the user from having to repeat themselves.\n\n"
"Do NOT save task progress, session outcomes, completed-work logs, or temporary TODO "
"state to memory; use session_search to recall those from past transcripts.\n"
"If you've discovered a new way to do something, solved a problem that could be "
"necessary later, save it as a skill with the skill tool.\n\n"
"TWO TARGETS:\n"
"- 'user': who the user is -- name, role, preferences, communication style, pet peeves\n"
"- 'memory': your notes -- environment facts, project conventions, tool quirks, lessons learned\n\n"
"ACTIONS: add (new entry), replace (update existing -- old_text identifies it), "
"remove (delete -- old_text identifies it).\n\n"
"SKIP: trivial/obvious info, things easily re-discovered, raw data dumps, and temporary task state."
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["add", "replace", "remove"],
"description": "The action to perform."
},
"target": {
"type": "string",
"enum": ["memory", "user"],
"description": "Which memory store: 'memory' for personal notes, 'user' for user profile."
},
"content": {
"type": "string",
"description": "The entry content. Required for 'add' and 'replace'."
},
"old_text": {
"type": "string",
"description": "Short unique substring identifying the entry to replace or remove."
},
},
"required": ["action", "target"],
},
}
# --- Registry ---
from tools.registry import registry, tool_error
registry.register(
name="memory",
toolset="memory",
schema=MEMORY_SCHEMA,
handler=lambda args, **kw: memory_tool(
action=args.get("action", ""),
target=args.get("target", "memory"),
content=args.get("content"),
old_text=args.get("old_text"),
store=kw.get("store")),
check_fn=check_memory_requirements,
emoji="🧠",
)