hermes-agent/tests/agent/test_memory_skill_scaffolding.py
Teknium c2c55c4443 fix(memory): strip skill scaffolding for all providers, not just openviking
Generalizes #32663 (@ehz0ah). The slash-skill scaffolding pollution
affected every auto-syncing memory provider — mem0, hindsight, retaindb,
byterover, honcho, supermemory all store/embed the raw user turn, so a
/skill invocation poisoned their stores with the full skill body, not just
openviking.

- Lift the contributor's parser into agent/skill_commands.py as the canonical
  extract_user_instruction_from_skill_message(), co-located with the message
  builders so the markers can't drift.
- Strip once in MemoryManager.{prefetch_all,queue_prefetch_all,sync_all} —
  fixes the whole provider fan-out, bare /skill turns are skipped entirely.
- OpenViking's _derive_openviking_user_text() now delegates to the shared
  helper as defense-in-depth (no duplicated marker literals).
- Marker-drift regression now asserts against the canonical skill_commands
  constants; add manager-level coverage proving every provider gets clean text.
2026-06-16 10:37:37 -07:00

161 lines
5.8 KiB
Python

"""MemoryManager strips slash-skill scaffolding for every provider.
When a user invokes a /skill or /bundle, Hermes expands the turn into a
model-facing message that embeds the full skill body. Feeding that verbatim to
memory providers pollutes their stores/embeddings with prompt scaffolding
instead of what the user actually asked. The strip lives once in MemoryManager
so it covers the whole provider fan-out — not per backend.
See: agent.skill_commands.extract_user_instruction_from_skill_message and
MemoryManager._strip_skill_scaffolding.
"""
from agent.memory_manager import MemoryManager
from agent.memory_provider import MemoryProvider
from agent.skill_commands import extract_user_instruction_from_skill_message
_SINGLE_SKILL_TURN = (
'[IMPORTANT: The user has invoked the "skill-creator" skill, indicating they want '
"you to follow its instructions. The full skill content is loaded below.]\n\n"
"# Skill Creator\n\n"
"Large skill body that must not be searched or embedded.\n\n"
"The user has provided the following instruction alongside the skill invocation: "
"make a skill for release triage"
)
_BUNDLE_TURN = (
'[IMPORTANT: The user has invoked the "backend-dev" skill bundle, '
"loading 2 skills together. Treat every skill below as active guidance for this turn.]\n\n"
"Bundle: backend-dev\n"
"Skills loaded: test-driven-development, code-review\n\n"
"User instruction: fix the failing retrieval test\n\n"
'[Loaded as part of the "backend-dev" skill bundle.]\n\n'
"Large bundled skill body that must not be searched or embedded."
)
_BARE_SKILL_TURN = (
'[IMPORTANT: The user has invoked the "skill-creator" skill, indicating they want '
"you to follow its instructions. The full skill content is loaded below.]\n\n"
"# Skill Creator\n\n"
"Large skill body, no user instruction."
)
class _RecordingProvider(MemoryProvider):
"""Captures exactly what user text each fan-out method received."""
_name = "recording"
def __init__(self):
self.prefetched = []
self.queued = []
self.synced = []
@property
def name(self) -> str:
return self._name
def initialize(self, session_id: str = "", **kwargs) -> None:
pass
def is_available(self) -> bool:
return True
def system_prompt_block(self) -> str:
return ""
def prefetch(self, query, *, session_id: str = "") -> str:
self.prefetched.append(query)
return ""
def queue_prefetch(self, query, *, session_id: str = "") -> None:
self.queued.append(query)
def sync_turn(self, user_content, assistant_content, *, session_id: str = "", messages=None) -> None:
self.synced.append(user_content)
def get_tool_schemas(self):
return []
def _manager_with_recorder():
mgr = MemoryManager()
provider = _RecordingProvider()
mgr.add_provider(provider)
return mgr, provider
class TestExtractUserInstruction:
def test_non_string_returns_none(self):
assert extract_user_instruction_from_skill_message(None) is None
assert extract_user_instruction_from_skill_message(123) is None
assert extract_user_instruction_from_skill_message([{"text": "hi"}]) is None
def test_plain_message_passes_through(self):
assert extract_user_instruction_from_skill_message("just a message") == "just a message"
def test_single_skill_with_instruction(self):
assert (
extract_user_instruction_from_skill_message(_SINGLE_SKILL_TURN)
== "make a skill for release triage"
)
def test_bundle_with_instruction(self):
assert (
extract_user_instruction_from_skill_message(_BUNDLE_TURN)
== "fix the failing retrieval test"
)
def test_bare_skill_returns_none(self):
assert extract_user_instruction_from_skill_message(_BARE_SKILL_TURN) is None
def test_runtime_note_trimmed_from_single_skill(self):
turn = _SINGLE_SKILL_TURN + "\n\n[Runtime note: in a subagent]"
assert (
extract_user_instruction_from_skill_message(turn)
== "make a skill for release triage"
)
class TestMemoryManagerStripsScaffolding:
def test_prefetch_all_strips_single_skill(self):
mgr, provider = _manager_with_recorder()
mgr.prefetch_all(_SINGLE_SKILL_TURN)
assert provider.prefetched == ["make a skill for release triage"]
def test_prefetch_all_skips_bare_skill(self):
mgr, provider = _manager_with_recorder()
result = mgr.prefetch_all(_BARE_SKILL_TURN)
assert result == ""
assert provider.prefetched == []
def test_queue_prefetch_all_strips_bundle(self):
mgr, provider = _manager_with_recorder()
mgr.queue_prefetch_all(_BUNDLE_TURN)
mgr.flush_pending(timeout=5.0)
assert provider.queued == ["fix the failing retrieval test"]
def test_queue_prefetch_all_skips_bare_skill(self):
mgr, provider = _manager_with_recorder()
mgr.queue_prefetch_all(_BARE_SKILL_TURN)
mgr.flush_pending(timeout=5.0)
assert provider.queued == []
def test_sync_all_strips_single_skill(self):
mgr, provider = _manager_with_recorder()
mgr.sync_all(_SINGLE_SKILL_TURN, "Done.")
mgr.flush_pending(timeout=5.0)
assert provider.synced == ["make a skill for release triage"]
def test_sync_all_skips_bare_skill(self):
mgr, provider = _manager_with_recorder()
mgr.sync_all(_BARE_SKILL_TURN, "Done.")
mgr.flush_pending(timeout=5.0)
assert provider.synced == []
def test_plain_message_passes_through_unchanged(self):
mgr, provider = _manager_with_recorder()
mgr.sync_all("what's the weather", "Sunny.")
mgr.flush_pending(timeout=5.0)
assert provider.synced == ["what's the weather"]