fix(openviking): gate memory writes and add viking_forget

Mirror built-in memory writes to external providers only after the native memory tool succeeds and is not staged for approval. Keep OpenViking's built-in memory mirroring add-only, since Hermes native memory entries do not yet have stable OpenViking file URIs for replace/remove.

Add a narrow viking_forget tool for exact user memory file deletion and document the current OpenViking write/delete behavior.
This commit is contained in:
Hao Zhe 2026-06-19 18:44:57 +08:00 committed by Teknium
parent 38c56a1e86
commit 70e7132e2f
9 changed files with 560 additions and 45 deletions

View file

@ -32,6 +32,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_cli.timeouts import get_provider_request_timeout
from agent.memory_write_bridge import collect_memory_write_notifications
from agent.prompt_builder import format_steer_marker
from agent.tool_dispatch_helpers import _trajectory_normalize_msg, make_tool_result_message
from agent.trajectory import convert_scratchpad_to_think
@ -1838,29 +1839,24 @@ def invoke_tool(agent, function_name: str, function_args: dict, effective_task_i
operations=operations,
store=agent._memory_store,
)
# Bridge: notify external memory provider of built-in memory writes.
# Covers both the single-op shape and each add/replace inside a batch.
# Bridge: notify external memory providers of successful built-in
# memory writes. Covers the single-op shape and each mutating op
# inside a successful batch.
if agent._memory_manager:
if operations:
_mem_ops = [
op for op in operations
if isinstance(op, dict) and op.get("action") in {"add", "replace"}
]
else:
_mem_ops = (
[{"action": next_args.get("action"), "content": next_args.get("content")}]
if next_args.get("action") in {"add", "replace"} else []
)
_mem_ops = collect_memory_write_notifications(result, next_args)
for _op in _mem_ops:
try:
metadata = agent._build_memory_write_metadata(
task_id=effective_task_id,
tool_call_id=tool_call_id,
)
if _op.get("old_text"):
metadata["old_text"] = _op["old_text"]
agent._memory_manager.on_memory_write(
_op.get("action", ""),
target,
_op.get("target", target),
_op.get("content", "") or "",
metadata=agent._build_memory_write_metadata(
task_id=effective_task_id,
tool_call_id=tool_call_id,
),
metadata=metadata,
)
except Exception:
pass

View file

@ -0,0 +1,61 @@
"""Helpers for mirroring built-in memory writes to external providers."""
from __future__ import annotations
import json
from typing import Any, Dict, List
_MIRRORED_MEMORY_ACTIONS = {"add", "replace", "remove"}
def _memory_tool_result_succeeded(result: Any) -> bool:
if isinstance(result, str):
try:
result = json.loads(result)
except Exception:
return False
if isinstance(result, dict):
if result.get("success") is False:
return False
if result.get("staged") is True:
return False
if "error" in result and result.get("success") is not True:
return False
return True
def collect_memory_write_notifications(
tool_result: Any,
tool_args: Dict[str, Any],
) -> List[Dict[str, str]]:
"""Return provider notifications for a successful built-in memory write."""
if not _memory_tool_result_succeeded(tool_result):
return []
target = str(tool_args.get("target") or "memory")
operations = tool_args.get("operations")
if isinstance(operations, list) and operations:
raw_operations = operations
else:
raw_operations = [{
"action": tool_args.get("action"),
"content": tool_args.get("content"),
"old_text": tool_args.get("old_text"),
}]
notifications: List[Dict[str, str]] = []
for op in raw_operations:
if not isinstance(op, dict):
continue
action = str(op.get("action") or "")
if action not in _MIRRORED_MEMORY_ACTIONS:
continue
notifications.append({
"action": action,
"target": target,
"content": str(op.get("content") or ""),
"old_text": str(op.get("old_text") or ""),
})
return notifications

View file

@ -29,6 +29,7 @@ from agent.display import (
_detect_tool_failure,
)
from agent.tool_guardrails import ToolGuardrailDecision
from agent.memory_write_bridge import collect_memory_write_notifications
from agent.tool_dispatch_helpers import (
_is_destructive_command,
_is_multimodal_tool_result,
@ -1046,29 +1047,24 @@ def execute_tool_calls_sequential(agent, assistant_message, messages: list, effe
operations=operations,
store=agent._memory_store,
)
# Bridge: notify external memory provider of built-in memory writes.
# Covers both the single-op shape and each add/replace inside a batch.
# Bridge: notify external memory providers of successful built-in
# memory writes. Covers the single-op shape and each mutating op
# inside a successful batch.
if agent._memory_manager:
if operations:
_mem_ops = [
op for op in operations
if isinstance(op, dict) and op.get("action") in {"add", "replace"}
]
else:
_mem_ops = (
[{"action": next_args.get("action"), "content": next_args.get("content")}]
if next_args.get("action") in {"add", "replace"} else []
)
_mem_ops = collect_memory_write_notifications(result, next_args)
for _op in _mem_ops:
try:
metadata = agent._build_memory_write_metadata(
task_id=effective_task_id,
tool_call_id=getattr(tool_call, "id", None),
)
if _op.get("old_text"):
metadata["old_text"] = _op["old_text"]
agent._memory_manager.on_memory_write(
_op.get("action", ""),
target,
_op.get("target", target),
_op.get("content", "") or "",
metadata=agent._build_memory_write_metadata(
task_id=effective_task_id,
tool_call_id=getattr(tool_call, "id", None),
),
metadata=metadata,
)
except Exception:
pass

View file

@ -47,5 +47,37 @@ Hermes sends `OPENVIKING_ACCOUNT` and `OPENVIKING_USER` as identity headers.
| `viking_search` | Semantic search with fast/deep/auto modes |
| `viking_read` | Read content at a viking:// URI (abstract/overview/full) |
| `viking_browse` | Filesystem-style navigation (list/tree/stat) |
| `viking_remember` | Store a fact for extraction on session commit |
| `viking_remember` | Store a fact directly with OpenViking `content/write` |
| `viking_forget` | Delete one exact `viking://` memory file URI |
| `viking_add_resource` | Ingest URLs/docs into the knowledge base |
## Memory Writes And Deletes
`viking_remember` writes directly to OpenViking with `POST /api/v1/content/write`
and `mode=create`. It creates peer-scoped memory files under
`viking://user/peers/${OPENVIKING_AGENT}/memories/...`; OpenViking may return a
canonical user-scoped form such as
`viking://user/default/peers/${OPENVIKING_AGENT}/memories/...` in API-key mode.
Explicit remembers do not depend on session commit extraction.
Hermes built-in `memory` tool additions are mirrored to OpenViking after the
local memory operation succeeds:
| Hermes action | OpenViking operation |
|---------------|----------------------|
| `add` | `content/write` with `mode=create` under the configured peer memory namespace |
Built-in `replace` and `remove` operations are not mirrored because Hermes
native memory entries do not yet carry stable OpenViking file URIs. Use
`viking_forget` when the user explicitly asks to delete a specific OpenViking
memory URI.
`viking_forget` is intentionally narrow. It only accepts concrete user memory
file URIs, such as
`viking://user/peers/hermes/memories/preferences/mem_abc123.md` or the canonical
`viking://user/default/peers/hermes/memories/preferences/mem_abc123.md`. Files
directly under `memories/`, such as `viking://user/default/memories/profile.md`,
are also allowed because OpenViking supports them. The tool rejects directories,
resources, skills, sessions, generated summary files, and URIs with query
strings or fragments. Use OpenViking's MCP, CLI, or admin APIs for broader
resource and directory cleanup.

View file

@ -91,6 +91,13 @@ _MEMORY_WRITE_TARGET_SUBDIR_MAP = {
"user": "preferences",
"memory": "patterns",
}
_DERIVED_MEMORY_FILENAMES = {
".abstract.md",
".overview.md",
".read.md",
".full.md",
".relations.json",
}
_LOCAL_OPENVIKING_HOSTS = {"localhost", "127.0.0.1", "::1"}
_LOCAL_OPENVIKING_AUTOSTART_TIMEOUT = 60.0
_OPENVIKING_SERVER_LOG_RELATIVE_PATH = Path("logs") / "openviking-server.log"
@ -320,6 +327,13 @@ class _VikingClient:
)
)
def delete(self, path: str, **kwargs) -> dict:
return self._send_with_trusted_identity_retry(
lambda headers: self._httpx.delete(
self._url(path), headers=headers, timeout=_TIMEOUT, **kwargs
)
)
def upload_temp_file(self, file_path: Path) -> str:
mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
@ -460,6 +474,26 @@ REMEMBER_SCHEMA = {
},
}
FORGET_SCHEMA = {
"name": "viking_forget",
"description": (
"Delete one OpenViking memory file by exact viking:// URI. "
"Use only when the user explicitly asks to forget or delete a specific "
"memory and you have the exact memory file URI. Resources, skills, "
"sessions, directories, generated summaries, and broad deletes are rejected."
),
"parameters": {
"type": "object",
"properties": {
"uri": {
"type": "string",
"description": "Exact viking:// memory file URI ending in .md.",
},
},
"required": ["uri"],
},
}
ADD_RESOURCE_SCHEMA = {
"name": "viking_add_resource",
"description": (
@ -552,6 +586,46 @@ def _is_remote_resource_source(value: str) -> bool:
return value.startswith(_REMOTE_RESOURCE_PREFIXES)
def _memory_segment_index(parts: List[str]) -> Optional[int]:
if len(parts) >= 2 and parts[0] == "user" and parts[1] == "memories":
return 1
if len(parts) >= 3 and parts[0] == "user" and parts[2] == "memories":
return 2
if len(parts) >= 4 and parts[0] == "user" and parts[1] == "peers" and parts[3] == "memories":
return 3
if len(parts) >= 5 and parts[0] == "user" and parts[2] == "peers" and parts[4] == "memories":
return 4
return None
def _validate_forget_memory_uri(raw_uri: Any) -> tuple[Optional[str], Optional[str]]:
if not isinstance(raw_uri, str):
return None, "uri is required"
uri = raw_uri.strip()
if not uri:
return None, "uri is required"
parsed = urlparse(uri)
if parsed.scheme != "viking" or not uri.startswith("viking://"):
return None, "viking_forget only accepts viking:// memory file URIs"
if parsed.query or parsed.fragment:
return None, "viking_forget requires an exact URI without query or fragment"
if uri.endswith("/") or not uri.endswith(".md"):
return None, "viking_forget only deletes concrete .md memory files"
parts = [part for part in uri[len("viking://") :].split("/") if part]
memories_idx = _memory_segment_index(parts)
if memories_idx is None or len(parts) < memories_idx + 2:
return None, "viking_forget only deletes user memory file URIs"
filename = uri.rsplit("/", 1)[-1]
if filename in _DERIVED_MEMORY_FILENAMES:
return None, "viking_forget cannot delete generated memory summary files"
return uri, None
def _is_local_path_reference(value: str) -> bool:
if not value or "\n" in value or "\r" in value:
return False
@ -2047,7 +2121,8 @@ class OpenVikingMemoryProvider(MemoryProvider):
f"Active. Endpoint: {self._endpoint}\n"
"Use viking_search to find information, viking_read for details "
"(abstract/overview/full), viking_browse to explore.\n"
"Use viking_remember to store facts, viking_add_resource to index URLs/docs."
"Use viking_remember to store facts, viking_forget to delete exact memory "
"file URIs, and viking_add_resource to index URLs/docs."
)
except Exception as e:
logger.warning("OpenViking system_prompt_block failed: %s", e)
@ -2055,7 +2130,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
"# OpenViking Knowledge Base\n"
f"Active. Endpoint: {self._endpoint}\n"
"Use viking_search, viking_read, viking_browse, "
"viking_remember, viking_add_resource."
"viking_remember, viking_forget, viking_add_resource."
)
def prefetch(self, query: str, *, session_id: str = "") -> str:
@ -2806,7 +2881,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
content: str,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Mirror built-in memory writes to OpenViking via content/write."""
"""Mirror successful built-in memory additions to OpenViking."""
if not self._client or action != "add" or not content:
return
@ -2831,7 +2906,14 @@ class OpenVikingMemoryProvider(MemoryProvider):
t.start()
def get_tool_schemas(self) -> List[Dict[str, Any]]:
return [SEARCH_SCHEMA, READ_SCHEMA, BROWSE_SCHEMA, REMEMBER_SCHEMA, ADD_RESOURCE_SCHEMA]
return [
SEARCH_SCHEMA,
READ_SCHEMA,
BROWSE_SCHEMA,
REMEMBER_SCHEMA,
FORGET_SCHEMA,
ADD_RESOURCE_SCHEMA,
]
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
if not self._client:
@ -2846,6 +2928,8 @@ class OpenVikingMemoryProvider(MemoryProvider):
return self._tool_browse(args)
elif tool_name == "viking_remember":
return self._tool_remember(args)
elif tool_name == "viking_forget":
return self._tool_forget(args)
elif tool_name == "viking_add_resource":
return self._tool_add_resource(args)
return tool_error(f"Unknown tool: {tool_name}")
@ -3097,6 +3181,31 @@ class OpenVikingMemoryProvider(MemoryProvider):
logger.error("OpenViking content/write failed: %s", e)
return tool_error(f"Failed to store memory: {e}")
def _tool_forget(self, args: dict) -> str:
uri, error = _validate_forget_memory_uri(args.get("uri"))
if error:
return tool_error(error)
resp = self._client.delete(
"/api/v1/fs",
params={"uri": uri, "recursive": False},
)
result = self._unwrap_result(resp)
payload: Dict[str, Any] = {"status": "deleted", "uri": uri}
if isinstance(result, dict):
payload["uri"] = result.get("uri") or uri
for key in (
"estimated_deleted_count",
"memory_cleanup",
"semantic_root_uri",
"semantic_status",
"queue_status",
):
if key in result:
payload[key] = result[key]
return json.dumps(payload, ensure_ascii=False)
def _tool_add_resource(self, args: dict) -> str:
url = args.get("url", "")
if not url:

View file

@ -1172,16 +1172,12 @@ class TestOnMemoryWriteBridge:
mgr.on_memory_write("replace", "user", "updated pref")
assert p.memory_writes == [("replace", "user", "updated pref")]
def test_on_memory_write_remove_not_bridged(self):
"""The bridge intentionally skips 'remove' — only add/replace notify."""
# This tests the contract that run_agent.py checks:
# function_args.get("action") in ("add", "replace")
def test_on_memory_write_remove_supported_by_manager(self):
"""The manager forwards remove actions when a caller elects to bridge them."""
mgr = MemoryManager()
p = FakeMemoryProvider("ext")
mgr.add_provider(p)
# Manager itself doesn't filter — run_agent.py does.
# But providers should handle remove gracefully.
mgr.on_memory_write("remove", "memory", "old fact")
assert p.memory_writes == [("remove", "memory", "old fact")]

View file

@ -0,0 +1,84 @@
import json
from agent.memory_write_bridge import collect_memory_write_notifications
def test_collect_notifications_includes_remove_with_old_text_after_success():
notifications = collect_memory_write_notifications(
json.dumps({"success": True}),
{
"action": "remove",
"target": "memory",
"old_text": "stale preference entry",
},
)
assert notifications == [
{
"action": "remove",
"target": "memory",
"content": "",
"old_text": "stale preference entry",
}
]
def test_collect_notifications_skips_failed_memory_write():
notifications = collect_memory_write_notifications(
json.dumps({"success": False, "error": "No entry matched"}),
{
"action": "remove",
"target": "memory",
"old_text": "stale preference entry",
},
)
assert notifications == []
def test_collect_notifications_skips_staged_memory_write():
notifications = collect_memory_write_notifications(
json.dumps({"success": True, "staged": True, "pending_id": "abc123"}),
{
"action": "remove",
"target": "memory",
"old_text": "stale preference entry",
},
)
assert notifications == []
def test_collect_notifications_preserves_old_text_for_replace_and_remove_batch():
notifications = collect_memory_write_notifications(
json.dumps({"success": True}),
{
"target": "user",
"operations": [
{"action": "replace", "old_text": "old preference", "content": "updated"},
{"action": "remove", "old_text": "obsolete preference"},
{"action": "add", "content": "new fact"},
],
},
)
assert notifications == [
{
"action": "replace",
"target": "user",
"content": "updated",
"old_text": "old preference",
},
{
"action": "remove",
"target": "user",
"content": "",
"old_text": "obsolete preference",
},
{
"action": "add",
"target": "user",
"content": "new fact",
"old_text": "",
},
]

View file

@ -1459,6 +1459,115 @@ def test_tool_add_resource_sends_git_remote_sources_as_path(url):
})
def test_get_tool_schemas_includes_narrow_forget_tool():
provider = OpenVikingMemoryProvider()
names = [schema["name"] for schema in provider.get_tool_schemas()]
assert "viking_forget" in names
def test_handle_tool_call_forget_deletes_exact_memory_file_uri():
uri = "viking://user/peers/hermes/memories/preferences/mem_abc123.md"
provider = OpenVikingMemoryProvider()
provider._client = MagicMock()
provider._client.delete.return_value = {
"status": "ok",
"result": {"uri": uri, "estimated_deleted_count": 1},
}
result = json.loads(provider.handle_tool_call("viking_forget", {"uri": uri}))
provider._client.delete.assert_called_once_with(
"/api/v1/fs",
params={"uri": uri, "recursive": False},
)
assert result == {
"status": "deleted",
"uri": uri,
"estimated_deleted_count": 1,
}
def test_handle_tool_call_forget_deletes_exact_memory_file_under_memories_root():
uri = "viking://user/default/memories/profile.md"
provider = OpenVikingMemoryProvider()
provider._client = MagicMock()
provider._client.delete.return_value = {
"status": "ok",
"result": {"uri": uri, "estimated_deleted_count": 1},
}
result = json.loads(provider.handle_tool_call("viking_forget", {"uri": uri}))
provider._client.delete.assert_called_once_with(
"/api/v1/fs",
params={"uri": uri, "recursive": False},
)
assert result == {
"status": "deleted",
"uri": uri,
"estimated_deleted_count": 1,
}
@pytest.mark.parametrize("uri", [
"",
"https://example.com/mem.md",
"viking:/user/memories/preferences/mem_abc123.md",
"viking://resources/project/doc.md",
"viking://resources/project/memories/mem_abc123.md",
"viking://memories/preferences/mem_abc123.md",
"viking://agent/hermes/memories/preferences/mem_abc123.md",
"viking://user/skills/example/SKILL.md",
"viking://user/sessions/session-1/messages.jsonl",
"viking://user/memories/preferences/",
"viking://user/memories/preferences/.overview.md",
"viking://user/memories/preferences/.abstract.md",
"viking://user/memories/preferences/mem_abc123.md?recursive=true",
])
def test_handle_tool_call_forget_rejects_non_memory_file_uris(uri):
provider = OpenVikingMemoryProvider()
provider._client = MagicMock()
result = json.loads(provider.handle_tool_call("viking_forget", {"uri": uri}))
assert "error" in result
provider._client.delete.assert_not_called()
def test_viking_client_delete_uses_identity_headers(monkeypatch):
client = _VikingClient(
"https://example.com",
api_key="test-key",
account="acct",
user="alice",
agent="hermes",
)
captured = {}
def capture_delete(url, **kwargs):
captured["url"] = url
captured["kwargs"] = kwargs
return SimpleNamespace(
status_code=200,
text="",
json=lambda: {"status": "ok", "result": {"uri": "viking://user/memories/x.md"}},
raise_for_status=lambda: None,
)
monkeypatch.setattr(client._httpx, "delete", capture_delete)
assert client.delete("/api/v1/fs", params={"uri": "viking://user/memories/x.md"}) == {
"status": "ok",
"result": {"uri": "viking://user/memories/x.md"},
}
assert captured["url"] == "https://example.com/api/v1/fs"
assert captured["kwargs"]["params"] == {"uri": "viking://user/memories/x.md"}
assert captured["kwargs"]["headers"]["Authorization"] == "Bearer test-key"
assert captured["kwargs"]["headers"]["X-OpenViking-Actor-Peer"] == "hermes"
def test_viking_client_upload_temp_file_uses_multipart_identity_headers(tmp_path, monkeypatch):
sample = tmp_path / "sample.md"
sample.write_text("# Local resource\n", encoding="utf-8")
@ -2637,6 +2746,46 @@ def test_on_memory_write_uses_content_write_independent_of_session_rotation():
)
@pytest.mark.parametrize(
("action", "content"),
[
("replace", "updated memory"),
("remove", ""),
("forget", ""),
("delete", ""),
],
)
def test_on_memory_write_ignores_non_add_actions(action, content, monkeypatch):
provider = OpenVikingMemoryProvider()
provider._client = MagicMock()
provider._endpoint = "http://test"
provider._api_key = ""
provider._account = "acct"
provider._user = "usr"
provider._agent = "hermes"
uri = "viking://user/peers/hermes/memories/preferences/mem_abc123.md"
spawned = []
class StubThread:
def __init__(self, *args, **kwargs):
spawned.append((args, kwargs))
def start(self):
raise AssertionError("non-URI remove should not spawn a mirror thread")
import plugins.memory.openviking as _mod
monkeypatch.setattr(_mod.threading, "Thread", StubThread)
provider.on_memory_write(
action,
"memory",
content,
metadata={"uri": uri, "old_text": "stale fact"},
)
assert spawned == []
# ---------------------------------------------------------------------------
# Prefetch staleness: a prefetch worker that finishes AFTER a session switch
# must drop its result instead of repopulating the new session with stale

View file

@ -2082,6 +2082,41 @@ class TestExecuteToolCalls:
assert messages[0]["role"] == "tool"
assert "search result" in messages[0]["content"]
def test_sequential_memory_remove_notifies_provider_with_tool_result(self, agent):
old_text = "stale preference entry"
tc = _mock_tool_call(
name="memory",
arguments=json.dumps({
"action": "remove",
"target": "memory",
"old_text": old_text,
}),
call_id="mem-1",
)
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
messages = []
calls = []
class FakeMemoryManager:
def has_tool(self, name):
return False
def on_memory_write(self, action, target, content, metadata=None):
calls.append((action, target, content, metadata or {}))
agent._memory_manager = FakeMemoryManager()
agent._memory_store = object()
with patch("tools.memory_tool.memory_tool", return_value=json.dumps({"success": True})):
agent._execute_tool_calls_sequential(mock_msg, messages, "task-1")
assert len(calls) == 1
action, target, content, metadata = calls[0]
assert (action, target, content) == ("remove", "memory", "")
assert metadata["old_text"] == old_text
assert metadata["tool_call_id"] == "mem-1"
assert messages[-1]["tool_call_id"] == "mem-1"
def test_keyboard_interrupt_emits_cancelled_post_tool_hook(self, agent, monkeypatch):
tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
@ -2797,6 +2832,63 @@ class TestConcurrentToolExecution:
assert json.loads(result) == {"error": "Blocked"}
assert agent._turns_since_memory == 5
def test_invoke_tool_memory_remove_notifies_provider_with_old_text(self, agent, monkeypatch):
monkeypatch.setattr(
"hermes_cli.plugins.get_pre_tool_call_block_message",
lambda *args, **kwargs: None,
)
calls = []
class FakeMemoryManager:
def has_tool(self, name):
return False
def on_memory_write(self, action, target, content, metadata=None):
calls.append((action, target, content, metadata or {}))
old_text = "stale preference entry"
agent._memory_manager = FakeMemoryManager()
agent._memory_store = object()
with patch("tools.memory_tool.memory_tool", return_value=json.dumps({"success": True})):
agent._invoke_tool(
"memory",
{"action": "remove", "target": "memory", "old_text": old_text},
"task-1",
tool_call_id="mem-1",
)
assert len(calls) == 1
action, target, content, metadata = calls[0]
assert (action, target, content) == ("remove", "memory", "")
assert metadata["old_text"] == old_text
assert metadata["tool_call_id"] == "mem-1"
def test_invoke_tool_memory_failed_remove_skips_provider_notification(self, agent, monkeypatch):
monkeypatch.setattr(
"hermes_cli.plugins.get_pre_tool_call_block_message",
lambda *args, **kwargs: None,
)
manager = SimpleNamespace(
has_tool=lambda name: False,
on_memory_write=MagicMock(side_effect=AssertionError("should not notify")),
)
agent._memory_manager = manager
agent._memory_store = object()
with patch(
"tools.memory_tool.memory_tool",
return_value=json.dumps({"success": False, "error": "No entry matched"}),
):
agent._invoke_tool(
"memory",
{"action": "remove", "target": "memory", "old_text": "missing"},
"task-1",
tool_call_id="mem-1",
)
manager.on_memory_write.assert_not_called()
def test_concurrent_blocked_write_skips_checkpoint(self, agent, monkeypatch):
"""Concurrent path: blocked write_file should not trigger checkpoint."""
tc1 = _mock_tool_call(name="write_file",