diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py index 7f175fff97f..4ff67871934 100644 --- a/agent/codex_runtime.py +++ b/agent/codex_runtime.py @@ -290,6 +290,7 @@ def run_codex_app_server_turn( original_user_message=original_user_message, final_response=turn.final_text, interrupted=False, + messages=messages, ) except Exception: logger.debug("external memory sync raised", exc_info=True) diff --git a/agent/message_content.py b/agent/message_content.py new file mode 100644 index 00000000000..c42bf408550 --- /dev/null +++ b/agent/message_content.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + + +_NON_TEXT_PART_TYPES = {"image", "image_url", "input_image", "audio", "input_audio"} +_TEXT_KEYS = ("text", "content", "input_text", "output_text", "summary_text") + + +def _field(value: Any, key: str) -> Any: + if isinstance(value, Mapping): + return value.get(key) + return getattr(value, key, None) + + +def _text_from_part(part: Any) -> str: + if part is None: + return "" + if isinstance(part, str): + return part + + part_type = str(_field(part, "type") or "").strip().lower() + if part_type in _NON_TEXT_PART_TYPES: + return "" + + for key in _TEXT_KEYS: + text = _field(part, key) + if isinstance(text, str): + return text + return "" + + +def flatten_message_text(content: Any, *, sep: str = "\n") -> str: + """Return the visible text from common chat/Responses message content shapes.""" + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + chunks = [_text_from_part(part) for part in content] + return sep.join(chunk for chunk in chunks if chunk) + + text = _text_from_part(content) + if text: + return text + try: + return str(content) + except Exception: + return "" diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 7ebe6869a46..b4d44be88af 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -45,10 +45,11 @@ from typing import Any, Callable, Dict, List, Optional, Set from urllib.parse import urlparse from urllib.request import url2pathname +from agent.message_content import flatten_message_text from agent.memory_provider import MemoryProvider from agent.skill_commands import extract_user_instruction_from_skill_message from tools.registry import tool_error -from utils import atomic_json_write +from utils import atomic_json_write, env_var_enabled logger = logging.getLogger(__name__) @@ -70,6 +71,7 @@ _TIMEOUT = 30.0 _SESSION_DRAIN_TIMEOUT = 10.0 _DEFERRED_COMMIT_TIMEOUT = (_TIMEOUT * 2) + 5.0 _REMOTE_RESOURCE_PREFIXES = ("http://", "https://", "git@", "ssh://", "git://") +_SYNC_TRACE_ENV = "HERMES_OPENVIKING_SYNC_TRACE" # Maps the viking_remember `category` enum to a viking:// subdirectory. # Keep in sync with REMEMBER_SCHEMA.parameters.properties.category.enum. @@ -156,6 +158,18 @@ def _derive_openviking_user_text(content: Any) -> str: return extract_user_instruction_from_skill_message(content) or "" +def _sync_trace_enabled() -> bool: + return env_var_enabled(_SYNC_TRACE_ENV) + + +def _preview(value: Any, limit: int = 160) -> str: + text = "" if value is None else str(value) + text = text.replace("\n", "\\n") + if len(text) > limit: + return text[:limit] + "..." + return text + + # --------------------------------------------------------------------------- # Process-level atexit safety net — ensures pending sessions are committed # even if shutdown_memory_provider is never called (e.g. gateway crash, @@ -488,6 +502,25 @@ ADD_RESOURCE_SCHEMA = { } +# Recall tools (read-only) whose results we never re-ingest into OpenViking — +# echoing recalled memory back into the session transcript would re-store it. +# Write tools (viking_remember / viking_add_resource) are intentionally NOT +# here. Derived from the canonical schema names so renames can't desync. +_OPENVIKING_RECALL_TOOL_NAMES = { + SEARCH_SCHEMA["name"], + READ_SCHEMA["name"], + BROWSE_SCHEMA["name"], +} + +# Canonical tool_status values emitted in OpenViking batch tool parts. +_TOOL_STATUS_COMPLETED = "completed" +_TOOL_STATUS_ERROR = "error" +_TOOL_STATUS_PENDING = "pending" +# Inbound status aliases (from varied tool-result shapes) -> canonical above. +_TOOL_STATUS_ERROR_ALIASES = {"error", "failed", "failure"} +_TOOL_STATUS_COMPLETED_ALIASES = {"completed", "complete", "success", "succeeded"} + + def _zip_directory(dir_path: Path) -> Path: """Create a temporary zip file containing a directory tree.""" root = dir_path.resolve() @@ -2221,7 +2254,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _commit_session(self, sid: str, turn_count: int, *, context: str) -> bool: try: - self._client.post(f"/api/v1/sessions/{sid}/commit") + self._client.post( + f"/api/v1/sessions/{sid}/commit", + {"keep_recent_count": 0}, + ) self._mark_session_committed(sid) logger.info("OpenViking session %s committed %s (%d turns)", sid, context, turn_count) return True @@ -2293,7 +2329,265 @@ class OpenVikingMemoryProvider(MemoryProvider): with self._prefetch_lock: self._prefetch_result = "" - def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: + @staticmethod + def _message_text(content: Any) -> str: + """Extract text from OpenAI-style string/list content.""" + return flatten_message_text(content) + + @classmethod + def _message_matches_text(cls, message: Dict[str, Any], expected: Any) -> bool: + expected_text = cls._message_text(expected).strip() + if not expected_text: + return False + actual_text = cls._message_text(message.get("content")).strip() + return actual_text == expected_text + + @classmethod + def _extract_current_turn_messages( + cls, + messages: Optional[List[Dict[str, Any]]], + user_content: str, + assistant_content: str, + ) -> List[Dict[str, Any]]: + """Slice the completed turn out of Hermes' full canonical transcript.""" + if not messages: + return [] + + end_idx: Optional[int] = None + if cls._message_text(assistant_content).strip(): + for idx in range(len(messages) - 1, -1, -1): + message = messages[idx] + if ( + isinstance(message, dict) + and message.get("role") == "assistant" + and cls._message_matches_text(message, assistant_content) + ): + end_idx = idx + break + if end_idx is None: + for idx in range(len(messages) - 1, -1, -1): + message = messages[idx] + if isinstance(message, dict) and message.get("role") == "assistant": + end_idx = idx + break + if end_idx is None: + end_idx = len(messages) - 1 + + start_idx: Optional[int] = None + if cls._message_text(user_content).strip(): + for idx in range(end_idx, -1, -1): + message = messages[idx] + if ( + isinstance(message, dict) + and message.get("role") == "user" + and cls._message_matches_text(message, user_content) + ): + start_idx = idx + break + if start_idx is None: + for idx in range(end_idx, -1, -1): + message = messages[idx] + if isinstance(message, dict) and message.get("role") == "user": + start_idx = idx + break + if start_idx is None: + return [] + + return [message for message in messages[start_idx : end_idx + 1] if isinstance(message, dict)] + + @staticmethod + def _tool_call_id(tool_call: Dict[str, Any]) -> str: + return str(tool_call.get("id") or tool_call.get("tool_call_id") or "") + + @staticmethod + def _tool_call_name(tool_call: Dict[str, Any]) -> str: + function = tool_call.get("function") + if isinstance(function, dict): + return str(function.get("name") or "") + return str(tool_call.get("name") or "") + + @staticmethod + def _is_openviking_recall_tool_name(tool_name: Any) -> bool: + return str(tool_name or "").strip().lower() in _OPENVIKING_RECALL_TOOL_NAMES + + @staticmethod + def _tool_call_input(tool_call: Dict[str, Any]) -> Dict[str, Any]: + function = tool_call.get("function") + raw_args: Any = None + if isinstance(function, dict): + raw_args = function.get("arguments") + if raw_args is None: + raw_args = tool_call.get("args") + if raw_args is None: + return {} + if isinstance(raw_args, dict): + return raw_args + if isinstance(raw_args, str): + if not raw_args.strip(): + return {} + try: + parsed = json.loads(raw_args) + except Exception: + return {"value": raw_args} + if isinstance(parsed, dict): + return parsed + return {"value": parsed} + return {"value": raw_args} + + @classmethod + def _tool_result_status(cls, message: Dict[str, Any]) -> str: + raw_status = str(message.get("status") or message.get("tool_status") or "").lower() + if raw_status in _TOOL_STATUS_ERROR_ALIASES: + return _TOOL_STATUS_ERROR + if raw_status in _TOOL_STATUS_COMPLETED_ALIASES: + return _TOOL_STATUS_COMPLETED + + text = cls._message_text(message.get("content")).strip() + if text: + try: + parsed = json.loads(text) + except Exception: + parsed = None + if isinstance(parsed, dict): + status = str(parsed.get("status") or "").lower() + exit_code = parsed.get("exit_code") + if ( + status in _TOOL_STATUS_ERROR_ALIASES + or parsed.get("success") is False + or bool(parsed.get("error")) + or (isinstance(exit_code, int) and exit_code != 0) + ): + return _TOOL_STATUS_ERROR + + return _TOOL_STATUS_COMPLETED + + @classmethod + def _messages_to_openviking_batch( + cls, + messages: List[Dict[str, Any]], + *, + assistant_peer_id: str = "", + ) -> List[Dict[str, Any]]: + """Convert Hermes canonical messages into OpenViking batch payloads.""" + assistant_peer_id = str(assistant_peer_id or "").strip() + tool_calls_by_id: Dict[str, Dict[str, Any]] = {} + completed_tool_ids: set[str] = set() + skipped_tool_ids: set[str] = set() + for message in messages: + if not isinstance(message, dict): + continue + if message.get("role") == "tool": + tool_id = str(message.get("tool_call_id") or message.get("id") or "") + if tool_id: + completed_tool_ids.add(tool_id) + if cls._is_openviking_recall_tool_name(message.get("name")): + skipped_tool_ids.add(tool_id) + continue + if message.get("role") != "assistant": + continue + for tool_call in message.get("tool_calls") or []: + if not isinstance(tool_call, dict): + continue + tool_id = cls._tool_call_id(tool_call) + tool_name = cls._tool_call_name(tool_call) + if tool_id: + tool_calls_by_id[tool_id] = { + "tool_name": tool_name, + "tool_input": cls._tool_call_input(tool_call), + } + if cls._is_openviking_recall_tool_name(tool_name): + skipped_tool_ids.add(tool_id) + + payload_messages: List[Dict[str, Any]] = [] + pending_tool_parts: List[Dict[str, Any]] = [] + + def payload_message(role: str, parts: List[Dict[str, Any]]) -> Dict[str, Any]: + payload: Dict[str, Any] = {"role": role, "parts": parts} + if role == "assistant" and assistant_peer_id: + payload["peer_id"] = assistant_peer_id + return payload + + def flush_tool_parts() -> None: + nonlocal pending_tool_parts + if pending_tool_parts: + payload_messages.append(payload_message("assistant", pending_tool_parts)) + pending_tool_parts = [] + + for message in messages: + if not isinstance(message, dict): + continue + + role = str(message.get("role") or "") + if role in {"system", "developer"}: + continue + + if role == "tool": + tool_id = str(message.get("tool_call_id") or message.get("id") or "") + prior_call = tool_calls_by_id.get(tool_id, {}) + tool_name = str(message.get("name") or prior_call.get("tool_name") or "") + if tool_id in skipped_tool_ids or cls._is_openviking_recall_tool_name(tool_name): + continue + tool_part = { + "type": "tool", + "tool_id": tool_id, + "tool_name": tool_name, + "tool_input": prior_call.get("tool_input", {}), + "tool_output": cls._message_text(message.get("content")), + "tool_status": cls._tool_result_status(message), + } + pending_tool_parts.append(tool_part) + continue + + if role not in {"user", "assistant"}: + continue + + flush_tool_parts() + parts: List[Dict[str, Any]] = [] + text = cls._message_text(message.get("content")) + if text: + parts.append({"type": "text", "text": text}) + + if role == "assistant": + for tool_call in message.get("tool_calls") or []: + if not isinstance(tool_call, dict): + continue + tool_id = cls._tool_call_id(tool_call) + tool_name = cls._tool_call_name(tool_call) + if tool_id in skipped_tool_ids or cls._is_openviking_recall_tool_name(tool_name): + continue + if tool_id in completed_tool_ids: + continue + # Reuse the tool_input parsed in the pre-scan when available + # (non-empty ids are cached); fall back to parsing for the + # uncached empty-id case so we never drop arguments. + prior_call = tool_calls_by_id.get(tool_id) if tool_id else None + tool_input = ( + prior_call["tool_input"] + if prior_call is not None + else cls._tool_call_input(tool_call) + ) + parts.append({ + "type": "tool", + "tool_id": tool_id, + "tool_name": tool_name, + "tool_input": tool_input, + "tool_status": _TOOL_STATUS_PENDING, + }) + + if parts: + payload_messages.append(payload_message(role, parts)) + + flush_tool_parts() + return payload_messages + + def sync_turn( + self, + user_content: str, + assistant_content: str, + *, + session_id: str = "", + messages: Optional[List[Dict[str, Any]]] = None, + ) -> None: """Record the conversation turn in OpenViking's session (non-blocking).""" if not self._client: return @@ -2302,6 +2596,40 @@ class OpenVikingMemoryProvider(MemoryProvider): if not user_content: return + turn_messages = ( + self._extract_current_turn_messages(messages, user_content, assistant_content) + if messages is not None + else [] + ) + if turn_messages: + turn_messages = [dict(message) for message in turn_messages] + for message in turn_messages: + if message.get("role") == "user": + message["content"] = user_content + break + batch_messages = self._messages_to_openviking_batch( + turn_messages, + assistant_peer_id=getattr(self, "_agent", _DEFAULT_AGENT), + ) + + if _sync_trace_enabled(): + logger.info( + "OpenViking sync_turn trace: session_arg=%r cached_session=%r " + "messages_param_supported=true messages_present=%s message_count=%s " + "turn_message_count=%d batch_message_count=%d user_len=%d assistant_len=%d " + "user_preview=%r assistant_preview=%r", + session_id, + self._session_id, + messages is not None, + len(messages) if messages is not None else None, + len(turn_messages), + len(batch_messages), + len(str(user_content or "")), + len(str(assistant_content or "")), + _preview(user_content), + _preview(assistant_content), + ) + # Snapshot the sid and bump the turn counter atomically so a # concurrent on_session_switch/on_session_end can't interleave its # snapshot+reset between the read and the increment (lost turn) and so @@ -2313,24 +2641,39 @@ class OpenVikingMemoryProvider(MemoryProvider): self._turn_count += 1 def _sync(): - try: - client = self._new_client() + def _post_turn(client: _VikingClient) -> None: + if batch_messages: + payload = {"messages": batch_messages} + if _sync_trace_enabled(): + logger.info( + "OpenViking sync_turn trace: POST /api/v1/sessions/%s/messages/batch payload=%s", + sid, + json.dumps(payload, ensure_ascii=False), + ) + try: + client.post(f"/api/v1/sessions/{sid}/messages/batch", payload) + return + except Exception as batch_error: + logger.warning( + "OpenViking structured sync failed; falling back to text sync: %s", + batch_error, + ) + self._post_session_turn( client, sid, user_content[:4000], - assistant_content[:4000], + self._message_text(assistant_content)[:4000], ) + + try: + client = self._new_client() + _post_turn(client) except Exception as e: logger.debug("OpenViking sync_turn failed, reconnecting: %s", e) try: client = self._new_client() - self._post_session_turn( - client, - sid, - user_content[:4000], - assistant_content[:4000], - ) + _post_turn(client) except Exception as retry_error: logger.warning("OpenViking sync_turn failed: %s", retry_error) diff --git a/scripts/release.py b/scripts/release.py index 6c5d33ec3a1..4e5f8844439 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -1577,6 +1577,7 @@ AUTHOR_MAP = { "sunsky.lau@gmail.com": "liuhao1024", # PR #45494 salvage (claim session slot before auto-resume task; #45456) "andrewdmwalker@gmail.com": "capt-marbles", # PR #38440 salvage (resolve xAI OAuth credentials across profiles; #43589) "infinitycrew39@gmail.com": "infinitycrew39", # PR #47945 salvage (scope langfuse trace state by turn/request ids; #48292) + "eurekaxun@163.com": "huangxun375-stack", # PR #37251 / #48894 structured OpenViking sync } diff --git a/tests/agent/test_message_content.py b/tests/agent/test_message_content.py new file mode 100644 index 00000000000..0207d63600b --- /dev/null +++ b/tests/agent/test_message_content.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from types import SimpleNamespace + +from agent.message_content import flatten_message_text + + +def test_flatten_message_text_accepts_chat_and_responses_text_parts(): + content = [ + {"type": "text", "text": "chat text"}, + {"type": "input_text", "text": "user text"}, + {"type": "output_text", "text": "assistant text"}, + {"type": "summary_text", "text": "summary text"}, + ] + + assert flatten_message_text(content) == "chat text\nuser text\nassistant text\nsummary text" + + +def test_flatten_message_text_accepts_object_parts(): + content = [ + SimpleNamespace(type="output_text", text="object text"), + {"content": "legacy content"}, + ] + + assert flatten_message_text(content) == "object text\nlegacy content" diff --git a/tests/openviking_plugin/test_openviking.py b/tests/openviking_plugin/test_openviking.py index f10fc502000..171e6abc8ac 100644 --- a/tests/openviking_plugin/test_openviking.py +++ b/tests/openviking_plugin/test_openviking.py @@ -265,6 +265,355 @@ class TestOpenVikingSkillQuerySafety: assert RecordingVikingClient.calls == [] +class TestOpenVikingTurnConversion: + def test_extract_current_turn_anchors_on_latest_matching_user_and_assistant(self): + messages = [ + {"role": "user", "content": "Please inspect the repository for assemble hooks."}, + {"role": "assistant", "content": "Earlier answer."}, + {"role": "user", "content": "Please inspect the repository for assemble hooks."}, + { + "role": "assistant", + "content": "I will search the codebase.", + "tool_calls": [ + { + "id": "call_rg_1", + "type": "function", + "function": { + "name": "shell_command", + "arguments": json.dumps({"command": "rg assemble"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_rg_1", + "name": "shell_command", + "content": "agent/context_engine.py: no preassemble hook", + }, + {"role": "assistant", "content": "The current main does not expose assemble."}, + ] + + turn = OpenVikingMemoryProvider._extract_current_turn_messages( + messages, + "Please inspect the repository for assemble hooks.", + "The current main does not expose assemble.", + ) + + assert turn == messages[2:] + + def test_messages_to_openviking_batch_coalesces_tool_results(self): + turn = [ + {"role": "user", "content": "Please inspect the repository for assemble hooks."}, + { + "role": "assistant", + "content": "I will search the codebase.", + "tool_calls": [ + { + "id": "call_rg_1", + "type": "function", + "function": { + "name": "shell_command", + "arguments": json.dumps({"command": "rg assemble"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_rg_1", + "name": "shell_command", + "content": "agent/context_engine.py: no preassemble hook", + }, + {"role": "assistant", "content": "The current main does not expose assemble."}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert [message["role"] for message in batch] == ["user", "assistant", "assistant", "assistant"] + assert batch[0]["parts"] == [ + {"type": "text", "text": "Please inspect the repository for assemble hooks."} + ] + assert batch[1]["parts"] == [ + {"type": "text", "text": "I will search the codebase."} + ] + assert batch[2]["parts"] == [ + { + "type": "tool", + "tool_id": "call_rg_1", + "tool_name": "shell_command", + "tool_input": {"command": "rg assemble"}, + "tool_output": "agent/context_engine.py: no preassemble hook", + "tool_status": "completed", + } + ] + assert batch[3]["parts"] == [ + {"type": "text", "text": "The current main does not expose assemble."} + ] + + def test_messages_to_openviking_batch_marks_json_tool_error_results(self): + turn = [ + {"role": "user", "content": "Check the file."}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_read_1", + "type": "function", + "function": { + "name": "read_file", + "arguments": json.dumps({"path": "missing.md"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_read_1", + "name": "read_file", + "content": json.dumps({"error": "File not found", "exit_code": 1}), + }, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert batch[1]["role"] == "assistant" + assert batch[1]["parts"] == [ + { + "type": "tool", + "tool_id": "call_read_1", + "tool_name": "read_file", + "tool_input": {"path": "missing.md"}, + "tool_output": json.dumps({"error": "File not found", "exit_code": 1}), + "tool_status": "error", + } + ] + + def test_messages_to_openviking_batch_keeps_pending_tool_call_without_result(self): + turn = [ + {"role": "user", "content": "Start a long running check."}, + { + "role": "assistant", + "content": "Starting it now.", + "tool_calls": [ + { + "id": "call_long_1", + "type": "function", + "function": { + "name": "long_check", + "arguments": json.dumps({"target": "repo"}), + }, + } + ], + }, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert batch[1]["parts"] == [ + {"type": "text", "text": "Starting it now."}, + { + "type": "tool", + "tool_id": "call_long_1", + "tool_name": "long_check", + "tool_input": {"target": "repo"}, + "tool_status": "pending", + }, + ] + + def test_messages_to_openviking_batch_coalesces_adjacent_tool_results(self): + turn = [ + {"role": "user", "content": "Run both tools."}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_a", + "type": "function", + "function": { + "name": "first_tool", + "arguments": json.dumps({"x": 1}), + }, + }, + { + "id": "call_b", + "type": "function", + "function": { + "name": "second_tool", + "arguments": json.dumps({"y": 2}), + }, + }, + ], + }, + {"role": "tool", "tool_call_id": "call_a", "name": "first_tool", "content": "a"}, + {"role": "tool", "tool_call_id": "call_b", "name": "second_tool", "content": "b"}, + {"role": "assistant", "content": "Done."}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert [message["role"] for message in batch] == ["user", "assistant", "assistant"] + assert batch[1]["parts"] == [ + { + "type": "tool", + "tool_id": "call_a", + "tool_name": "first_tool", + "tool_input": {"x": 1}, + "tool_output": "a", + "tool_status": "completed", + }, + { + "type": "tool", + "tool_id": "call_b", + "tool_name": "second_tool", + "tool_input": {"y": 2}, + "tool_output": "b", + "tool_status": "completed", + }, + ] + + def test_messages_to_openviking_batch_skips_openviking_recall_tool_results(self): + for recall_tool_name in ("viking_search", "viking_read", "viking_browse"): + turn = [ + {"role": "user", "content": "What did we decide about context assembly?"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_recall_1", + "type": "function", + "function": { + "name": recall_tool_name, + "arguments": json.dumps({"query": "context assembly decision"}), + }, + }, + { + "id": "call_shell_1", + "type": "function", + "function": { + "name": "shell_command", + "arguments": json.dumps({"command": "rg preassemble"}), + }, + }, + ], + }, + { + "role": "tool", + "tool_call_id": "call_recall_1", + "name": recall_tool_name, + "content": json.dumps({ + "results": [ + { + "uri": "viking://user/hermes/memories/context", + "abstract": "Old OpenViking memory content", + } + ] + }), + }, + { + "role": "tool", + "tool_call_id": "call_shell_1", + "name": "shell_command", + "content": "plugins/memory/openviking/__init__.py", + }, + {"role": "assistant", "content": "We decided to keep sync_turn scoped to ingestion."}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert [message["role"] for message in batch] == ["user", "assistant", "assistant"] + assert batch[1]["parts"] == [ + { + "type": "tool", + "tool_id": "call_shell_1", + "tool_name": "shell_command", + "tool_input": {"command": "rg preassemble"}, + "tool_output": "plugins/memory/openviking/__init__.py", + "tool_status": "completed", + } + ] + batch_text = json.dumps(batch) + assert recall_tool_name not in batch_text + assert "Old OpenViking memory content" not in batch_text + + def test_messages_to_openviking_batch_empty_tool_id_does_not_drop_other_results(self): + # A recall tool result that arrives with an empty tool_call_id must not + # poison the skip set with "" and silently drop unrelated tool results + # that also lack an id. Empty tool_call_id is reachable in the canonical + # transcript (agent_runtime_helpers defaults it to ""). + turn = [ + {"role": "user", "content": "What did we decide?"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "", + "type": "function", + "function": { + "name": "viking_search", + "arguments": json.dumps({"query": "decision"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "", + "name": "viking_search", + "content": json.dumps({"results": ["recall stuff"]}), + }, + { + "role": "tool", + "tool_call_id": "", + "name": "shell_command", + "content": "important shell output", + }, + {"role": "assistant", "content": "done"}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + batch_text = json.dumps(batch) + # The unrelated (empty-id) shell result must survive. + assert "important shell output" in batch_text + # The recall tool result must still be excluded. + assert "recall stuff" not in batch_text + assert "viking_search" not in batch_text + + def test_messages_to_openviking_batch_preserves_responses_text_parts(self): + turn = [ + {"role": "user", "content": [{"type": "input_text", "text": "hello"}]}, + {"role": "assistant", "content": [{"type": "output_text", "text": "answer"}]}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert batch == [ + {"role": "user", "parts": [{"type": "text", "text": "hello"}]}, + {"role": "assistant", "parts": [{"type": "text", "text": "answer"}]}, + ] + + def test_messages_to_openviking_batch_adds_assistant_peer_id_when_requested(self): + turn = [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "answer"}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch( + turn, + assistant_peer_id="hermes", + ) + + assert batch == [ + {"role": "user", "parts": [{"type": "text", "text": "hello"}]}, + {"role": "assistant", "parts": [{"type": "text", "text": "answer"}], "peer_id": "hermes"}, + ] + + class TestOpenVikingRead: def test_overview_read_normalizes_uri_and_unwraps_result(self): provider = OpenVikingMemoryProvider() diff --git a/tests/plugins/memory/test_openviking_provider.py b/tests/plugins/memory/test_openviking_provider.py index 954385fa54e..28f2d8e9d46 100644 --- a/tests/plugins/memory/test_openviking_provider.py +++ b/tests/plugins/memory/test_openviking_provider.py @@ -1975,7 +1975,10 @@ def test_on_session_switch_commits_old_session_and_rotates_id(): provider.on_session_switch("new-sid", parent_session_id="old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -1998,7 +2001,10 @@ def test_on_session_switch_commits_pending_tokens_without_turn_count(): provider.on_session_switch("new-sid") provider._client.get.assert_called_once_with("/api/v1/sessions/old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -2051,7 +2057,10 @@ def test_on_session_switch_waits_for_inflight_sync_thread(): provider.on_session_switch("new-sid") assert join_calls, "expected on_session_switch to join the in-flight sync thread" - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) def test_on_session_switch_noop_on_empty_new_id(): @@ -2186,6 +2195,78 @@ def test_sync_turn_retries_batch_write_with_fresh_client(): )] +def test_sync_turn_structured_messages_include_assistant_peer_id(): + provider = OpenVikingMemoryProvider() + provider._client = MagicMock() + provider._endpoint = "http://test" + provider._api_key = "" + provider._account = "acct" + provider._user = "usr" + provider._agent = "hermes" + provider._session_id = "sid-structured" + + captured = [] + + class StubClient: + def __init__(self, *a, **kw): + pass + + def post(self, path, payload=None, **kwargs): + captured.append((path, payload)) + return {} + + import plugins.memory.openviking as _mod + + real_client_cls = _mod._VikingClient + _mod._VikingClient = StubClient + messages = [ + {"role": "user", "content": [{"type": "input_text", "text": "u"}]}, + { + "role": "assistant", + "content": "Looking.", + "tool_calls": [ + { + "id": "call-1", + "type": "function", + "function": {"name": "shell_command", "arguments": json.dumps({"cmd": "pwd"})}, + } + ], + }, + {"role": "tool", "tool_call_id": "call-1", "name": "shell_command", "content": "ok"}, + {"role": "assistant", "content": [{"type": "output_text", "text": "a"}]}, + ] + try: + provider.sync_turn("u", "a", messages=messages) + assert provider._drain_writers("sid-structured", timeout=2.0) + finally: + _mod._VikingClient = real_client_cls + + assert captured == [( + "/api/v1/sessions/sid-structured/messages/batch", + { + "messages": [ + {"role": "user", "parts": [{"type": "text", "text": "u"}]}, + {"role": "assistant", "parts": [{"type": "text", "text": "Looking."}], "peer_id": "hermes"}, + { + "role": "assistant", + "parts": [ + { + "type": "tool", + "tool_id": "call-1", + "tool_name": "shell_command", + "tool_input": {"cmd": "pwd"}, + "tool_output": "ok", + "tool_status": "completed", + } + ], + "peer_id": "hermes", + }, + {"role": "assistant", "parts": [{"type": "text", "text": "a"}], "peer_id": "hermes"}, + ] + }, + )] + + def test_sync_turn_noop_when_session_id_blank(): provider = OpenVikingMemoryProvider() provider._client = MagicMock() @@ -2206,7 +2287,10 @@ def test_on_session_end_marks_session_clean_after_successful_commit(): provider.on_session_end([]) - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._turn_count == 0 @@ -2228,7 +2312,10 @@ def test_on_session_end_commits_pending_tokens_without_turn_count(): provider.on_session_end([]) provider._client.get.assert_called_once_with("/api/v1/sessions/old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) def test_end_then_switch_does_not_double_commit(): @@ -2241,7 +2328,10 @@ def test_end_then_switch_does_not_double_commit(): provider.on_session_switch("new-sid", parent_session_id="old-sid") # Exactly one commit call, on the OLD session, fired by on_session_end. - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -2253,7 +2343,10 @@ def test_end_then_switch_with_pending_tokens_does_not_double_commit(): provider.on_session_end([]) provider.on_session_switch("new-sid", parent_session_id="old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -2400,7 +2493,10 @@ def test_on_session_switch_does_not_block_caller_on_slow_drain(): # Let the finalizer finish so it doesn't leak past the test. release_drain.set() assert provider._drain_finalizers(timeout=5.0) - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) def test_on_session_switch_defers_old_commit_to_finalizer_thread(): @@ -2415,7 +2511,7 @@ def test_on_session_switch_defers_old_commit_to_finalizer_thread(): committed = threading.Event() drain_timeouts = [] - def fake_post(path): + def fake_post(path, payload=None): committed.set() return {} @@ -2433,7 +2529,10 @@ def test_on_session_switch_defers_old_commit_to_finalizer_thread(): assert provider._turn_count == 0 # The old-session commit lands on the finalizer thread, not inline. assert committed.wait(timeout=5.0), "old session was not finalized off-thread" - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) # The finalizer drains with the deferred (longer) budget, not inline 10s. assert drain_timeouts == [_DEFERRED_COMMIT_TIMEOUT] diff --git a/tests/run_agent/test_codex_app_server_integration.py b/tests/run_agent/test_codex_app_server_integration.py index 14c058178b9..b0d2ec23861 100644 --- a/tests/run_agent/test_codex_app_server_integration.py +++ b/tests/run_agent/test_codex_app_server_integration.py @@ -12,7 +12,7 @@ Verifies that: from __future__ import annotations -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -148,6 +148,17 @@ class TestRunConversationCodexPath: and m.get("content") == "echo: hello"] assert final, f"expected final assistant message in {msgs}" + def test_projected_messages_are_synced_to_external_memory(self, fake_session): + agent = _make_codex_agent() + agent._memory_manager = MagicMock() + agent._memory_manager.build_system_prompt.return_value = "" + + with patch.object(agent, "_spawn_background_review", return_value=None): + result = agent.run_conversation("hello") + + agent._memory_manager.sync_all.assert_called_once() + assert agent._memory_manager.sync_all.call_args.kwargs["messages"] == result["messages"] + def test_nudge_counters_tick(self, fake_session): """The skill nudge counter must accumulate tool_iterations across turns. The memory nudge counter is gated on memory being configured