hermes-agent/agent/memory_provider.py
Teknium 13683c0842
feat(memory): notify providers on mid-process session_id rotation (#17409)
Fixes #6672

Memory providers now receive on_session_switch() whenever AIAgent.session_id
rotates mid-process — /resume, /branch, /reset, /new, and context
compression. Before this, providers that cached per-session state in
initialize() (Hindsight's _session_id, _document_id, accumulated
_session_turns, _turn_counter) kept writing into the old session's
record after the agent had moved on.

MemoryProvider ABC
------------------
- New optional hook on_session_switch(new_session_id, *,
  parent_session_id='', reset=False, **kwargs) with no-op default for
  backward compat. reset=True signals /reset or /new — providers should
  flush accumulated per-session buffers. reset=False for /resume,
  /branch, compression where the logical conversation continues.
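
A minimal sketch of the hook contract described above. SketchProvider is a
hypothetical stand-in for the real ABC, and LegacyProvider, BufferingProvider
and the _turns attribute are invented for illustration; only the hook
signature and the reset semantics come from this change.

```python
from typing import Any, List


class SketchProvider:
    """Stand-in for the ABC; only the new hook is modelled here."""

    def on_session_switch(
        self,
        new_session_id: str,
        *,
        parent_session_id: str = "",
        reset: bool = False,
        **kwargs: Any,
    ) -> None:
        """No-op by default, so providers that ignore the hook keep working."""


class LegacyProvider(SketchProvider):
    """Hypothetical provider written before the hook existed."""


class BufferingProvider(SketchProvider):
    """Hypothetical provider that caches per-session state."""

    def __init__(self, session_id: str) -> None:
        self._session_id = session_id
        self._turns: List[str] = []

    def on_session_switch(
        self, new_session_id, *, parent_session_id="", reset=False, **kwargs
    ):
        self._session_id = new_session_id
        if reset:
            # /reset or /new: a new conversation, so drop buffered turns.
            self._turns.clear()


legacy = LegacyProvider()
legacy.on_session_switch("s2", reason="resume")  # inherited no-op, no error

provider = BufferingProvider("s1")
provider._turns.append("hello")
provider.on_session_switch("s2", parent_session_id="s1", reset=False, reason="branch")
kept_after_branch = list(provider._turns)  # buffer survives a non-reset switch
provider.on_session_switch("s3", reset=True, reason="new_session")
kept_after_reset = list(provider._turns)  # buffer is flushed on reset
```

Extra keywords such as reason are absorbed by **kwargs, which is what lets
callers add context later without breaking existing overrides.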

MemoryManager
-------------
- on_session_switch() fans the hook out to every registered provider.
  Isolated try/except per provider — one bad provider can't block others.
- Empty/None new_session_id is a no-op to avoid corrupting provider state
  during shutdown paths.
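
The fan-out behaviour above might look roughly like the following.
SketchManager, Recorder and Broken are hypothetical names, and the logging
call is an assumption; the real MemoryManager may report failures differently.

```python
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


class SketchManager:
    """Hypothetical stand-in for MemoryManager; only the fan-out is shown."""

    def __init__(self, providers: List[Any]) -> None:
        self._providers = providers

    def on_session_switch(
        self, new_session_id, *, parent_session_id="", reset=False, **kwargs
    ) -> None:
        if not new_session_id:
            # Empty or None id (e.g. a shutdown path): leave provider state alone.
            return
        for provider in self._providers:
            try:
                provider.on_session_switch(
                    new_session_id,
                    parent_session_id=parent_session_id,
                    reset=reset,
                    **kwargs,
                )
            except Exception:
                # Isolate the failure so the remaining providers still run.
                logger.debug("on_session_switch failed for %r", provider, exc_info=True)


class Recorder:
    """Test double that records every id it is told about."""

    def __init__(self) -> None:
        self.seen = []

    def on_session_switch(self, new_session_id, **kwargs) -> None:
        self.seen.append(new_session_id)


class Broken:
    """Test double that always fails."""

    def on_session_switch(self, new_session_id, **kwargs) -> None:
        raise RuntimeError("backend unavailable")


recorder = Recorder()
manager = SketchManager([Broken(), recorder])
manager.on_session_switch("s2", parent_session_id="s1", reason="compression")
manager.on_session_switch("")    # ignored
manager.on_session_switch(None)  # ignored
```

Placing the failing provider first shows that the later one is still notified.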

run_agent.py
------------
- _sync_external_memory_for_turn now passes session_id=self.session_id
  into sync_all() and queue_prefetch_all(). Providers with defensive
  session_id updates in sync_turn (Hindsight already had this at
  plugins/memory/hindsight/__init__.py:1199) now actually receive the
  current id.
- Compression block at ~L8884 already notified the context engine of
  the rollover; now also calls
  _memory_manager.on_session_switch(reason='compression').
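
To illustrate why forwarding session_id matters, here is a sketch of a
defensive update in sync_turn. DefensiveProvider and this free-standing
sync_all are hypothetical; they are not the Hindsight or MemoryManager code.

```python
class DefensiveProvider:
    """Hypothetical provider that corrects its cached id on every write."""

    def __init__(self, session_id: str) -> None:
        self._session_id = session_id
        self.written = []  # (session_id, user, assistant) tuples

    def sync_turn(
        self, user_content: str, assistant_content: str, *, session_id: str = ""
    ) -> None:
        # Defensive update: adopt the caller's id whenever one is supplied.
        if session_id and session_id != self._session_id:
            self._session_id = session_id
        self.written.append((self._session_id, user_content, assistant_content))


def sync_all(providers, user_content, assistant_content, *, session_id=""):
    """Hypothetical fan-out that forwards the agent's current session_id."""
    for provider in providers:
        provider.sync_turn(user_content, assistant_content, session_id=session_id)


provider = DefensiveProvider("old-session")
sync_all([provider], "hi", "hello", session_id="new-session")
```

If the caller omitted session_id, the guard would never trigger and the turn
would be recorded under the stale id.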

cli.py
------
- new_session() fires reset=True, reason='new_session' so providers
  flush buffers.
- _handle_resume_command fires reset=False, reason='resume' with the
  previous session as parent_session_id.
- _handle_branch_command fires reset=False, reason='branch' with the
  parent session_id already captured for the DB parent link.

gateway/run.py
--------------
- _handle_resume_command now evicts the cached AIAgent, mirroring
  /branch and /reset. The next message rebuilds a fresh agent whose
  memory provider initialize() runs with the correct session_id,
  matching the pattern the gateway already uses for provider state
  across session transitions.
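
The evict-and-rebuild pattern can be sketched as below. AgentCache,
build_agent and the chat key are all hypothetical; the gateway's real cache
and agent construction are not shown in this change description.

```python
from typing import Callable, Dict


class AgentCache:
    """Hypothetical per-chat agent cache; names are illustrative only."""

    def __init__(self, build_agent: Callable[[str], dict]) -> None:
        self._build_agent = build_agent
        self._agents: Dict[str, dict] = {}

    def get(self, chat_key: str, session_id: str) -> dict:
        # Build lazily; a cached agent keeps the session_id it was built with.
        if chat_key not in self._agents:
            self._agents[chat_key] = self._build_agent(session_id)
        return self._agents[chat_key]

    def evict(self, chat_key: str) -> None:
        self._agents.pop(chat_key, None)


def build_agent(session_id: str) -> dict:
    # Stand-in for constructing an agent, where provider initialize() would run.
    return {"initialized_with": session_id}


cache = AgentCache(build_agent)
first = cache.get("chat-1", "s1")
stale = cache.get("chat-1", "s2")  # still the agent built for s1
cache.evict("chat-1")              # the step /resume now performs
fresh = cache.get("chat-1", "s2")  # rebuilt with the resumed session
```

Without the eviction, the cached agent would keep serving the chat with state
initialized for the previous session.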

Hindsight reference implementation
----------------------------------
- plugins/memory/hindsight/__init__.py adds on_session_switch that:
  updates _session_id, mints a fresh _document_id (prevents
  vectorize-io/hindsight#1303 overwrite), and clears _session_turns /
  _turn_counter / _turn_index so in-flight batches don't flush under
  the new document id. parent_session_id only overwritten when provided
  (avoids clobbering on a bare switch).
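
The state transition listed above, as an illustrative sketch only. It is not
the plugin's code: HindsightLikeState, _parent_session_id and the document id
format (session id plus a random suffix) are assumptions. As the description
states, buffers are cleared on every switch here, not only on reset.

```python
import uuid


class HindsightLikeState:
    """Illustrative only; mirrors the attribute names from the description."""

    def __init__(self, session_id: str) -> None:
        self._session_id = session_id
        self._parent_session_id = ""
        self._document_id = self._mint_document_id(session_id)
        self._session_turns = []
        self._turn_counter = 0
        self._turn_index = 0

    @staticmethod
    def _mint_document_id(session_id: str) -> str:
        # Assumed format: session id plus a short random suffix.
        return f"{session_id}-{uuid.uuid4().hex[:8]}"

    def on_session_switch(
        self, new_session_id, *, parent_session_id="", reset=False, **kwargs
    ) -> None:
        self._session_id = new_session_id
        self._document_id = self._mint_document_id(new_session_id)
        # Drop buffered turns so nothing pending is written under the new id.
        self._session_turns = []
        self._turn_counter = 0
        self._turn_index = 0
        if parent_session_id:
            # Only overwrite lineage when the caller actually provides it.
            self._parent_session_id = parent_session_id


state = HindsightLikeState("s1")
old_document_id = state._document_id
state._session_turns.append("buffered turn")
state.on_session_switch("s2", parent_session_id="s1", reason="branch")
state.on_session_switch("s3")  # bare switch: parent stays "s1"
```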

Tests
-----
- tests/agent/test_memory_session_switch.py: new dedicated file. ABC
  default no-op, manager fan-out, failure isolation, empty-id no-op,
  session_id propagation through sync_all/queue_prefetch_all, Hindsight
  state transitions for every reset/non-reset case, parent preservation.
- tests/cli/test_branch_command.py: new test verifying /branch fires
  the hook with correct parent_session_id + reset=False + reason.
- tests/gateway/test_resume_command.py: new test verifying /resume
  evicts the cached agent.
- tests/run_agent/test_memory_sync_interrupted.py: updated existing
  assertions to account for the session_id kwarg on sync_all and
  queue_prefetch_all.

E2E verified (real imports, tmp HERMES_HOME):
- /resume: session_id updates, doc_id fresh, buffers cleared, parent set
- /branch: session_id forks, parent links to original
- /new: reset=True clears accumulated state
- compression: reason='compression' propagated, lineage preserved
- Empty id: no-op, state preserved
- Legacy provider without on_session_switch: no crash

Reported by @nicoloboschi (Hindsight maintainer); related scope-widening
comment by @kidonng extending coverage to compression.
2026-04-29 04:57:22 -07:00


"""Abstract base class for pluggable memory providers.

Memory providers give the agent persistent recall across sessions. One
external provider is active at a time alongside the always-on built-in
memory (MEMORY.md / USER.md). The MemoryManager enforces this limit.

Built-in memory is always active as the first provider and cannot be removed.
External providers (Honcho, Hindsight, Mem0, etc.) are additive — they never
disable the built-in store. Only one external provider runs at a time to
prevent tool schema bloat and conflicting memory backends.

Registration:
  1. Built-in: BuiltinMemoryProvider — always present, not removable.
  2. Plugins: Ship in plugins/memory/<name>/, activated by memory.provider config.

Lifecycle (called by MemoryManager, wired in run_agent.py):
  initialize()          — connect, create resources, warm up
  system_prompt_block() — static text for the system prompt
  prefetch(query)       — background recall before each turn
  sync_turn(user, asst) — async write after each turn
  get_tool_schemas()    — tool schemas to expose to the model
  handle_tool_call()    — dispatch a tool call
  shutdown()            — clean exit

Optional hooks (override to opt in):
  on_turn_start(turn, message, **kwargs) — per-turn tick with runtime context
  on_session_end(messages) — end-of-session extraction
  on_session_switch(new_session_id, **kwargs) — mid-process session_id rotation
  on_pre_compress(messages) -> str — extract before context compression
  on_memory_write(action, target, content, metadata=None) — mirror built-in memory writes
  on_delegation(task, result, **kwargs) — parent-side observation of subagent work
"""

from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class MemoryProvider(ABC):
    """Abstract base class for memory providers."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier for this provider (e.g. 'builtin', 'honcho', 'hindsight')."""

    # -- Core lifecycle (implement these) ------------------------------------

    @abstractmethod
    def is_available(self) -> bool:
        """Return True if this provider is configured, has credentials, and is ready.

        Called during agent init to decide whether to activate the provider.
        Should not make network calls — just check config and installed deps.
        """

    @abstractmethod
    def initialize(self, session_id: str, **kwargs) -> None:
        """Initialize for a session.

        Called once at agent startup. May create resources (banks, tables),
        establish connections, start background threads, etc.

        kwargs always include:
          - hermes_home (str): The active HERMES_HOME directory path. Use this
            for profile-scoped storage instead of hardcoding ``~/.hermes``.
          - platform (str): "cli", "telegram", "discord", "cron", etc.

        kwargs may also include:
          - agent_context (str): "primary", "subagent", "cron", or "flush".
            Providers should skip writes for non-primary contexts (cron system
            prompts would corrupt user representations).
          - agent_identity (str): Profile name (e.g. "coder"). Use for
            per-profile provider identity scoping.
          - agent_workspace (str): Shared workspace name (e.g. "hermes").
          - parent_session_id (str): For subagents, the parent's session_id.
          - user_id (str): Platform user identifier (gateway sessions).
        """

    def system_prompt_block(self) -> str:
        """Return text to include in the system prompt.

        Called during system prompt assembly. Return empty string to skip.
        This is for STATIC provider info (instructions, status). Prefetched
        recall context is injected separately via prefetch().
        """
        return ""

    def prefetch(self, query: str, *, session_id: str = "") -> str:
        """Recall relevant context for the upcoming turn.

        Called before each API call. Return formatted text to inject as
        context, or empty string if nothing relevant. Implementations
        should be fast — use background threads for the actual recall
        and return cached results here.

        session_id is provided for providers serving concurrent sessions
        (gateway group chats, cached agents). Providers that don't need
        per-session scoping can ignore it.
        """
        return ""

    def queue_prefetch(self, query: str, *, session_id: str = "") -> None:
        """Queue a background recall for the NEXT turn.

        Called after each turn completes. The result will be consumed
        by prefetch() on the next turn. Default is no-op — providers
        that do background prefetching should override this.
        """

    def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
        """Persist a completed turn to the backend.

        Called after each turn. Should be non-blocking — queue for
        background processing if the backend has latency.
        """

    @abstractmethod
    def get_tool_schemas(self) -> List[Dict[str, Any]]:
        """Return tool schemas this provider exposes.

        Each schema follows the OpenAI function calling format:
        {"name": "...", "description": "...", "parameters": {...}}

        Return empty list if this provider has no tools (context-only).
        """

    def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str:
        """Handle a tool call for one of this provider's tools.

        Must return a JSON string (the tool result).
        Only called for tool names returned by get_tool_schemas().
        """
        raise NotImplementedError(f"Provider {self.name} does not handle tool {tool_name}")

    def shutdown(self) -> None:
        """Clean shutdown — flush queues, close connections."""

    # -- Optional hooks (override to opt in) ---------------------------------

    def on_turn_start(self, turn_number: int, message: str, **kwargs) -> None:
        """Called at the start of each turn with the user message.

        Use for turn-counting, scope management, periodic maintenance.

        kwargs may include: remaining_tokens, model, platform, tool_count.
        Providers use what they need; extras are ignored.
        """

    def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
        """Called when a session ends (explicit exit or timeout).

        Use for end-of-session fact extraction, summarization, etc.
        messages is the full conversation history.

        NOT called after every turn — only at actual session boundaries
        (CLI exit, /reset, gateway session expiry).
        """

    def on_session_switch(
        self,
        new_session_id: str,
        *,
        parent_session_id: str = "",
        reset: bool = False,
        **kwargs,
    ) -> None:
        """Called when the agent switches session_id mid-process.

        Fires on ``/resume``, ``/branch``, ``/reset``, ``/new`` (CLI), the
        gateway equivalents, and context compression — any path that
        reassigns ``AIAgent.session_id`` without tearing the provider down.

        Providers that cache per-session state in ``initialize()``
        (``_session_id``, ``_document_id``, accumulated turn buffers,
        counters) should update or reset that state here so subsequent
        writes land in the correct session's record.

        Parameters
        ----------
        new_session_id:
            The session_id the agent just switched to.
        parent_session_id:
            The previous session_id, if meaningful — set for ``/branch``
            (fork lineage), context compression (continuation lineage),
            and ``/resume`` (the session we're leaving). Empty string
            when no lineage applies.
        reset:
            ``True`` when this is a genuinely new conversation, not a
            resumption of an existing one. Fired by ``/reset`` / ``/new``.
            Providers should flush accumulated per-session buffers
            (``_session_turns``, ``_turn_counter``, etc.) when this is
            set. ``False`` for ``/resume`` / ``/branch`` / compression
            where the logical conversation continues under the new id.

        Default is no-op for backward compatibility.
        """

    def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str:
        """Called before context compression discards old messages.

        Use to extract insights from messages about to be compressed.
        messages is the list that will be summarized/discarded.

        Return text to include in the compression summary prompt so the
        compressor preserves provider-extracted insights. Return empty
        string for no contribution (backwards-compatible default).
        """
        return ""

    def on_delegation(self, task: str, result: str, *,
                      child_session_id: str = "", **kwargs) -> None:
        """Called on the PARENT agent when a subagent completes.

        The parent's memory provider gets the task+result pair as an
        observation of what was delegated and what came back. The subagent
        itself has no provider session (skip_memory=True).

        task: the delegation prompt
        result: the subagent's final response
        child_session_id: the subagent's session_id
        """

    def get_config_schema(self) -> List[Dict[str, Any]]:
        """Return config fields this provider needs for setup.

        Used by 'hermes memory setup' to walk the user through configuration.
        Each field is a dict with:
          key: config key name (e.g. 'api_key', 'mode')
          description: human-readable description
          secret: True if this should go to .env (default: False)
          required: True if required (default: False)
          default: default value (optional)
          choices: list of valid values (optional)
          url: URL where user can get this credential (optional)
          env_var: explicit env var name for secrets (default: auto-generated)

        Return empty list if no config needed (e.g. local-only providers).
        """
        return []

    def save_config(self, values: Dict[str, Any], hermes_home: str) -> None:
        """Write non-secret config to the provider's native location.

        Called by 'hermes memory setup' after collecting user inputs.
        ``values`` contains only non-secret fields (secrets go to .env).
        ``hermes_home`` is the active HERMES_HOME directory path.

        Providers with native config files (JSON, YAML) should override
        this to write to their expected location. Providers that use only
        env vars can leave the default (no-op).

        All new memory provider plugins MUST implement either:
          - save_config() for native config file formats, OR
          - use only env vars (in which case get_config_schema() fields
            should all have ``env_var`` set and this method stays no-op).
        """

    def on_memory_write(
        self,
        action: str,
        target: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Called when the built-in memory tool writes an entry.

        action: 'add', 'replace', or 'remove'
        target: 'memory' or 'user'
        content: the entry content
        metadata: structured provenance for the write, when available. Common
            keys include ``write_origin``, ``execution_context``, ``session_id``,
            ``parent_session_id``, ``platform``, and ``tool_name``.

        Use to mirror built-in memory writes to your backend.
        """