diff --git a/gateway/memory_monitor.py b/gateway/memory_monitor.py new file mode 100644 index 00000000000..bacbbba34ef --- /dev/null +++ b/gateway/memory_monitor.py @@ -0,0 +1,230 @@ +"""Periodic process memory usage logging for the gateway. + +Ported from cline/cline#10343 (src/standalone/memory-monitor.ts). + +The gateway is a long-lived process that accumulates memory as it caches +agent instances, session transcripts, tool schemas, memory providers, MCP +connections, etc. A slow leak in any of those subsystems is invisible +in a single log line — you only see it by watching RSS climb over hours. + +This module emits a single structured ``[MEMORY] ...`` line every N +minutes (default 5) so maintainers investigating a suspected leak can +grep ``agent.log`` / ``gateway.log`` for a time series of RSS + Python +GC stats. The timer runs in a background thread and shuts down cleanly +with the gateway. + +Design notes (parity with the Cline port): + * Grep-friendly single-line format beginning ``[MEMORY]``. + * Final snapshot logged on shutdown so "last RSS before exit" is + always in the log. + * Baseline snapshot logged immediately on start. + * Daemon thread — never blocks process exit. + * Uses ``resource`` (stdlib, Linux/macOS) first and falls back to + ``psutil`` when ``resource`` isn't available (Windows). Both are + optional; when neither works we emit a single WARNING and disable + the monitor rather than crashing the gateway. + +Config: ``logging.memory_monitor`` in ``config.yaml`` — see +``hermes_cli/config.py`` for the defaults block. +""" + +from __future__ import annotations + +import gc +import logging +import os +import sys +import threading +import time +from typing import Optional + +logger = logging.getLogger(__name__) + +_BYTES_TO_MB = 1024 * 1024 + +_monitor_thread: Optional[threading.Thread] = None +_stop_event: Optional[threading.Event] = None +_start_time: Optional[float] = None +_interval_seconds: float = 300.0 # 5 minutes +_lock = threading.Lock() + + +def _get_rss_mb() -> Optional[int]: + """Return current process resident set size in MB, or None if unavailable. + + Tries ``resource.getrusage`` first (Linux/macOS, no extra deps), then + falls back to ``psutil`` which is an optional hermes-agent dep. + """ + # Linux / macOS — resource is stdlib. On Linux ru_maxrss is in KB, + # on macOS it is in bytes (yes, really). We use it as a cheap + # "current" RSS — ru_maxrss reports the high-water mark for the + # process, which is what you actually want for leak detection. + try: + import resource + + maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + if sys.platform == "darwin": + return int(maxrss / _BYTES_TO_MB) + # Linux / other unices: KB + return int(maxrss / 1024) + except Exception: + pass + + # Fallback: psutil (Windows, or unusual unix without resource). + try: + import psutil # type: ignore + + rss = psutil.Process(os.getpid()).memory_info().rss + return int(rss / _BYTES_TO_MB) + except Exception: + return None + + +def log_memory_usage(prefix: str = "") -> None: + """Log current memory usage in a grep-friendly ``[MEMORY] ...`` line. + + Safe to call on-demand from any thread at important lifecycle + moments (after shutdown, after context compression, etc.). + + Parameters + ---------- + prefix + Optional extra tag inserted after ``[MEMORY]`` — e.g. + ``"baseline"``, ``"shutdown"``. + """ + rss = _get_rss_mb() + uptime = int(time.monotonic() - _start_time) if _start_time else 0 + # gc.get_stats() returns per-generation collection counts; the sum + # is a cheap proxy for "how much garbage have we created". + try: + gc_counts = gc.get_count() # (gen0, gen1, gen2) + except Exception: + gc_counts = (0, 0, 0) + # Thread count is a handy correlate when diagnosing thread leaks. + try: + thread_count = threading.active_count() + except Exception: + thread_count = 0 + + tag = f"{prefix} " if prefix else "" + if rss is None: + logger.info( + "[MEMORY] %srss=unavailable gc=%s threads=%d uptime=%ds", + tag, + gc_counts, + thread_count, + uptime, + ) + else: + logger.info( + "[MEMORY] %srss=%dMB gc=%s threads=%d uptime=%ds", + tag, + rss, + gc_counts, + thread_count, + uptime, + ) + + +def _monitor_loop(stop_event: threading.Event, interval: float) -> None: + """Background thread body — log every ``interval`` seconds until stopped.""" + while not stop_event.wait(interval): + try: + log_memory_usage() + except Exception as e: + # Never let the monitor crash the gateway; just log and carry on. + logger.debug("Memory monitor iteration failed: %s", e) + + +def start_memory_monitoring(interval_seconds: float = 300.0) -> bool: + """Start periodic memory usage logging in a daemon thread. + + Logs immediately to capture a baseline, then every ``interval_seconds``. + Safe to call multiple times — subsequent calls are no-ops while the + first monitor is still running. + + Parameters + ---------- + interval_seconds + How often to log. Default 300s (5 minutes), matching the + upstream cline/cline implementation. + + Returns + ------- + bool + True if a fresh monitor thread was started, False if one was + already running or if memory introspection isn't available. + """ + global _monitor_thread, _stop_event, _start_time, _interval_seconds + + with _lock: + if _monitor_thread is not None and _monitor_thread.is_alive(): + return False + + # Sanity-check that we can read RSS at all. If neither resource + # nor psutil works, no point spinning a thread that can only log + # "rss=unavailable" forever — warn once and bail. + if _get_rss_mb() is None: + logger.warning( + "[MEMORY] Memory monitoring unavailable: neither resource.getrusage " + "nor psutil could read process RSS — skipping periodic logging.", + ) + return False + + _start_time = time.monotonic() + _interval_seconds = float(interval_seconds) + _stop_event = threading.Event() + + # Baseline snapshot before the loop starts. + log_memory_usage(prefix="baseline") + + _monitor_thread = threading.Thread( + target=_monitor_loop, + args=(_stop_event, _interval_seconds), + name="gateway-memory-monitor", + daemon=True, + ) + _monitor_thread.start() + + logger.info( + "[MEMORY] Periodic memory monitoring started (interval: %ds)", + int(_interval_seconds), + ) + return True + + +def stop_memory_monitoring(timeout: float = 2.0) -> None: + """Stop the monitor thread and log a final snapshot. + + Safe to call even if ``start_memory_monitoring()`` was never called. + """ + global _monitor_thread, _stop_event + + with _lock: + if _stop_event is None or _monitor_thread is None: + return + + # Final snapshot before teardown so "last RSS" is always in the log. + try: + log_memory_usage(prefix="shutdown") + except Exception: + pass + + _stop_event.set() + thread = _monitor_thread + _monitor_thread = None + _stop_event = None + + # Join outside the lock so a stuck log call can't deadlock shutdown. + try: + thread.join(timeout=timeout) + except Exception: + pass + + logger.info("[MEMORY] Periodic memory monitoring stopped") + + +def is_running() -> bool: + """True if the background monitor thread is alive.""" + with _lock: + return _monitor_thread is not None and _monitor_thread.is_alive() diff --git a/gateway/run.py b/gateway/run.py index f9a282a413f..a5eaafcb063 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -16800,6 +16800,33 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = from hermes_logging import setup_logging setup_logging(hermes_home=_hermes_home, mode="gateway") + # Periodic process memory usage logging (gateway only) — emits a + # grep-friendly "[MEMORY] rss=...MB ..." line every N minutes so + # slow leaks in the long-lived gateway process show up as a time + # series in agent.log / gateway.log. Ported from cline/cline#10343. + # Controlled by the logging.memory_monitor section in config.yaml. + try: + from gateway import memory_monitor as _memory_monitor + + _mm_cfg = {} + try: + # config is loaded a few lines up; re-read the logging section + # here so we pick up user overrides without coupling to local + # variable names inside the start_gateway body. + from hermes_cli.config import load_config as _load_cli_config + + _mm_cfg = (_load_cli_config() or {}).get("logging", {}).get("memory_monitor", {}) or {} + except Exception: + _mm_cfg = {} + if _mm_cfg.get("enabled", True): + try: + _mm_interval = float(_mm_cfg.get("interval_seconds", 300)) + except (TypeError, ValueError): + _mm_interval = 300.0 + _memory_monitor.start_memory_monitoring(interval_seconds=_mm_interval) + except Exception as _mm_exc: + logger.debug("Failed to start memory monitor: %s", _mm_exc) + # Optional stderr handler — level driven by -v/-q flags on the CLI. # verbosity=None (-q/--quiet): no stderr output # verbosity=0 (default): WARNING and above @@ -17016,6 +17043,16 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = except Exception: pass + # Stop the periodic memory monitor (if it was started above). + # This also emits one final "[MEMORY] shutdown rss=..." line so the + # last RSS reading before gateway exit is always in the log. + try: + from gateway import memory_monitor as _memory_monitor + + _memory_monitor.stop_memory_monitoring(timeout=2.0) + except Exception: + pass + if runner.exit_code is not None: raise SystemExit(runner.exit_code) diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 574f2397d91..81706d1edb4 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1477,6 +1477,15 @@ DEFAULT_CONFIG = { "level": "INFO", # Minimum level for agent.log: DEBUG, INFO, WARNING "max_size_mb": 5, # Max size per log file before rotation "backup_count": 3, # Number of rotated backup files to keep + # Periodic process memory usage logging (gateway only). Emits a + # grep-friendly "[MEMORY] rss=...MB ..." line at the configured + # interval so slow leaks in the long-lived gateway are visible + # in agent.log / gateway.log as a time series. Ported from + # cline/cline#10343. + "memory_monitor": { + "enabled": True, # Flip to false to silence the periodic line + "interval_seconds": 300, # Default: every 5 minutes + }, }, # Remotely-hosted model catalog manifest. When enabled, the CLI fetches diff --git a/tests/gateway/test_memory_monitor.py b/tests/gateway/test_memory_monitor.py new file mode 100644 index 00000000000..64903dc81f8 --- /dev/null +++ b/tests/gateway/test_memory_monitor.py @@ -0,0 +1,122 @@ +"""Tests for gateway.memory_monitor — periodic process memory logging. + +Ported from cline/cline#10343. The module logs a structured +``[MEMORY] rss=...MB ...`` line periodically so long-running gateway +leaks show up as a time series in agent.log / gateway.log. +""" + +from __future__ import annotations + +import logging +import time + +import pytest + +from gateway import memory_monitor as mm + + +@pytest.fixture(autouse=True) +def _ensure_monitor_stopped(): + """Every test starts from a clean state and leaves one behind.""" + mm.stop_memory_monitoring(timeout=1.0) + yield + mm.stop_memory_monitoring(timeout=1.0) + + +def test_log_memory_usage_emits_memory_line(caplog): + caplog.set_level(logging.INFO, logger="gateway.memory_monitor") + mm.log_memory_usage() + memory_lines = [r for r in caplog.records if "[MEMORY]" in r.getMessage()] + assert memory_lines, "expected at least one [MEMORY] log record" + + +def test_log_memory_usage_has_grep_friendly_format(caplog): + caplog.set_level(logging.INFO, logger="gateway.memory_monitor") + mm.log_memory_usage() + msg = caplog.records[-1].getMessage() + # Grep-friendly contract: line starts with [MEMORY] and carries RSS + # (or 'unavailable'), GC counts, thread count, uptime. + assert msg.startswith("[MEMORY]"), msg + assert "rss=" in msg + assert "gc=" in msg + assert "threads=" in msg + assert "uptime=" in msg + + +def test_log_memory_usage_with_prefix(caplog): + caplog.set_level(logging.INFO, logger="gateway.memory_monitor") + mm.log_memory_usage(prefix="baseline") + msg = caplog.records[-1].getMessage() + assert "[MEMORY] baseline " in msg + + +def test_start_logs_baseline_and_returns_true(caplog): + caplog.set_level(logging.INFO, logger="gateway.memory_monitor") + # Large interval so the background timer never fires during the test — + # we're only checking the synchronous baseline behavior here. + started = mm.start_memory_monitoring(interval_seconds=3600.0) + assert started is True + assert mm.is_running() is True + + messages = [r.getMessage() for r in caplog.records] + assert any("[MEMORY] baseline " in m for m in messages), messages + assert any("Periodic memory monitoring started" in m for m in messages), messages + + +def test_double_start_is_noop(): + assert mm.start_memory_monitoring(interval_seconds=3600.0) is True + assert mm.start_memory_monitoring(interval_seconds=3600.0) is False + assert mm.is_running() is True + + +def test_stop_logs_shutdown_snapshot(caplog): + mm.start_memory_monitoring(interval_seconds=3600.0) + caplog.clear() + caplog.set_level(logging.INFO, logger="gateway.memory_monitor") + mm.stop_memory_monitoring(timeout=1.0) + assert mm.is_running() is False + + messages = [r.getMessage() for r in caplog.records] + assert any("[MEMORY] shutdown " in m for m in messages), messages + assert any("Periodic memory monitoring stopped" in m for m in messages), messages + + +def test_stop_without_start_is_noop(): + # Must not raise, must not log shutdown snapshot. + mm.stop_memory_monitoring(timeout=0.5) + assert mm.is_running() is False + + +def test_periodic_timer_fires(caplog): + caplog.set_level(logging.INFO, logger="gateway.memory_monitor") + # Short interval so we can observe multiple ticks inside the test budget. + mm.start_memory_monitoring(interval_seconds=0.1) + time.sleep(0.45) + mm.stop_memory_monitoring(timeout=1.0) + + periodic = [ + r for r in caplog.records + if r.getMessage().startswith("[MEMORY] rss=") or r.getMessage().startswith("[MEMORY] rss=unavailable") + ] + # baseline + at least 2 periodic + shutdown — but shutdown has the + # "shutdown " prefix so it won't match the strict "[MEMORY] rss=" start. + # We expect >= 3 bare "[MEMORY] rss=..." lines. + assert len(periodic) >= 3, [r.getMessage() for r in caplog.records] + + +def test_thread_is_daemon(): + mm.start_memory_monitoring(interval_seconds=3600.0) + assert mm._monitor_thread is not None + assert mm._monitor_thread.daemon is True, ( + "memory monitor thread must be daemon so it can never block process exit" + ) + + +def test_unavailable_rss_warns_and_does_not_start(caplog, monkeypatch): + # Force both backends to claim unavailable; start should bail. + monkeypatch.setattr(mm, "_get_rss_mb", lambda: None) + caplog.set_level(logging.WARNING, logger="gateway.memory_monitor") + started = mm.start_memory_monitoring(interval_seconds=3600.0) + assert started is False + assert mm.is_running() is False + assert any("Memory monitoring unavailable" in r.getMessage() for r in caplog.records)