feat: component-separated logging with session context and filtering (#7991)

* feat: component-separated logging with session context and filtering

Phase 1 — Gateway log isolation:
- gateway.log now only receives records from gateway.* loggers
  (platform adapters, session management, slash commands, delivery)
- agent.log remains the catch-all (all components)
- errors.log remains WARNING+ catch-all
- Moved gateway.log handler creation from gateway/run.py into
  hermes_logging.setup_logging(mode='gateway') with _ComponentFilter
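The gateway routing described above boils down to a standard `logging.Filter` attached to the gateway.log handler. A minimal sketch (class name and prefixes follow the commit text; the handler here is a stand-in for the rotating file handler):

```python
import logging

class ComponentFilter(logging.Filter):
    """Pass only records whose logger name starts with one of *prefixes*."""

    def __init__(self, prefixes):
        super().__init__()
        self._prefixes = tuple(prefixes)

    def filter(self, record):
        # record.name is the dotted logger name, e.g. "gateway.platforms.telegram"
        return record.name.startswith(self._prefixes)

# Attach the filter to one handler only: that handler sees gateway.* records,
# while handlers without the filter (agent.log, errors.log) still see everything.
gateway_handler = logging.StreamHandler()  # stand-in for the gateway.log file handler
gateway_handler.addFilter(ComponentFilter(("gateway",)))
```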

Phase 2 — Session ID injection:
- Added set_session_context(session_id) / clear_session_context() API
  using threading.local() for per-thread session tracking
- _SessionFilter enriches every log record with session_tag attribute
- Log format: '2026-04-11 10:23:45 INFO [session_id] logger.name: msg'
- Session context set at start of run_conversation() in run_agent.py
- Thread-isolated: gateway conversations on different threads don't leak
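Roughly, the thread-local mechanism looks like this (a simplified sketch with illustrative names; the real API lives in hermes_logging):

```python
import logging
import threading

# Thread-local storage: each thread sees only its own session_id.
_ctx = threading.local()

def set_session_context(session_id):
    _ctx.session_id = session_id

def clear_session_context():
    _ctx.session_id = None

class SessionFilter(logging.Filter):
    """Enrich every record passing through a handler with a session tag."""

    def filter(self, record):
        # getattr with a default: threads that never set a session get "".
        sid = getattr(_ctx, "session_id", None)
        record.session_tag = f" [{sid}]" if sid else ""
        return True  # never drops records, only annotates them
```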

Phase 3 — Component filtering in hermes logs:
- Added --component flag: hermes logs --component gateway|agent|tools|cli|cron
- COMPONENT_PREFIXES maps component names to logger name prefixes
- Works with all existing filters (--level, --session, --since, -f)
- Logger name extraction handles both old and new log formats
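The extraction handling both formats (with and without a session tag) can be sketched as a standalone regex, mirroring the pattern described above:

```python
import re

# After the level token and an optional "[session]" tag, the next
# non-space token before ":" is the logger name.
LOGGER_NAME_RE = re.compile(
    r"\s(?:DEBUG|INFO|WARNING|ERROR|CRITICAL)"  # level
    r"(?:\s+\[.*?\])?"                          # optional session tag
    r"\s+(\S+):"                                # logger name
)

def extract_logger_name(line):
    m = LOGGER_NAME_RE.search(line)
    return m.group(1) if m else None
```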

Files changed:
- hermes_logging.py: _SessionFilter, _ComponentFilter, COMPONENT_PREFIXES,
  set/clear_session_context(), gateway.log creation in setup_logging()
- gateway/run.py: removed redundant gateway.log handler (now in hermes_logging)
- run_agent.py: set_session_context() at start of run_conversation()
- hermes_cli/logs.py: --component filter, logger name extraction
- hermes_cli/main.py: --component argument on logs subparser

Addresses community request for component-separated, filterable logging.
Zero changes to existing logger names — __name__ already provides hierarchy.

* fix: use LogRecord factory instead of per-handler _SessionFilter

The _SessionFilter approach required attaching a filter to every handler
we create. Any handler created outside our _add_rotating_handler (like
the gateway stderr handler, or third-party handlers) would crash with
KeyError: 'session_tag' if it used our format string.

Replace with logging.setLogRecordFactory() which injects session_tag
into every LogRecord at creation time — process-global, zero per-handler
wiring needed. The factory is installed at import time (before
setup_logging) so session_tag is available from the moment hermes_logging
is imported.

- Idempotent: marker attribute prevents double-wrapping on module reload
- Chains with existing factory: won't break third-party record factories
- Removes _SessionFilter from _add_rotating_handler and setup_verbose_logging
- Adds tests: record factory injection, idempotency, arbitrary handler compat
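A minimal sketch of the factory-chaining pattern this fix describes (illustrative names; the `get_session_id` callable stands in for the module's thread-local lookup):

```python
import logging

def install_session_record_factory(get_session_id):
    """Wrap the current LogRecord factory so every record carries session_tag."""
    current = logging.getLogRecordFactory()
    if getattr(current, "_session_injector", False):
        return  # idempotent: already installed, avoid double-wrapping

    def factory(*args, **kwargs):
        # Chain to whatever factory was installed before us, so third-party
        # record factories keep working.
        record = current(*args, **kwargs)
        sid = get_session_id()
        record.session_tag = f" [{sid}]" if sid else ""
        return record

    factory._session_injector = True  # marker checked above
    logging.setLogRecordFactory(factory)
```

Because the factory runs at record creation, `%(session_tag)s` is safe in any handler's format string, with no per-handler wiring.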

Teknium 2026-04-11 17:23:36 -07:00, committed by GitHub
parent 723b5bec85 · commit fd73937ec8
GPG key ID: B5690EEEBB952194 (no known key found for this signature in database)
7 changed files with 728 additions and 230 deletions

gateway/run.py

@@ -8458,23 +8458,11 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
except Exception:
pass
# Centralized logging — agent.log (INFO+) and errors.log (WARNING+).
# Centralized logging — agent.log (INFO+), errors.log (WARNING+),
# and gateway.log (INFO+, gateway-component records only).
# Idempotent, so repeated calls from AIAgent.__init__ won't duplicate.
from hermes_logging import setup_logging
log_dir = setup_logging(hermes_home=_hermes_home, mode="gateway")
# Gateway-specific rotating log — captures all gateway-level messages
# (session management, platform adapters, slash commands, etc.).
from agent.redact import RedactingFormatter
from hermes_logging import _add_rotating_handler
_add_rotating_handler(
logging.getLogger(),
log_dir / 'gateway.log',
level=logging.INFO,
max_bytes=5 * 1024 * 1024,
backup_count=3,
formatter=RedactingFormatter('%(asctime)s %(levelname)s %(name)s: %(message)s'),
)
setup_logging(hermes_home=_hermes_home, mode="gateway")
# Optional stderr handler — level driven by -v/-q flags on the CLI.
# verbosity=None (-q/--quiet): no stderr output

hermes_cli/logs.py

@@ -1,16 +1,18 @@
"""``hermes logs`` — view and filter Hermes log files.
Supports tailing, following, session filtering, level filtering, and
relative time ranges. All log files live under ``~/.hermes/logs/``.
Supports tailing, following, session filtering, level filtering,
component filtering, and relative time ranges. All log files live
under ``~/.hermes/logs/``.
Usage examples::
hermes logs # last 50 lines of agent.log
hermes logs -f # follow agent.log in real time
hermes logs errors # last 50 lines of errors.log
hermes logs gateway -n 100 # last 100 lines of gateway.log
hermes logs gateway -n 100 # last 100 lines of gateway.log
hermes logs --level WARNING # only WARNING+ lines
hermes logs --session abc123 # filter by session ID substring
hermes logs --component tools # only tool-related lines
hermes logs --since 1h # lines from the last hour
hermes logs --since 30m -f # follow, starting 30 min ago
"""
@@ -20,7 +22,7 @@ import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from typing import Optional, Sequence
from hermes_constants import get_hermes_home, display_hermes_home
@@ -38,6 +40,15 @@ _TS_RE = re.compile(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})")
# Level extraction — matches " INFO ", " WARNING ", " ERROR ", " DEBUG ", " CRITICAL "
_LEVEL_RE = re.compile(r"\s(DEBUG|INFO|WARNING|ERROR|CRITICAL)\s")
# Logger name extraction — after level and optional session tag, the next
# non-space token before ":" is the logger name.
# Matches: "INFO gateway.run:" or "INFO [sess_abc] tools.terminal_tool:"
_LOGGER_NAME_RE = re.compile(
r"\s(?:DEBUG|INFO|WARNING|ERROR|CRITICAL)" # level
r"(?:\s+\[.*?\])?" # optional session tag
r"\s+(\S+):" # logger name
)
# Level ordering for >= filtering
_LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "CRITICAL": 4}
@@ -79,12 +90,27 @@ def _extract_level(line: str) -> Optional[str]:
return m.group(1) if m else None
def _extract_logger_name(line: str) -> Optional[str]:
"""Extract the logger name from a log line."""
m = _LOGGER_NAME_RE.search(line)
return m.group(1) if m else None
def _line_matches_component(line: str, prefixes: Sequence[str]) -> bool:
"""Check if a log line's logger name starts with any of *prefixes*."""
name = _extract_logger_name(line)
if name is None:
return False
return name.startswith(tuple(prefixes))
def _matches_filters(
line: str,
*,
min_level: Optional[str] = None,
session_filter: Optional[str] = None,
since: Optional[datetime] = None,
component_prefixes: Optional[Sequence[str]] = None,
) -> bool:
"""Check if a log line passes all active filters."""
if since is not None:
@@ -102,6 +128,10 @@ def _matches_filters(
if session_filter not in line:
return False
if component_prefixes is not None:
if not _line_matches_component(line, component_prefixes):
return False
return True
@@ -113,6 +143,7 @@ def tail_log(
level: Optional[str] = None,
session: Optional[str] = None,
since: Optional[str] = None,
component: Optional[str] = None,
) -> None:
"""Read and display log lines, optionally following in real time.
@@ -130,6 +161,8 @@ def tail_log(
Session ID substring to filter on.
since
Relative time string (e.g. ``"1h"``, ``"30m"``).
component
Component name to filter by (e.g. ``"gateway"``, ``"tools"``).
"""
filename = LOG_FILES.get(log_name)
if filename is None:
@@ -155,13 +188,29 @@ def tail_log(
print(f"Invalid --level: {level!r}. Use DEBUG, INFO, WARNING, ERROR, or CRITICAL.")
sys.exit(1)
has_filters = min_level is not None or session is not None or since_dt is not None
# Resolve component to logger name prefixes
component_prefixes = None
if component:
from hermes_logging import COMPONENT_PREFIXES
component_lower = component.lower()
if component_lower not in COMPONENT_PREFIXES:
available = ", ".join(sorted(COMPONENT_PREFIXES))
print(f"Unknown component: {component!r}. Available: {available}")
sys.exit(1)
component_prefixes = COMPONENT_PREFIXES[component_lower]
has_filters = (
min_level is not None
or session is not None
or since_dt is not None
or component_prefixes is not None
)
# Read and display the tail
try:
lines = _read_tail(log_path, num_lines, has_filters=has_filters,
min_level=min_level, session_filter=session,
since=since_dt)
since=since_dt, component_prefixes=component_prefixes)
except PermissionError:
print(f"Permission denied: {log_path}")
sys.exit(1)
@@ -172,6 +221,8 @@ def tail_log(
filter_parts.append(f"level>={min_level}")
if session:
filter_parts.append(f"session={session}")
if component:
filter_parts.append(f"component={component}")
if since:
filter_parts.append(f"since={since}")
filter_desc = f" [{', '.join(filter_parts)}]" if filter_parts else ""
@@ -190,7 +241,7 @@ def tail_log(
# Follow mode — poll for new content
try:
_follow_log(log_path, min_level=min_level, session_filter=session,
since=since_dt)
since=since_dt, component_prefixes=component_prefixes)
except KeyboardInterrupt:
print("\n--- stopped ---")
@@ -203,6 +254,7 @@ def _read_tail(
min_level: Optional[str] = None,
session_filter: Optional[str] = None,
since: Optional[datetime] = None,
component_prefixes: Optional[Sequence[str]] = None,
) -> list:
"""Read the last *num_lines* matching lines from a log file.
@@ -215,7 +267,8 @@ def _read_tail(
filtered = [
l for l in raw_lines
if _matches_filters(l, min_level=min_level,
session_filter=session_filter, since=since)
session_filter=session_filter, since=since,
component_prefixes=component_prefixes)
]
return filtered[-num_lines:]
else:
@@ -284,6 +337,7 @@ def _follow_log(
min_level: Optional[str] = None,
session_filter: Optional[str] = None,
since: Optional[datetime] = None,
component_prefixes: Optional[Sequence[str]] = None,
) -> None:
"""Poll a log file for new content and print matching lines."""
with open(path, "r", encoding="utf-8", errors="replace") as f:
@@ -293,7 +347,8 @@ def _follow_log(
line = f.readline()
if line:
if _matches_filters(line, min_level=min_level,
session_filter=session_filter, since=since):
session_filter=session_filter, since=since,
component_prefixes=component_prefixes):
print(line, end="")
sys.stdout.flush()
else:

hermes_cli/main.py

@@ -4338,6 +4338,7 @@ def cmd_logs(args):
level=getattr(args, "level", None),
session=getattr(args, "session", None),
since=getattr(args, "since", None),
component=getattr(args, "component", None),
)
@@ -5737,6 +5738,7 @@ Examples:
hermes logs gateway -n 100 Show last 100 lines of gateway.log
hermes logs --level WARNING Only show WARNING and above
hermes logs --session abc123 Filter by session ID
hermes logs --component tools Only show tool-related lines
hermes logs --since 1h Lines from the last hour
hermes logs --since 30m -f Follow, starting from 30 min ago
hermes logs list List available log files with sizes
@@ -5766,6 +5768,10 @@ Examples:
"--since", metavar="TIME",
help="Show lines since TIME ago (e.g. 1h, 30m, 2d)",
)
logs_parser.add_argument(
"--component", metavar="NAME",
help="Filter by component: gateway, agent, tools, cli, cron",
)
logs_parser.set_defaults(func=cmd_logs)
# =========================================================================

hermes_logging.py

@@ -7,16 +7,28 @@ gateway call early in their startup path. All log files live under
Log files produced:
agent.log INFO+, all agent/tool/session activity (the main log)
errors.log WARNING+, errors and warnings only (quick triage)
gateway.log INFO+, gateway-only events (created when mode="gateway")
Both files use ``RotatingFileHandler`` with ``RedactingFormatter`` so
All files use ``RotatingFileHandler`` with ``RedactingFormatter`` so
secrets are never written to disk.
Component separation:
gateway.log only receives records from ``gateway.*`` loggers:
platform adapters, session management, slash commands, delivery.
agent.log remains the catch-all (everything goes there).
Session context:
Call ``set_session_context(session_id)`` at the start of a conversation
and ``clear_session_context()`` when done. All log lines emitted on
that thread will include ``[session_id]`` for filtering/correlation.
"""
import logging
import os
import threading
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional
from typing import Optional, Sequence
from hermes_constants import get_config_path, get_hermes_home
@@ -25,9 +37,14 @@ from hermes_constants import get_config_path, get_hermes_home
# unless ``force=True``.
_logging_initialized = False
# Default log format — includes timestamp, level, logger name, and message.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s: %(message)s"
_LOG_FORMAT_VERBOSE = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Thread-local storage for per-conversation session context.
_session_context = threading.local()
# Default log format — includes timestamp, level, optional session tag,
# logger name, and message. The ``%(session_tag)s`` field is guaranteed to
# exist on every LogRecord via _install_session_record_factory() below.
_LOG_FORMAT = "%(asctime)s %(levelname)s%(session_tag)s %(name)s: %(message)s"
_LOG_FORMAT_VERBOSE = "%(asctime)s - %(name)s - %(levelname)s%(session_tag)s - %(message)s"
# Third-party loggers that are noisy at DEBUG/INFO level.
_NOISY_LOGGERS = (
@@ -48,6 +65,99 @@ _NOISY_LOGGERS = (
)
# ---------------------------------------------------------------------------
# Public session context API
# ---------------------------------------------------------------------------
def set_session_context(session_id: str) -> None:
"""Set the session ID for the current thread.
All subsequent log records on this thread will include ``[session_id]``
in the formatted output. Call at the start of ``run_conversation()``.
"""
_session_context.session_id = session_id
def clear_session_context() -> None:
"""Clear the session ID for the current thread.
Optional: ``set_session_context()`` overwrites the previous value,
so explicit clearing is only needed if the thread is reused for
non-conversation work after ``run_conversation()`` returns.
"""
_session_context.session_id = None
# ---------------------------------------------------------------------------
# Record factory — injects session_tag into every LogRecord at creation
# ---------------------------------------------------------------------------
def _install_session_record_factory() -> None:
"""Replace the global LogRecord factory with one that adds ``session_tag``.
Unlike a ``logging.Filter`` on a handler or logger, the record factory
runs for EVERY record in the process, including records that propagate
from child loggers and records handled by third-party handlers. This
guarantees ``%(session_tag)s`` is always available in format strings,
eliminating the KeyError that would occur if a handler used our format
without having a ``_SessionFilter`` attached.
Idempotent: checks for a marker attribute to avoid double-wrapping if
the module is reloaded.
"""
current_factory = logging.getLogRecordFactory()
if getattr(current_factory, "_hermes_session_injector", False):
return # already installed
def _session_record_factory(*args, **kwargs):
record = current_factory(*args, **kwargs)
sid = getattr(_session_context, "session_id", None)
record.session_tag = f" [{sid}]" if sid else "" # type: ignore[attr-defined]
return record
_session_record_factory._hermes_session_injector = True # type: ignore[attr-defined]
logging.setLogRecordFactory(_session_record_factory)
# Install immediately on import — session_tag is available on all records
# from this point forward, even before setup_logging() is called.
_install_session_record_factory()
# ---------------------------------------------------------------------------
# Filters
# ---------------------------------------------------------------------------
class _ComponentFilter(logging.Filter):
"""Only pass records whose logger name starts with one of *prefixes*.
Used to route gateway-specific records to ``gateway.log`` while
keeping ``agent.log`` as the catch-all.
"""
def __init__(self, prefixes: Sequence[str]) -> None:
super().__init__()
self._prefixes = tuple(prefixes)
def filter(self, record: logging.LogRecord) -> bool:
return record.name.startswith(self._prefixes)
# Logger name prefixes that belong to each component.
# Used by _ComponentFilter and exposed for ``hermes logs --component``.
COMPONENT_PREFIXES = {
"gateway": ("gateway",),
"agent": ("agent", "run_agent", "model_tools", "batch_runner"),
"tools": ("tools",),
"cli": ("hermes_cli", "cli"),
"cron": ("cron",),
}
# ---------------------------------------------------------------------------
# Main setup
# ---------------------------------------------------------------------------
def setup_logging(
*,
hermes_home: Optional[Path] = None,
@@ -78,8 +188,9 @@ def setup_logging(
Number of rotated backup files to keep.
Defaults to 3 or the value from config.yaml ``logging.backup_count``.
mode
Hint for the caller context: ``"cli"``, ``"gateway"``, ``"cron"``.
Currently used only for log format tuning (gateway includes PID).
Caller context: ``"cli"``, ``"gateway"``, ``"cron"``.
When ``"gateway"``, an additional ``gateway.log`` file is created
that receives only gateway-component records.
force
Re-run setup even if it has already been called.
@@ -130,6 +241,18 @@ def setup_logging(
formatter=RedactingFormatter(_LOG_FORMAT),
)
# --- gateway.log (INFO+, gateway component only) ------------------------
if mode == "gateway":
_add_rotating_handler(
root,
log_dir / "gateway.log",
level=logging.INFO,
max_bytes=5 * 1024 * 1024,
backup_count=3,
formatter=RedactingFormatter(_LOG_FORMAT),
log_filter=_ComponentFilter(COMPONENT_PREFIXES["gateway"]),
)
# Ensure root logger level is low enough for the handlers to fire.
if root.level == logging.NOTSET or root.level > level:
root.setLevel(level)
@@ -218,9 +341,16 @@ def _add_rotating_handler(
max_bytes: int,
backup_count: int,
formatter: logging.Formatter,
log_filter: Optional[logging.Filter] = None,
) -> None:
"""Add a ``RotatingFileHandler`` to *logger*, skipping if one already
exists for the same resolved file path (idempotent).
Parameters
----------
log_filter
Optional filter to attach to the handler (e.g. ``_ComponentFilter``
for gateway.log).
"""
resolved = path.resolve()
for existing in logger.handlers:
@@ -236,6 +366,8 @@ def _add_rotating_handler(
)
handler.setLevel(level)
handler.setFormatter(formatter)
if log_filter is not None:
handler.addFilter(log_filter)
logger.addHandler(handler)

run_agent.py

@@ -7529,6 +7529,11 @@ class AIAgent:
# Installed once, transparent when streams are healthy, prevents crash on write.
_install_safe_stdio()
# Tag all log records on this thread with the session ID so
# ``hermes logs --session <id>`` can filter a single conversation.
from hermes_logging import set_session_context
set_session_context(self.session_id)
# If the previous turn activated fallback, restore the primary
# runtime so this turn gets a fresh attempt with the preferred model.
# No-op when _fallback_activated is False (gateway, first turn, etc.).

tests for hermes_cli.logs

@@ -1,288 +1,255 @@
"""Tests for hermes_cli/logs.py — log viewing and filtering."""
"""Tests for hermes_cli.logs — log viewing and filtering."""
import os
import textwrap
from datetime import datetime, timedelta
from io import StringIO
from pathlib import Path
from unittest.mock import patch
import pytest
from hermes_cli.logs import (
LOG_FILES,
_extract_level,
_extract_logger_name,
_line_matches_component,
_matches_filters,
_parse_line_timestamp,
_parse_since,
_read_last_n_lines,
list_logs,
tail_log,
_read_tail,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def log_dir(tmp_path, monkeypatch):
"""Create a fake HERMES_HOME with a logs/ directory."""
home = Path(os.environ["HERMES_HOME"])
logs = home / "logs"
logs.mkdir(parents=True, exist_ok=True)
return logs
@pytest.fixture
def sample_agent_log(log_dir):
"""Write a realistic agent.log with mixed levels and sessions."""
lines = textwrap.dedent("""\
2026-04-05 10:00:00,000 INFO run_agent: conversation turn: session=sess_aaa model=claude provider=openrouter platform=cli history=0 msg='hello'
2026-04-05 10:00:01,000 INFO run_agent: tool terminal completed (0.50s, 200 chars)
2026-04-05 10:00:02,000 INFO run_agent: API call #1: model=claude provider=openrouter in=1000 out=200 total=1200 latency=1.5s
2026-04-05 10:00:03,000 WARNING run_agent: Tool web_search returned error (2.00s): timeout
2026-04-05 10:00:04,000 INFO run_agent: conversation turn: session=sess_bbb model=gpt-5 provider=openai platform=telegram history=5 msg='fix bug'
2026-04-05 10:00:05,000 ERROR run_agent: API call failed after 3 retries. rate limited
2026-04-05 10:00:06,000 INFO run_agent: tool read_file completed (0.01s, 500 chars)
2026-04-05 10:00:07,000 DEBUG run_agent: verbose internal detail
2026-04-05 10:00:08,000 INFO credential_pool: credential pool: marking key-1 exhausted (status=429), rotating
2026-04-05 10:00:09,000 INFO credential_pool: credential pool: rotated to key-2
""")
path = log_dir / "agent.log"
path.write_text(lines)
return path
@pytest.fixture
def sample_errors_log(log_dir):
"""Write a small errors.log."""
lines = textwrap.dedent("""\
2026-04-05 10:00:03,000 WARNING run_agent: Tool web_search returned error (2.00s): timeout
2026-04-05 10:00:05,000 ERROR run_agent: API call failed after 3 retries. rate limited
""")
path = log_dir / "errors.log"
path.write_text(lines)
return path
# ---------------------------------------------------------------------------
# _parse_since
# Timestamp parsing
# ---------------------------------------------------------------------------
class TestParseSince:
def test_hours(self):
cutoff = _parse_since("2h")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(7200, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 7200) < 2
def test_minutes(self):
cutoff = _parse_since("30m")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(1800, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 1800) < 2
def test_days(self):
cutoff = _parse_since("1d")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(86400, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 86400) < 2
def test_seconds(self):
cutoff = _parse_since("60s")
cutoff = _parse_since("120s")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(60, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 120) < 2
def test_invalid_returns_none(self):
assert _parse_since("abc") is None
assert _parse_since("") is None
assert _parse_since("10x") is None
def test_whitespace_handling(self):
cutoff = _parse_since(" 1h ")
def test_whitespace_tolerance(self):
cutoff = _parse_since(" 5m ")
assert cutoff is not None
# ---------------------------------------------------------------------------
# _parse_line_timestamp
# ---------------------------------------------------------------------------
class TestParseLineTimestamp:
def test_standard_format(self):
ts = _parse_line_timestamp("2026-04-05 10:00:00,123 INFO something")
assert ts is not None
assert ts.year == 2026
assert ts.hour == 10
ts = _parse_line_timestamp("2026-04-11 10:23:45 INFO gateway.run: msg")
assert ts == datetime(2026, 4, 11, 10, 23, 45)
def test_no_timestamp(self):
assert _parse_line_timestamp("just some text") is None
assert _parse_line_timestamp("no timestamp here") is None
def test_continuation_line(self):
assert _parse_line_timestamp(" at module.function (line 42)") is None
# ---------------------------------------------------------------------------
# _extract_level
# ---------------------------------------------------------------------------
class TestExtractLevel:
def test_info(self):
assert _extract_level("2026-04-05 10:00:00 INFO run_agent: something") == "INFO"
assert _extract_level("2026-01-01 00:00:00 INFO gateway.run: msg") == "INFO"
def test_warning(self):
assert _extract_level("2026-04-05 10:00:00 WARNING run_agent: bad") == "WARNING"
assert _extract_level("2026-01-01 00:00:00 WARNING tools.file: msg") == "WARNING"
def test_error(self):
assert _extract_level("2026-04-05 10:00:00 ERROR run_agent: crash") == "ERROR"
assert _extract_level("2026-01-01 00:00:00 ERROR run_agent: msg") == "ERROR"
def test_debug(self):
assert _extract_level("2026-04-05 10:00:00 DEBUG run_agent: detail") == "DEBUG"
assert _extract_level("2026-01-01 00:00:00 DEBUG agent.aux: msg") == "DEBUG"
def test_no_level(self):
assert _extract_level("just a plain line") is None
assert _extract_level("random text") is None
# ---------------------------------------------------------------------------
# _matches_filters
# Logger name extraction (new for component filtering)
# ---------------------------------------------------------------------------
class TestExtractLoggerName:
def test_standard_line(self):
line = "2026-04-11 10:23:45 INFO gateway.run: Starting gateway"
assert _extract_logger_name(line) == "gateway.run"
def test_nested_logger(self):
line = "2026-04-11 10:23:45 INFO gateway.platforms.telegram: connected"
assert _extract_logger_name(line) == "gateway.platforms.telegram"
def test_warning_level(self):
line = "2026-04-11 10:23:45 WARNING tools.terminal_tool: timeout"
assert _extract_logger_name(line) == "tools.terminal_tool"
def test_with_session_tag(self):
line = "2026-04-11 10:23:45 INFO [abc123] tools.file_tools: reading file"
assert _extract_logger_name(line) == "tools.file_tools"
def test_with_session_tag_and_error(self):
line = "2026-04-11 10:23:45 ERROR [sess_xyz] agent.context_compressor: failed"
assert _extract_logger_name(line) == "agent.context_compressor"
def test_top_level_module(self):
line = "2026-04-11 10:23:45 INFO run_agent: starting conversation"
assert _extract_logger_name(line) == "run_agent"
def test_no_match(self):
assert _extract_logger_name("random text") is None
class TestLineMatchesComponent:
def test_gateway_component(self):
line = "2026-04-11 10:23:45 INFO gateway.run: msg"
assert _line_matches_component(line, ("gateway",))
def test_gateway_nested(self):
line = "2026-04-11 10:23:45 INFO gateway.platforms.telegram: msg"
assert _line_matches_component(line, ("gateway",))
def test_tools_component(self):
line = "2026-04-11 10:23:45 INFO tools.terminal_tool: msg"
assert _line_matches_component(line, ("tools",))
def test_agent_with_multiple_prefixes(self):
prefixes = ("agent", "run_agent", "model_tools")
assert _line_matches_component(
"2026-04-11 10:23:45 INFO agent.context_compressor: msg", prefixes)
assert _line_matches_component(
"2026-04-11 10:23:45 INFO run_agent: msg", prefixes)
assert _line_matches_component(
"2026-04-11 10:23:45 INFO model_tools: msg", prefixes)
def test_no_match(self):
line = "2026-04-11 10:23:45 INFO tools.browser: msg"
assert not _line_matches_component(line, ("gateway",))
def test_with_session_tag(self):
line = "2026-04-11 10:23:45 INFO [abc] gateway.run: msg"
assert _line_matches_component(line, ("gateway",))
def test_unparseable_line(self):
assert not _line_matches_component("random text", ("gateway",))
# ---------------------------------------------------------------------------
# Combined filter
# ---------------------------------------------------------------------------
class TestMatchesFilters:
def test_no_filters_always_matches(self):
assert _matches_filters("any line") is True
def test_no_filters_passes_everything(self):
assert _matches_filters("any line")
def test_level_filter_passes(self):
def test_level_filter(self):
assert _matches_filters(
"2026-04-05 10:00:00 WARNING something",
min_level="WARNING",
) is True
"2026-01-01 00:00:00 WARNING x: msg", min_level="WARNING")
assert not _matches_filters(
"2026-01-01 00:00:00 INFO x: msg", min_level="WARNING")
def test_level_filter_rejects(self):
def test_session_filter(self):
assert _matches_filters(
"2026-04-05 10:00:00 INFO something",
min_level="WARNING",
) is False
"2026-01-01 00:00:00 INFO [abc123] x: msg", session_filter="abc123")
assert not _matches_filters(
"2026-01-01 00:00:00 INFO [xyz789] x: msg", session_filter="abc123")
def test_session_filter_passes(self):
def test_component_filter(self):
assert _matches_filters(
"session=sess_aaa model=claude",
session_filter="sess_aaa",
) is True
def test_session_filter_rejects(self):
assert _matches_filters(
"session=sess_aaa model=claude",
session_filter="sess_bbb",
) is False
def test_since_filter_passes(self):
# Line from the future should always pass
assert _matches_filters(
"2099-01-01 00:00:00 INFO future",
since=datetime.now(),
) is True
def test_since_filter_rejects(self):
assert _matches_filters(
"2020-01-01 00:00:00 INFO past",
since=datetime.now(),
) is False
"2026-01-01 00:00:00 INFO gateway.run: msg",
component_prefixes=("gateway",))
assert not _matches_filters(
"2026-01-01 00:00:00 INFO tools.file: msg",
component_prefixes=("gateway",))
def test_combined_filters(self):
line = "2099-01-01 00:00:00 WARNING run_agent: session=abc error"
"""All filters must pass for a line to match."""
line = "2026-04-11 10:00:00 WARNING [sess_1] gateway.run: connection lost"
assert _matches_filters(
line, min_level="WARNING", session_filter="abc",
since=datetime.now(),
) is True
# Fails session filter
line,
min_level="WARNING",
session_filter="sess_1",
component_prefixes=("gateway",),
)
# Fails component filter
assert not _matches_filters(
line,
min_level="WARNING",
session_filter="sess_1",
component_prefixes=("tools",),
)
def test_since_filter(self):
# Line with a very old timestamp should be filtered out
assert not _matches_filters(
"2020-01-01 00:00:00 INFO x: old msg",
since=datetime.now() - timedelta(hours=1))
# Line with a recent timestamp should pass
recent = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
assert _matches_filters(
line, min_level="WARNING", session_filter="xyz",
) is False
f"{recent} INFO x: recent msg",
since=datetime.now() - timedelta(hours=1))
# ---------------------------------------------------------------------------
# _read_last_n_lines
# File reading
# ---------------------------------------------------------------------------
class TestReadLastNLines:
def test_reads_correct_count(self, sample_agent_log):
lines = _read_last_n_lines(sample_agent_log, 3)
assert len(lines) == 3
class TestReadTail:
def test_read_small_file(self, tmp_path):
log_file = tmp_path / "test.log"
lines = [f"2026-01-01 00:00:0{i} INFO x: line {i}\n" for i in range(10)]
log_file.write_text("".join(lines))
def test_reads_all_when_fewer(self, sample_agent_log):
lines = _read_last_n_lines(sample_agent_log, 100)
assert len(lines) == 10 # sample has 10 lines
result = _read_last_n_lines(log_file, 5)
assert len(result) == 5
assert "line 9" in result[-1]
def test_empty_file(self, log_dir):
empty = log_dir / "empty.log"
empty.write_text("")
lines = _read_last_n_lines(empty, 10)
assert lines == []
def test_read_with_component_filter(self, tmp_path):
log_file = tmp_path / "test.log"
lines = [
"2026-01-01 00:00:00 INFO gateway.run: gw msg\n",
"2026-01-01 00:00:01 INFO tools.file: tool msg\n",
"2026-01-01 00:00:02 INFO gateway.session: session msg\n",
"2026-01-01 00:00:03 INFO agent.compressor: agent msg\n",
]
log_file.write_text("".join(lines))
def test_last_line_content(self, sample_agent_log):
lines = _read_last_n_lines(sample_agent_log, 1)
assert "rotated to key-2" in lines[0]
result = _read_tail(
log_file, 50,
has_filters=True,
component_prefixes=("gateway",),
)
assert len(result) == 2
assert "gw msg" in result[0]
assert "session msg" in result[1]
def test_empty_file(self, tmp_path):
log_file = tmp_path / "empty.log"
log_file.write_text("")
result = _read_last_n_lines(log_file, 10)
assert result == []
# ---------------------------------------------------------------------------
# tail_log
# ---------------------------------------------------------------------------
class TestTailLog:
def test_basic_tail(self, sample_agent_log, capsys):
tail_log("agent", num_lines=3)
captured = capsys.readouterr()
assert "agent.log" in captured.out
# Should have the header + 3 lines
lines = captured.out.strip().split("\n")
assert len(lines) == 4 # 1 header + 3 content
def test_level_filter(self, sample_agent_log, capsys):
tail_log("agent", num_lines=50, level="ERROR")
captured = capsys.readouterr()
assert "level>=ERROR" in captured.out
# Only the ERROR line should appear
content_lines = [l for l in captured.out.strip().split("\n") if not l.startswith("---")]
assert len(content_lines) == 1
assert "API call failed" in content_lines[0]
def test_session_filter(self, sample_agent_log, capsys):
tail_log("agent", num_lines=50, session="sess_bbb")
captured = capsys.readouterr()
content_lines = [l for l in captured.out.strip().split("\n") if not l.startswith("---")]
assert len(content_lines) == 1
assert "sess_bbb" in content_lines[0]
def test_errors_log(self, sample_errors_log, capsys):
tail_log("errors", num_lines=10)
captured = capsys.readouterr()
assert "errors.log" in captured.out
assert "WARNING" in captured.out or "ERROR" in captured.out
def test_unknown_log_exits(self):
with pytest.raises(SystemExit):
tail_log("nonexistent")
def test_missing_file_exits(self, log_dir):
with pytest.raises(SystemExit):
tail_log("agent") # agent.log doesn't exist in clean log_dir
# ---------------------------------------------------------------------------
# list_logs
# ---------------------------------------------------------------------------
class TestListLogs:
def test_lists_files(self, sample_agent_log, sample_errors_log, capsys):
list_logs()
captured = capsys.readouterr()
assert "agent.log" in captured.out
assert "errors.log" in captured.out
def test_empty_dir(self, log_dir, capsys):
list_logs()
captured = capsys.readouterr()
assert "no log files yet" in captured.out
def test_shows_sizes(self, sample_agent_log, capsys):
list_logs()
captured = capsys.readouterr()
# File is small, should show as bytes or KB
assert "B" in captured.out or "KB" in captured.out
# ---------------------------------------------------------------------------
# LOG_FILES registry
# ---------------------------------------------------------------------------
class TestLogFiles:
def test_known_log_files(self):
assert "agent" in LOG_FILES
assert "errors" in LOG_FILES
assert "gateway" in LOG_FILES


@@ -3,6 +3,7 @@
import logging
import os
import stat
import threading
from logging.handlers import RotatingFileHandler
from pathlib import Path
from unittest.mock import patch
@@ -34,6 +35,8 @@ def _reset_logging_state():
h.close()
else:
pre_existing.append(h)
# Ensure the record factory is installed (it's idempotent).
hermes_logging._install_session_record_factory()
yield
# Restore — remove any handlers added during the test.
for h in list(root.handlers):
@@ -41,6 +44,7 @@ def _reset_logging_state():
root.removeHandler(h)
h.close()
hermes_logging._logging_initialized = False
hermes_logging.clear_session_context()
@pytest.fixture
@@ -220,6 +224,294 @@ class TestSetupLogging:
]
assert agent_handlers[0].level == logging.WARNING
def test_record_factory_installed(self, hermes_home):
"""The custom record factory injects session_tag on all records."""
hermes_logging.setup_logging(hermes_home=hermes_home)
factory = logging.getLogRecordFactory()
assert getattr(factory, "_hermes_session_injector", False), (
"Record factory should have _hermes_session_injector marker"
)
# Verify session_tag exists on a fresh record
record = factory("test", logging.INFO, "", 0, "msg", (), None)
assert hasattr(record, "session_tag")
class TestGatewayMode:
"""setup_logging(mode='gateway') creates a filtered gateway.log."""
def test_gateway_log_created(self, hermes_home):
hermes_logging.setup_logging(hermes_home=hermes_home, mode="gateway")
root = logging.getLogger()
gw_handlers = [
h for h in root.handlers
if isinstance(h, RotatingFileHandler)
and "gateway.log" in getattr(h, "baseFilename", "")
]
assert len(gw_handlers) == 1
def test_gateway_log_not_created_in_cli_mode(self, hermes_home):
hermes_logging.setup_logging(hermes_home=hermes_home, mode="cli")
root = logging.getLogger()
gw_handlers = [
h for h in root.handlers
if isinstance(h, RotatingFileHandler)
and "gateway.log" in getattr(h, "baseFilename", "")
]
assert len(gw_handlers) == 0
def test_gateway_log_receives_gateway_records(self, hermes_home):
"""gateway.log captures records from gateway.* loggers."""
hermes_logging.setup_logging(hermes_home=hermes_home, mode="gateway")
gw_logger = logging.getLogger("gateway.platforms.telegram")
gw_logger.info("telegram connected")
for h in logging.getLogger().handlers:
h.flush()
gw_log = hermes_home / "logs" / "gateway.log"
assert gw_log.exists()
assert "telegram connected" in gw_log.read_text()
def test_gateway_log_rejects_non_gateway_records(self, hermes_home):
"""gateway.log does NOT capture records from tools.*, agent.*, etc."""
hermes_logging.setup_logging(hermes_home=hermes_home, mode="gateway")
tool_logger = logging.getLogger("tools.terminal_tool")
tool_logger.info("running command")
agent_logger = logging.getLogger("agent.context_compressor")
agent_logger.info("compressing context")
for h in logging.getLogger().handlers:
h.flush()
gw_log = hermes_home / "logs" / "gateway.log"
if gw_log.exists():
content = gw_log.read_text()
assert "running command" not in content
assert "compressing context" not in content
def test_agent_log_still_receives_all(self, hermes_home):
"""agent.log (catch-all) still receives gateway AND tool records."""
hermes_logging.setup_logging(hermes_home=hermes_home, mode="gateway")
logging.getLogger("gateway.run").info("gateway msg")
logging.getLogger("tools.file_tools").info("file msg")
for h in logging.getLogger().handlers:
h.flush()
agent_log = hermes_home / "logs" / "agent.log"
content = agent_log.read_text()
assert "gateway msg" in content
assert "file msg" in content
class TestSessionContext:
"""set_session_context / clear_session_context (tag injected by the record factory)."""
def test_session_tag_in_log_output(self, hermes_home):
"""When session context is set, log lines include [session_id]."""
hermes_logging.setup_logging(hermes_home=hermes_home)
hermes_logging.set_session_context("abc123")
test_logger = logging.getLogger("test.session_tag")
test_logger.info("tagged message")
for h in logging.getLogger().handlers:
h.flush()
agent_log = hermes_home / "logs" / "agent.log"
content = agent_log.read_text()
assert "[abc123]" in content
assert "tagged message" in content
def test_no_session_tag_without_context(self, hermes_home):
"""Without session context, log lines have no session tag."""
hermes_logging.setup_logging(hermes_home=hermes_home)
hermes_logging.clear_session_context()
test_logger = logging.getLogger("test.no_session")
test_logger.info("untagged message")
for h in logging.getLogger().handlers:
h.flush()
agent_log = hermes_home / "logs" / "agent.log"
content = agent_log.read_text()
assert "untagged message" in content
# Should not have any [xxx] session tag
import re
for line in content.splitlines():
if "untagged message" in line:
assert not re.search(r"\[.+?\]", line.split("INFO")[1].split("test.no_session")[0])
def test_clear_session_context(self, hermes_home):
"""After clearing, session tag disappears."""
hermes_logging.setup_logging(hermes_home=hermes_home)
hermes_logging.set_session_context("xyz789")
hermes_logging.clear_session_context()
test_logger = logging.getLogger("test.cleared")
test_logger.info("after clear")
for h in logging.getLogger().handlers:
h.flush()
agent_log = hermes_home / "logs" / "agent.log"
content = agent_log.read_text()
assert "[xyz789]" not in content
def test_session_context_thread_isolated(self, hermes_home):
"""Session context is per-thread — one thread's context doesn't leak."""
hermes_logging.setup_logging(hermes_home=hermes_home)
results = {}
def thread_a():
hermes_logging.set_session_context("thread_a_session")
logging.getLogger("test.thread_a").info("from thread A")
for h in logging.getLogger().handlers:
h.flush()
def thread_b():
hermes_logging.set_session_context("thread_b_session")
logging.getLogger("test.thread_b").info("from thread B")
for h in logging.getLogger().handlers:
h.flush()
ta = threading.Thread(target=thread_a)
tb = threading.Thread(target=thread_b)
ta.start()
ta.join()
tb.start()
tb.join()
agent_log = hermes_home / "logs" / "agent.log"
content = agent_log.read_text()
# Each thread's message should have its own session tag
for line in content.splitlines():
if "from thread A" in line:
assert "[thread_a_session]" in line
assert "[thread_b_session]" not in line
if "from thread B" in line:
assert "[thread_b_session]" in line
assert "[thread_a_session]" not in line
class TestRecordFactory:
"""Unit tests for the custom LogRecord factory."""
def test_record_has_session_tag(self):
"""Every record gets a session_tag attribute."""
factory = logging.getLogRecordFactory()
record = factory("test", logging.INFO, "", 0, "msg", (), None)
assert hasattr(record, "session_tag")
def test_empty_tag_without_context(self):
hermes_logging.clear_session_context()
factory = logging.getLogRecordFactory()
record = factory("test", logging.INFO, "", 0, "msg", (), None)
assert record.session_tag == ""
def test_tag_with_context(self):
hermes_logging.set_session_context("sess_42")
factory = logging.getLogRecordFactory()
record = factory("test", logging.INFO, "", 0, "msg", (), None)
assert record.session_tag == " [sess_42]"
def test_idempotent_install(self):
"""Calling _install_session_record_factory() twice doesn't double-wrap."""
hermes_logging._install_session_record_factory()
factory_a = logging.getLogRecordFactory()
hermes_logging._install_session_record_factory()
factory_b = logging.getLogRecordFactory()
assert factory_a is factory_b
def test_works_with_any_handler(self):
"""A handler using %(session_tag)s works even without _SessionFilter."""
hermes_logging.set_session_context("any_handler_test")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(session_tag)s %(message)s"))
logger = logging.getLogger("_test_any_handler")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
try:
# Should not raise KeyError
logger.info("hello")
finally:
logger.removeHandler(handler)
class TestComponentFilter:
"""Unit tests for _ComponentFilter."""
def test_passes_matching_prefix(self):
f = hermes_logging._ComponentFilter(("gateway",))
record = logging.LogRecord(
"gateway.run", logging.INFO, "", 0, "msg", (), None
)
assert f.filter(record) is True
def test_passes_nested_matching_prefix(self):
f = hermes_logging._ComponentFilter(("gateway",))
record = logging.LogRecord(
"gateway.platforms.telegram", logging.INFO, "", 0, "msg", (), None
)
assert f.filter(record) is True
def test_blocks_non_matching(self):
f = hermes_logging._ComponentFilter(("gateway",))
record = logging.LogRecord(
"tools.terminal_tool", logging.INFO, "", 0, "msg", (), None
)
assert f.filter(record) is False
def test_multiple_prefixes(self):
f = hermes_logging._ComponentFilter(("agent", "run_agent", "model_tools"))
assert f.filter(logging.LogRecord(
"agent.compressor", logging.INFO, "", 0, "", (), None
))
assert f.filter(logging.LogRecord(
"run_agent", logging.INFO, "", 0, "", (), None
))
assert f.filter(logging.LogRecord(
"model_tools", logging.INFO, "", 0, "", (), None
))
assert not f.filter(logging.LogRecord(
"tools.browser", logging.INFO, "", 0, "", (), None
))
class TestComponentPrefixes:
"""COMPONENT_PREFIXES covers the expected components."""
def test_gateway_prefix(self):
assert "gateway" in hermes_logging.COMPONENT_PREFIXES
assert ("gateway",) == hermes_logging.COMPONENT_PREFIXES["gateway"]
def test_agent_prefix(self):
prefixes = hermes_logging.COMPONENT_PREFIXES["agent"]
assert "agent" in prefixes
assert "run_agent" in prefixes
assert "model_tools" in prefixes
def test_tools_prefix(self):
assert ("tools",) == hermes_logging.COMPONENT_PREFIXES["tools"]
def test_cli_prefix(self):
prefixes = hermes_logging.COMPONENT_PREFIXES["cli"]
assert "hermes_cli" in prefixes
assert "cli" in prefixes
def test_cron_prefix(self):
assert ("cron",) == hermes_logging.COMPONENT_PREFIXES["cron"]
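Taken together, the assertions above imply a mapping of roughly this shape (reconstructed from the tests; the real module may carry additional entries):

```python
# Maps `hermes logs --component NAME` to the logger-name prefixes
# that component's records are expected to carry.
COMPONENT_PREFIXES: dict[str, tuple[str, ...]] = {
    "gateway": ("gateway",),
    "agent": ("agent", "run_agent", "model_tools"),
    "tools": ("tools",),
    "cli": ("hermes_cli", "cli"),
    "cron": ("cron",),
}
```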
class TestSetupVerboseLogging:
"""setup_verbose_logging() adds a DEBUG-level console handler."""
@@ -301,6 +593,59 @@ class TestAddRotatingHandler:
logger.removeHandler(h)
h.close()
def test_log_filter_attached(self, tmp_path):
"""Optional log_filter is attached to the handler."""
log_path = tmp_path / "filtered.log"
logger = logging.getLogger("_test_rotating_filter")
formatter = logging.Formatter("%(message)s")
component_filter = hermes_logging._ComponentFilter(("test",))
hermes_logging._add_rotating_handler(
logger, log_path,
level=logging.INFO, max_bytes=1024, backup_count=1,
formatter=formatter,
log_filter=component_filter,
)
handlers = [h for h in logger.handlers if isinstance(h, RotatingFileHandler)]
assert len(handlers) == 1
assert component_filter in handlers[0].filters
# Clean up
for h in list(logger.handlers):
if isinstance(h, RotatingFileHandler):
logger.removeHandler(h)
h.close()
def test_no_session_filter_on_handler(self, tmp_path):
"""Handlers rely on record factory, not per-handler _SessionFilter."""
log_path = tmp_path / "no_session_filter.log"
logger = logging.getLogger("_test_no_session_filter")
formatter = logging.Formatter("%(session_tag)s%(message)s")
hermes_logging._add_rotating_handler(
logger, log_path,
level=logging.INFO, max_bytes=1024, backup_count=1,
formatter=formatter,
)
handlers = [h for h in logger.handlers if isinstance(h, RotatingFileHandler)]
assert len(handlers) == 1
# No _SessionFilter on the handler — record factory handles it
assert len(handlers[0].filters) == 0
# But session_tag still works (via record factory)
hermes_logging.set_session_context("factory_test")
logger.info("test msg")
handlers[0].flush()
content = log_path.read_text()
assert "[factory_test]" in content
# Clean up
for h in list(logger.handlers):
if isinstance(h, RotatingFileHandler):
logger.removeHandler(h)
h.close()
def test_managed_mode_initial_open_sets_group_writable(self, tmp_path):
log_path = tmp_path / "managed-open.log"
logger = logging.getLogger("_test_rotating_managed_open")