Merge pull request #48924 from kshitijk4poor/salvage-48894-structured-sync

fix(openviking): structured turn sync — guard empty tool_id, reuse env_var_enabled (salvage #48894)
This commit is contained in:
kshitij 2026-06-19 14:11:48 +05:30 committed by GitHub
commit 527a47f2fe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 902 additions and 23 deletions

View file

@ -290,6 +290,7 @@ def run_codex_app_server_turn(
original_user_message=original_user_message,
final_response=turn.final_text,
interrupted=False,
messages=messages,
)
except Exception:
logger.debug("external memory sync raised", exc_info=True)

50
agent/message_content.py Normal file
View file

@ -0,0 +1,50 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
# Content-part "type" values that carry media instead of text; a part whose
# (stripped, lowercased) type is listed here contributes no text.
_NON_TEXT_PART_TYPES = {"image", "image_url", "input_image", "audio", "input_audio"}
# Keys/attributes probed, in this order, for a part's text; the first one that
# holds a ``str`` wins.
_TEXT_KEYS = ("text", "content", "input_text", "output_text", "summary_text")
def _field(value: Any, key: str) -> Any:
if isinstance(value, Mapping):
return value.get(key)
return getattr(value, key, None)
def _text_from_part(part: Any) -> str:
    """Return the text carried by a single content part, or "" if it has none.

    A plain string is returned unchanged. ``None`` and parts whose ``type``
    is listed in ``_NON_TEXT_PART_TYPES`` yield "". Otherwise the keys in
    ``_TEXT_KEYS`` are probed in order and the first ``str`` value is used.
    """
    if isinstance(part, str):
        return part
    if part is None:
        return ""
    kind = str(_field(part, "type") or "").strip().lower()
    if kind in _NON_TEXT_PART_TYPES:
        return ""
    # Lazy generator: stop probing as soon as one key yields a string.
    candidates = (_field(part, key) for key in _TEXT_KEYS)
    return next((found for found in candidates if isinstance(found, str)), "")
def flatten_message_text(content: Any, *, sep: str = "\n") -> str:
    """Return the visible text from common chat/Responses message content shapes.

    Args:
        content: ``None``, a plain string, a list of content parts, or a
            single content part (mapping or attribute-style object).
        sep: Separator placed between the non-empty texts of a list's parts.

    Returns:
        * ``""`` for ``None``.
        * The string itself for ``str`` input.
        * For a list, the non-empty per-part texts joined with ``sep``.
        * For a single part, its text when it has one.
        * ``""`` for a single part whose ``type`` is a non-text (media) type,
          so a lone image/audio part is treated the same way it would be
          inside a list instead of leaking its repr as "text".
        * Otherwise ``str(content)`` as a last resort, or ``""`` if that
          conversion raises.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        chunks = (_text_from_part(part) for part in content)
        return sep.join(chunk for chunk in chunks if chunk)
    text = _text_from_part(content)
    if text:
        return text
    # A lone media part has no visible text. Without this guard the generic
    # fallback below would stringify the whole part (URL / base64 payload
    # included), which is inconsistent with how list members are handled.
    part_type = str(_field(content, "type") or "").strip().lower()
    if part_type in _NON_TEXT_PART_TYPES:
        return ""
    try:
        # Unknown shapes (numbers, untyped mappings, ...) keep their textual
        # form so callers do not silently lose information.
        return str(content)
    except Exception:
        return ""

View file

@ -45,10 +45,11 @@ from typing import Any, Callable, Dict, List, Optional, Set
from urllib.parse import urlparse
from urllib.request import url2pathname
from agent.message_content import flatten_message_text
from agent.memory_provider import MemoryProvider
from agent.skill_commands import extract_user_instruction_from_skill_message
from tools.registry import tool_error
from utils import atomic_json_write
from utils import atomic_json_write, env_var_enabled
logger = logging.getLogger(__name__)
@ -70,6 +71,7 @@ _TIMEOUT = 30.0
_SESSION_DRAIN_TIMEOUT = 10.0
_DEFERRED_COMMIT_TIMEOUT = (_TIMEOUT * 2) + 5.0
_REMOTE_RESOURCE_PREFIXES = ("http://", "https://", "git@", "ssh://", "git://")
_SYNC_TRACE_ENV = "HERMES_OPENVIKING_SYNC_TRACE"
# Maps the viking_remember `category` enum to a viking:// subdirectory.
# Keep in sync with REMEMBER_SCHEMA.parameters.properties.category.enum.
@ -156,6 +158,18 @@ def _derive_openviking_user_text(content: Any) -> str:
return extract_user_instruction_from_skill_message(content) or ""
def _sync_trace_enabled() -> bool:
    """Report whether sync tracing is switched on via ``_SYNC_TRACE_ENV``.

    The decision is delegated to ``env_var_enabled``.
    """
    enabled = env_var_enabled(_SYNC_TRACE_ENV)
    return enabled
def _preview(value: Any, limit: int = 160) -> str:
text = "" if value is None else str(value)
text = text.replace("\n", "\\n")
if len(text) > limit:
return text[:limit] + "..."
return text
# ---------------------------------------------------------------------------
# Process-level atexit safety net — ensures pending sessions are committed
# even if shutdown_memory_provider is never called (e.g. gateway crash,
@ -488,6 +502,25 @@ ADD_RESOURCE_SCHEMA = {
}
# Recall tools (read-only) whose results we never re-ingest into OpenViking —
# echoing recalled memory back into the session transcript would re-store it.
# Write tools (viking_remember / viking_add_resource) are intentionally NOT
# here. Derived from the canonical schema names so renames can't desync.
# NOTE(review): lookups lowercase and strip the tool name before testing
# membership, so the schema names are presumably all lowercase — confirm.
_OPENVIKING_RECALL_TOOL_NAMES = {
SEARCH_SCHEMA["name"],
READ_SCHEMA["name"],
BROWSE_SCHEMA["name"],
}
# Canonical tool_status values emitted in OpenViking batch tool parts.
# "completed" is also the result when a tool message carries no error signal;
# "pending" marks a tool call that has no matching tool result in the turn.
_TOOL_STATUS_COMPLETED = "completed"
_TOOL_STATUS_ERROR = "error"
_TOOL_STATUS_PENDING = "pending"
# Inbound status aliases (from varied tool-result shapes) -> canonical above.
# Compared against the lowercased status string.
_TOOL_STATUS_ERROR_ALIASES = {"error", "failed", "failure"}
_TOOL_STATUS_COMPLETED_ALIASES = {"completed", "complete", "success", "succeeded"}
def _zip_directory(dir_path: Path) -> Path:
"""Create a temporary zip file containing a directory tree."""
root = dir_path.resolve()
@ -2221,7 +2254,10 @@ class OpenVikingMemoryProvider(MemoryProvider):
def _commit_session(self, sid: str, turn_count: int, *, context: str) -> bool:
try:
self._client.post(f"/api/v1/sessions/{sid}/commit")
self._client.post(
f"/api/v1/sessions/{sid}/commit",
{"keep_recent_count": 0},
)
self._mark_session_committed(sid)
logger.info("OpenViking session %s committed %s (%d turns)", sid, context, turn_count)
return True
@ -2293,7 +2329,265 @@ class OpenVikingMemoryProvider(MemoryProvider):
with self._prefetch_lock:
self._prefetch_result = ""
def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
@staticmethod
def _message_text(content: Any) -> str:
    """Flatten string- or list-shaped message content into plain text.

    Thin wrapper around ``flatten_message_text`` using its default separator.
    """
    flattened = flatten_message_text(content)
    return flattened
@classmethod
def _message_matches_text(cls, message: Dict[str, Any], expected: Any) -> bool:
    """Tell whether ``message``'s content equals ``expected`` as plain text.

    Both sides are flattened and stripped before comparing. An ``expected``
    value that flattens to nothing never matches.
    """
    wanted = cls._message_text(expected).strip()
    if wanted:
        return cls._message_text(message.get("content")).strip() == wanted
    return False
@classmethod
def _extract_current_turn_messages(
    cls,
    messages: Optional[List[Dict[str, Any]]],
    user_content: str,
    assistant_content: str,
) -> List[Dict[str, Any]]:
    """Slice the completed turn out of Hermes' full canonical transcript.

    The turn ends at the latest assistant message whose text equals
    ``assistant_content`` (falling back to the latest assistant message, then
    to the last transcript entry). It starts at the latest user message at or
    before that end whose text equals ``user_content`` (falling back to the
    latest user message in that range).

    Returns:
        The dict entries between start and end inclusive, or ``[]`` when the
        transcript is empty or no user message can be found.
    """
    if not messages:
        return []

    def latest_index(role: str, expected: Any, upto: int) -> Optional[int]:
        """Index of the newest ``role`` message at or before ``upto``.

        A message whose text equals ``expected`` is preferred; otherwise the
        newest message with that role is used. ``None`` if there is none.
        """
        need_text_match = bool(cls._message_text(expected).strip())
        newest_with_role: Optional[int] = None
        for position in reversed(range(upto + 1)):
            candidate = messages[position]
            if not isinstance(candidate, dict) or candidate.get("role") != role:
                continue
            if not need_text_match:
                return position
            if cls._message_matches_text(candidate, expected):
                return position
            if newest_with_role is None:
                newest_with_role = position
        return newest_with_role

    last_position = len(messages) - 1
    end_idx = latest_index("assistant", assistant_content, last_position)
    if end_idx is None:
        end_idx = last_position

    start_idx = latest_index("user", user_content, end_idx)
    if start_idx is None:
        return []

    window = messages[start_idx : end_idx + 1]
    return [entry for entry in window if isinstance(entry, dict)]
@staticmethod
def _tool_call_id(tool_call: Dict[str, Any]) -> str:
return str(tool_call.get("id") or tool_call.get("tool_call_id") or "")
@staticmethod
def _tool_call_name(tool_call: Dict[str, Any]) -> str:
function = tool_call.get("function")
if isinstance(function, dict):
return str(function.get("name") or "")
return str(tool_call.get("name") or "")
@staticmethod
def _is_openviking_recall_tool_name(tool_name: Any) -> bool:
    """Tell whether ``tool_name`` is one of the OpenViking recall tools.

    The name is stringified, stripped and lowercased before it is looked up
    in ``_OPENVIKING_RECALL_TOOL_NAMES``; falsy input never matches a
    non-empty name.
    """
    normalized = str(tool_name or "").strip().lower()
    return normalized in _OPENVIKING_RECALL_TOOL_NAMES
@staticmethod
def _tool_call_input(tool_call: Dict[str, Any]) -> Dict[str, Any]:
function = tool_call.get("function")
raw_args: Any = None
if isinstance(function, dict):
raw_args = function.get("arguments")
if raw_args is None:
raw_args = tool_call.get("args")
if raw_args is None:
return {}
if isinstance(raw_args, dict):
return raw_args
if isinstance(raw_args, str):
if not raw_args.strip():
return {}
try:
parsed = json.loads(raw_args)
except Exception:
return {"value": raw_args}
if isinstance(parsed, dict):
return parsed
return {"value": parsed}
return {"value": raw_args}
@classmethod
def _tool_result_status(cls, message: Dict[str, Any]) -> str:
    """Classify a tool-result message as completed or error.

    An explicit ``status``/``tool_status`` field decides first (via the
    alias sets). Otherwise, if the content is a JSON object, it counts as an
    error when it has an error-alias ``status``, ``success`` is ``False``, a
    truthy ``error``, or a non-zero integer ``exit_code``. Everything else is
    reported as completed.
    """
    declared = str(message.get("status") or message.get("tool_status") or "").lower()
    if declared in _TOOL_STATUS_ERROR_ALIASES:
        return _TOOL_STATUS_ERROR
    if declared in _TOOL_STATUS_COMPLETED_ALIASES:
        return _TOOL_STATUS_COMPLETED

    body = cls._message_text(message.get("content")).strip()
    if not body:
        return _TOOL_STATUS_COMPLETED
    try:
        decoded = json.loads(body)
    except Exception:
        # Non-JSON output carries no machine-readable failure signal.
        return _TOOL_STATUS_COMPLETED
    if not isinstance(decoded, dict):
        return _TOOL_STATUS_COMPLETED

    exit_code = decoded.get("exit_code")
    failure_signals = (
        str(decoded.get("status") or "").lower() in _TOOL_STATUS_ERROR_ALIASES,
        decoded.get("success") is False,
        bool(decoded.get("error")),
        isinstance(exit_code, int) and exit_code != 0,
    )
    return _TOOL_STATUS_ERROR if any(failure_signals) else _TOOL_STATUS_COMPLETED
@classmethod
def _messages_to_openviking_batch(
    cls,
    messages: List[Dict[str, Any]],
    *,
    assistant_peer_id: str = "",
) -> List[Dict[str, Any]]:
    """Convert Hermes canonical messages into OpenViking batch payloads.

    Args:
        messages: Transcript entries for one turn; non-dict entries and
            roles other than user/assistant/tool are ignored.
        assistant_peer_id: When non-blank, attached as ``peer_id`` to every
            assistant payload message.

    Returns:
        A list of ``{"role", "parts"}`` payloads. User/assistant text becomes
        a text part. Consecutive tool results are grouped into one assistant
        payload of tool parts. A tool call without a result in the turn is
        emitted on its assistant message as a pending tool part. OpenViking
        recall tool calls and results are left out entirely.
    """
    peer_id = str(assistant_peer_id or "").strip()
    is_recall = cls._is_openviking_recall_tool_name

    def result_id(entry: Dict[str, Any]) -> str:
        """Identifier a tool-result message refers to ("" if absent)."""
        return str(entry.get("tool_call_id") or entry.get("id") or "")

    # Pass 1: index tool calls and note which ids have results / are recall.
    # Empty ids are never recorded, so an id-less recall result cannot cause
    # unrelated id-less results to be skipped.
    calls_by_id: Dict[str, Dict[str, Any]] = {}
    answered_ids: set[str] = set()
    recall_ids: set[str] = set()
    for entry in messages:
        if not isinstance(entry, dict):
            continue
        entry_role = entry.get("role")
        if entry_role == "tool":
            ident = result_id(entry)
            if ident:
                answered_ids.add(ident)
                if is_recall(entry.get("name")):
                    recall_ids.add(ident)
        elif entry_role == "assistant":
            for call in entry.get("tool_calls") or []:
                if not isinstance(call, dict):
                    continue
                ident = cls._tool_call_id(call)
                if not ident:
                    continue
                call_name = cls._tool_call_name(call)
                calls_by_id[ident] = {
                    "tool_name": call_name,
                    "tool_input": cls._tool_call_input(call),
                }
                if is_recall(call_name):
                    recall_ids.add(ident)

    # Pass 2: build the payloads in transcript order.
    batch: List[Dict[str, Any]] = []
    buffered_results: List[Dict[str, Any]] = []

    def emit(role: str, parts: List[Dict[str, Any]]) -> None:
        """Append one payload message, tagging assistant ones with peer_id."""
        item: Dict[str, Any] = {"role": role, "parts": parts}
        if role == "assistant" and peer_id:
            item["peer_id"] = peer_id
        batch.append(item)

    def flush_results() -> None:
        """Emit any buffered tool results as a single assistant payload."""
        if buffered_results:
            emit("assistant", list(buffered_results))
            buffered_results.clear()

    for entry in messages:
        if not isinstance(entry, dict):
            continue
        role = str(entry.get("role") or "")

        if role == "tool":
            ident = result_id(entry)
            known_call = calls_by_id.get(ident, {})
            result_name = str(entry.get("name") or known_call.get("tool_name") or "")
            if ident in recall_ids or is_recall(result_name):
                continue
            buffered_results.append({
                "type": "tool",
                "tool_id": ident,
                "tool_name": result_name,
                "tool_input": known_call.get("tool_input", {}),
                "tool_output": cls._message_text(entry.get("content")),
                "tool_status": cls._tool_result_status(entry),
            })
            continue

        # system/developer and any unknown role contribute nothing.
        if role not in ("user", "assistant"):
            continue

        flush_results()
        parts: List[Dict[str, Any]] = []
        text = cls._message_text(entry.get("content"))
        if text:
            parts.append({"type": "text", "text": text})

        calls = (entry.get("tool_calls") or []) if role == "assistant" else []
        for call in calls:
            if not isinstance(call, dict):
                continue
            ident = cls._tool_call_id(call)
            call_name = cls._tool_call_name(call)
            if ident in recall_ids or is_recall(call_name):
                continue
            if ident in answered_ids:
                # The matching tool result carries this call's details.
                continue
            # Reuse the input parsed in pass 1 when the id was cached; parse
            # here for the uncached empty-id case so arguments are not lost.
            cached = calls_by_id.get(ident) if ident else None
            if cached is not None:
                call_input = cached["tool_input"]
            else:
                call_input = cls._tool_call_input(call)
            parts.append({
                "type": "tool",
                "tool_id": ident,
                "tool_name": call_name,
                "tool_input": call_input,
                "tool_status": _TOOL_STATUS_PENDING,
            })

        if parts:
            emit(role, parts)

    flush_results()
    return batch
def sync_turn(
self,
user_content: str,
assistant_content: str,
*,
session_id: str = "",
messages: Optional[List[Dict[str, Any]]] = None,
) -> None:
"""Record the conversation turn in OpenViking's session (non-blocking)."""
if not self._client:
return
@ -2302,6 +2596,40 @@ class OpenVikingMemoryProvider(MemoryProvider):
if not user_content:
return
turn_messages = (
self._extract_current_turn_messages(messages, user_content, assistant_content)
if messages is not None
else []
)
if turn_messages:
turn_messages = [dict(message) for message in turn_messages]
for message in turn_messages:
if message.get("role") == "user":
message["content"] = user_content
break
batch_messages = self._messages_to_openviking_batch(
turn_messages,
assistant_peer_id=getattr(self, "_agent", _DEFAULT_AGENT),
)
if _sync_trace_enabled():
logger.info(
"OpenViking sync_turn trace: session_arg=%r cached_session=%r "
"messages_param_supported=true messages_present=%s message_count=%s "
"turn_message_count=%d batch_message_count=%d user_len=%d assistant_len=%d "
"user_preview=%r assistant_preview=%r",
session_id,
self._session_id,
messages is not None,
len(messages) if messages is not None else None,
len(turn_messages),
len(batch_messages),
len(str(user_content or "")),
len(str(assistant_content or "")),
_preview(user_content),
_preview(assistant_content),
)
# Snapshot the sid and bump the turn counter atomically so a
# concurrent on_session_switch/on_session_end can't interleave its
# snapshot+reset between the read and the increment (lost turn) and so
@ -2313,24 +2641,39 @@ class OpenVikingMemoryProvider(MemoryProvider):
self._turn_count += 1
def _sync():
try:
client = self._new_client()
def _post_turn(client: _VikingClient) -> None:
if batch_messages:
payload = {"messages": batch_messages}
if _sync_trace_enabled():
logger.info(
"OpenViking sync_turn trace: POST /api/v1/sessions/%s/messages/batch payload=%s",
sid,
json.dumps(payload, ensure_ascii=False),
)
try:
client.post(f"/api/v1/sessions/{sid}/messages/batch", payload)
return
except Exception as batch_error:
logger.warning(
"OpenViking structured sync failed; falling back to text sync: %s",
batch_error,
)
self._post_session_turn(
client,
sid,
user_content[:4000],
assistant_content[:4000],
self._message_text(assistant_content)[:4000],
)
try:
client = self._new_client()
_post_turn(client)
except Exception as e:
logger.debug("OpenViking sync_turn failed, reconnecting: %s", e)
try:
client = self._new_client()
self._post_session_turn(
client,
sid,
user_content[:4000],
assistant_content[:4000],
)
_post_turn(client)
except Exception as retry_error:
logger.warning("OpenViking sync_turn failed: %s", retry_error)

View file

@ -1577,6 +1577,7 @@ AUTHOR_MAP = {
"sunsky.lau@gmail.com": "liuhao1024", # PR #45494 salvage (claim session slot before auto-resume task; #45456)
"andrewdmwalker@gmail.com": "capt-marbles", # PR #38440 salvage (resolve xAI OAuth credentials across profiles; #43589)
"infinitycrew39@gmail.com": "infinitycrew39", # PR #47945 salvage (scope langfuse trace state by turn/request ids; #48292)
"eurekaxun@163.com": "huangxun375-stack", # PR #37251 / #48894 structured OpenViking sync
}

View file

@ -0,0 +1,25 @@
from __future__ import annotations
from types import SimpleNamespace
from agent.message_content import flatten_message_text
def test_flatten_message_text_accepts_chat_and_responses_text_parts():
    """Text parts of every supported ``type`` are joined with newlines."""
    typed_texts = (
        ("text", "chat text"),
        ("input_text", "user text"),
        ("output_text", "assistant text"),
        ("summary_text", "summary text"),
    )
    content = [{"type": part_type, "text": text} for part_type, text in typed_texts]
    flattened = flatten_message_text(content)
    assert flattened == "chat text\nuser text\nassistant text\nsummary text"
def test_flatten_message_text_accepts_object_parts():
    """Attribute-style parts and legacy ``content``-keyed dicts both yield text."""
    object_part = SimpleNamespace(type="output_text", text="object text")
    legacy_part = {"content": "legacy content"}
    flattened = flatten_message_text([object_part, legacy_part])
    assert flattened == "object text\nlegacy content"

View file

@ -265,6 +265,355 @@ class TestOpenVikingSkillQuerySafety:
assert RecordingVikingClient.calls == []
class TestOpenVikingTurnConversion:
    """Turn slicing and Hermes-to-OpenViking batch conversion on the provider."""

    @staticmethod
    def _call(call_id, name, arguments):
        """Build a function-style tool call with JSON-encoded arguments."""
        return {
            "id": call_id,
            "type": "function",
            "function": {"name": name, "arguments": json.dumps(arguments)},
        }

    @staticmethod
    def _result(call_id, name, content):
        """Build a tool-result transcript message."""
        return {"role": "tool", "tool_call_id": call_id, "name": name, "content": content}

    @staticmethod
    def _text_part(text):
        """Expected batch part for plain text."""
        return {"type": "text", "text": text}

    @staticmethod
    def _finished_part(call_id, name, tool_input, output, status="completed"):
        """Expected batch part for a tool call that has a result."""
        return {
            "type": "tool",
            "tool_id": call_id,
            "tool_name": name,
            "tool_input": tool_input,
            "tool_output": output,
            "tool_status": status,
        }

    def test_extract_current_turn_anchors_on_latest_matching_user_and_assistant(self):
        question = "Please inspect the repository for assemble hooks."
        answer = "The current main does not expose assemble."
        messages = [
            {"role": "user", "content": question},
            {"role": "assistant", "content": "Earlier answer."},
            {"role": "user", "content": question},
            {
                "role": "assistant",
                "content": "I will search the codebase.",
                "tool_calls": [
                    self._call("call_rg_1", "shell_command", {"command": "rg assemble"}),
                ],
            },
            self._result(
                "call_rg_1",
                "shell_command",
                "agent/context_engine.py: no preassemble hook",
            ),
            {"role": "assistant", "content": answer},
        ]

        turn = OpenVikingMemoryProvider._extract_current_turn_messages(
            messages, question, answer
        )

        assert turn == messages[2:]

    def test_messages_to_openviking_batch_coalesces_tool_results(self):
        question = "Please inspect the repository for assemble hooks."
        grep_output = "agent/context_engine.py: no preassemble hook"
        answer = "The current main does not expose assemble."
        turn = [
            {"role": "user", "content": question},
            {
                "role": "assistant",
                "content": "I will search the codebase.",
                "tool_calls": [
                    self._call("call_rg_1", "shell_command", {"command": "rg assemble"}),
                ],
            },
            self._result("call_rg_1", "shell_command", grep_output),
            {"role": "assistant", "content": answer},
        ]

        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)

        assert [message["role"] for message in batch] == ["user", "assistant", "assistant", "assistant"]
        assert batch[0]["parts"] == [self._text_part(question)]
        assert batch[1]["parts"] == [self._text_part("I will search the codebase.")]
        assert batch[2]["parts"] == [
            self._finished_part(
                "call_rg_1", "shell_command", {"command": "rg assemble"}, grep_output
            )
        ]
        assert batch[3]["parts"] == [self._text_part(answer)]

    def test_messages_to_openviking_batch_marks_json_tool_error_results(self):
        failure = json.dumps({"error": "File not found", "exit_code": 1})
        turn = [
            {"role": "user", "content": "Check the file."},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    self._call("call_read_1", "read_file", {"path": "missing.md"}),
                ],
            },
            self._result("call_read_1", "read_file", failure),
        ]

        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)

        tool_message = batch[1]
        assert tool_message["role"] == "assistant"
        assert tool_message["parts"] == [
            self._finished_part(
                "call_read_1",
                "read_file",
                {"path": "missing.md"},
                json.dumps({"error": "File not found", "exit_code": 1}),
                status="error",
            )
        ]

    def test_messages_to_openviking_batch_keeps_pending_tool_call_without_result(self):
        turn = [
            {"role": "user", "content": "Start a long running check."},
            {
                "role": "assistant",
                "content": "Starting it now.",
                "tool_calls": [
                    self._call("call_long_1", "long_check", {"target": "repo"}),
                ],
            },
        ]

        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)

        pending_part = {
            "type": "tool",
            "tool_id": "call_long_1",
            "tool_name": "long_check",
            "tool_input": {"target": "repo"},
            "tool_status": "pending",
        }
        assert batch[1]["parts"] == [self._text_part("Starting it now."), pending_part]

    def test_messages_to_openviking_batch_coalesces_adjacent_tool_results(self):
        turn = [
            {"role": "user", "content": "Run both tools."},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    self._call("call_a", "first_tool", {"x": 1}),
                    self._call("call_b", "second_tool", {"y": 2}),
                ],
            },
            self._result("call_a", "first_tool", "a"),
            self._result("call_b", "second_tool", "b"),
            {"role": "assistant", "content": "Done."},
        ]

        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)

        assert [message["role"] for message in batch] == ["user", "assistant", "assistant"]
        assert batch[1]["parts"] == [
            self._finished_part("call_a", "first_tool", {"x": 1}, "a"),
            self._finished_part("call_b", "second_tool", {"y": 2}, "b"),
        ]

    def test_messages_to_openviking_batch_skips_openviking_recall_tool_results(self):
        recalled = json.dumps({
            "results": [
                {
                    "uri": "viking://user/hermes/memories/context",
                    "abstract": "Old OpenViking memory content",
                }
            ]
        })
        shell_output = "plugins/memory/openviking/__init__.py"

        for recall_tool_name in ("viking_search", "viking_read", "viking_browse"):
            turn = [
                {"role": "user", "content": "What did we decide about context assembly?"},
                {
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [
                        self._call(
                            "call_recall_1",
                            recall_tool_name,
                            {"query": "context assembly decision"},
                        ),
                        self._call(
                            "call_shell_1",
                            "shell_command",
                            {"command": "rg preassemble"},
                        ),
                    ],
                },
                self._result("call_recall_1", recall_tool_name, recalled),
                self._result("call_shell_1", "shell_command", shell_output),
                {"role": "assistant", "content": "We decided to keep sync_turn scoped to ingestion."},
            ]

            batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)

            assert [message["role"] for message in batch] == ["user", "assistant", "assistant"]
            assert batch[1]["parts"] == [
                self._finished_part(
                    "call_shell_1",
                    "shell_command",
                    {"command": "rg preassemble"},
                    shell_output,
                )
            ]
            serialized = json.dumps(batch)
            assert recall_tool_name not in serialized
            assert "Old OpenViking memory content" not in serialized

    def test_messages_to_openviking_batch_empty_tool_id_does_not_drop_other_results(self):
        # A recall tool result that arrives with an empty tool_call_id must not
        # poison the skip set with "" and silently drop unrelated tool results
        # that also lack an id. Empty tool_call_id is reachable in the canonical
        # transcript (agent_runtime_helpers defaults it to "").
        turn = [
            {"role": "user", "content": "What did we decide?"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    self._call("", "viking_search", {"query": "decision"}),
                ],
            },
            self._result("", "viking_search", json.dumps({"results": ["recall stuff"]})),
            self._result("", "shell_command", "important shell output"),
            {"role": "assistant", "content": "done"},
        ]

        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)

        serialized = json.dumps(batch)
        # The unrelated (empty-id) shell result must survive.
        assert "important shell output" in serialized
        # The recall tool result must still be excluded.
        assert "recall stuff" not in serialized
        assert "viking_search" not in serialized

    def test_messages_to_openviking_batch_preserves_responses_text_parts(self):
        turn = [
            {"role": "user", "content": [{"type": "input_text", "text": "hello"}]},
            {"role": "assistant", "content": [{"type": "output_text", "text": "answer"}]},
        ]

        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)

        assert batch == [
            {"role": "user", "parts": [self._text_part("hello")]},
            {"role": "assistant", "parts": [self._text_part("answer")]},
        ]

    def test_messages_to_openviking_batch_adds_assistant_peer_id_when_requested(self):
        turn = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "answer"},
        ]

        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(
            turn,
            assistant_peer_id="hermes",
        )

        assert batch == [
            {"role": "user", "parts": [self._text_part("hello")]},
            {"role": "assistant", "parts": [self._text_part("answer")], "peer_id": "hermes"},
        ]
class TestOpenVikingRead:
def test_overview_read_normalizes_uri_and_unwraps_result(self):
provider = OpenVikingMemoryProvider()

View file

@ -1975,7 +1975,10 @@ def test_on_session_switch_commits_old_session_and_rotates_id():
provider.on_session_switch("new-sid", parent_session_id="old-sid")
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
assert provider._session_id == "new-sid"
assert provider._turn_count == 0
@ -1998,7 +2001,10 @@ def test_on_session_switch_commits_pending_tokens_without_turn_count():
provider.on_session_switch("new-sid")
provider._client.get.assert_called_once_with("/api/v1/sessions/old-sid")
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
assert provider._session_id == "new-sid"
assert provider._turn_count == 0
@ -2051,7 +2057,10 @@ def test_on_session_switch_waits_for_inflight_sync_thread():
provider.on_session_switch("new-sid")
assert join_calls, "expected on_session_switch to join the in-flight sync thread"
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
def test_on_session_switch_noop_on_empty_new_id():
@ -2186,6 +2195,78 @@ def test_sync_turn_retries_batch_write_with_fresh_client():
)]
def test_sync_turn_structured_messages_include_assistant_peer_id():
    """A structured sync posts one batch whose assistant messages carry peer_id."""
    import plugins.memory.openviking as openviking_module

    posted = []

    class StubClient:
        """Stand-in client that records every POST instead of sending it."""

        def __init__(self, *a, **kw):
            pass

        def post(self, path, payload=None, **kwargs):
            posted.append((path, payload))
            return {}

    provider = OpenVikingMemoryProvider()
    provider._client = MagicMock()
    provider._endpoint = "http://test"
    provider._api_key = ""
    provider._account = "acct"
    provider._user = "usr"
    provider._agent = "hermes"
    provider._session_id = "sid-structured"

    shell_call = {
        "id": "call-1",
        "type": "function",
        "function": {"name": "shell_command", "arguments": json.dumps({"cmd": "pwd"})},
    }
    messages = [
        {"role": "user", "content": [{"type": "input_text", "text": "u"}]},
        {"role": "assistant", "content": "Looking.", "tool_calls": [shell_call]},
        {"role": "tool", "tool_call_id": "call-1", "name": "shell_command", "content": "ok"},
        {"role": "assistant", "content": [{"type": "output_text", "text": "a"}]},
    ]

    # Swap the client class for the duration of the sync and always restore it.
    original_client_cls = openviking_module._VikingClient
    openviking_module._VikingClient = StubClient
    try:
        provider.sync_turn("u", "a", messages=messages)
        assert provider._drain_writers("sid-structured", timeout=2.0)
    finally:
        openviking_module._VikingClient = original_client_cls

    def assistant(parts):
        """Expected assistant payload message tagged with the provider's agent."""
        return {"role": "assistant", "parts": parts, "peer_id": "hermes"}

    expected_messages = [
        {"role": "user", "parts": [{"type": "text", "text": "u"}]},
        assistant([{"type": "text", "text": "Looking."}]),
        assistant([
            {
                "type": "tool",
                "tool_id": "call-1",
                "tool_name": "shell_command",
                "tool_input": {"cmd": "pwd"},
                "tool_output": "ok",
                "tool_status": "completed",
            }
        ]),
        assistant([{"type": "text", "text": "a"}]),
    ]
    assert posted == [(
        "/api/v1/sessions/sid-structured/messages/batch",
        {"messages": expected_messages},
    )]
def test_sync_turn_noop_when_session_id_blank():
provider = OpenVikingMemoryProvider()
provider._client = MagicMock()
@ -2206,7 +2287,10 @@ def test_on_session_end_marks_session_clean_after_successful_commit():
provider.on_session_end([])
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
assert provider._turn_count == 0
@ -2228,7 +2312,10 @@ def test_on_session_end_commits_pending_tokens_without_turn_count():
provider.on_session_end([])
provider._client.get.assert_called_once_with("/api/v1/sessions/old-sid")
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
def test_end_then_switch_does_not_double_commit():
@ -2241,7 +2328,10 @@ def test_end_then_switch_does_not_double_commit():
provider.on_session_switch("new-sid", parent_session_id="old-sid")
# Exactly one commit call, on the OLD session, fired by on_session_end.
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
assert provider._session_id == "new-sid"
assert provider._turn_count == 0
@ -2253,7 +2343,10 @@ def test_end_then_switch_with_pending_tokens_does_not_double_commit():
provider.on_session_end([])
provider.on_session_switch("new-sid", parent_session_id="old-sid")
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
assert provider._session_id == "new-sid"
assert provider._turn_count == 0
@ -2400,7 +2493,10 @@ def test_on_session_switch_does_not_block_caller_on_slow_drain():
# Let the finalizer finish so it doesn't leak past the test.
release_drain.set()
assert provider._drain_finalizers(timeout=5.0)
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
def test_on_session_switch_defers_old_commit_to_finalizer_thread():
@ -2415,7 +2511,7 @@ def test_on_session_switch_defers_old_commit_to_finalizer_thread():
committed = threading.Event()
drain_timeouts = []
def fake_post(path):
def fake_post(path, payload=None):
committed.set()
return {}
@ -2433,7 +2529,10 @@ def test_on_session_switch_defers_old_commit_to_finalizer_thread():
assert provider._turn_count == 0
# The old-session commit lands on the finalizer thread, not inline.
assert committed.wait(timeout=5.0), "old session was not finalized off-thread"
provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit")
provider._client.post.assert_called_once_with(
"/api/v1/sessions/old-sid/commit",
{"keep_recent_count": 0},
)
# The finalizer drains with the deferred (longer) budget, not inline 10s.
assert drain_timeouts == [_DEFERRED_COMMIT_TIMEOUT]

View file

@ -12,7 +12,7 @@ Verifies that:
from __future__ import annotations
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import pytest
@ -148,6 +148,17 @@ class TestRunConversationCodexPath:
and m.get("content") == "echo: hello"]
assert final, f"expected final assistant message in {msgs}"
def test_projected_messages_are_synced_to_external_memory(self, fake_session):
    """The memory manager's ``sync_all`` receives the conversation's final messages."""
    agent = _make_codex_agent()
    agent._memory_manager = MagicMock()
    memory_manager = agent._memory_manager
    memory_manager.build_system_prompt.return_value = ""

    with patch.object(agent, "_spawn_background_review", return_value=None):
        result = agent.run_conversation("hello")

    memory_manager.sync_all.assert_called_once()
    synced_messages = memory_manager.sync_all.call_args.kwargs["messages"]
    assert synced_messages == result["messages"]
def test_nudge_counters_tick(self, fake_session):
"""The skill nudge counter must accumulate tool_iterations across
turns. The memory nudge counter is gated on memory being configured