From d7cd0bc0863cda1a203f00422b1441ca2d9890ed Mon Sep 17 00:00:00 2001
From: Hao Zhe
Date: Fri, 19 Jun 2026 13:42:36 +0800
Subject: [PATCH] fix(openviking): preserve structured sync attribution

---
Note: this copy of the patch was re-flowed from a whitespace-collapsed
paste. Added/removed lines and the number of lines per hunk match the
hunk headers below; the leading indentation of unchanged context lines
was inferred from Python syntax and should be checked against the target
tree before running `git am` / `git apply`.

 agent/codex_runtime.py                        |  1 +
 agent/message_content.py                      | 50 +++++++++++++
 plugins/memory/openviking/__init__.py         | 36 +++++-----
 tests/agent/test_message_content.py           | 25 +++++++
 tests/openviking_plugin/test_openviking.py    | 36 +++++++++-
 .../memory/test_openviking_provider.py        | 72 +++++++++++++++++++
 .../test_codex_app_server_integration.py      | 13 +++-
 7 files changed, 210 insertions(+), 23 deletions(-)
 create mode 100644 agent/message_content.py
 create mode 100644 tests/agent/test_message_content.py

diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py
index 7f175fff97f..4ff67871934 100644
--- a/agent/codex_runtime.py
+++ b/agent/codex_runtime.py
@@ -290,6 +290,7 @@ def run_codex_app_server_turn(
                 original_user_message=original_user_message,
                 final_response=turn.final_text,
                 interrupted=False,
+                messages=messages,
             )
         except Exception:
             logger.debug("external memory sync raised", exc_info=True)
diff --git a/agent/message_content.py b/agent/message_content.py
new file mode 100644
index 00000000000..c42bf408550
--- /dev/null
+++ b/agent/message_content.py
@@ -0,0 +1,50 @@
+from __future__ import annotations
+
+from collections.abc import Mapping
+from typing import Any
+
+
+_NON_TEXT_PART_TYPES = {"image", "image_url", "input_image", "audio", "input_audio"}
+_TEXT_KEYS = ("text", "content", "input_text", "output_text", "summary_text")
+
+
+def _field(value: Any, key: str) -> Any:
+    if isinstance(value, Mapping):
+        return value.get(key)
+    return getattr(value, key, None)
+
+
+def _text_from_part(part: Any) -> str:
+    if part is None:
+        return ""
+    if isinstance(part, str):
+        return part
+
+    part_type = str(_field(part, "type") or "").strip().lower()
+    if part_type in _NON_TEXT_PART_TYPES:
+        return ""
+
+    for key in _TEXT_KEYS:
+        text = _field(part, key)
+        if isinstance(text, str):
+            return text
+    return ""
+
+
+def flatten_message_text(content: Any, *, sep: str = "\n") -> str:
+    """Return the visible text from common chat/Responses message content shapes."""
+    if content is None:
+        return ""
+    if isinstance(content, str):
+        return content
+    if isinstance(content, list):
+        chunks = [_text_from_part(part) for part in content]
+        return sep.join(chunk for chunk in chunks if chunk)
+
+    text = _text_from_part(content)
+    if text:
+        return text
+    try:
+        return str(content)
+    except Exception:
+        return ""
diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py
index c7b05a4864c..82f1f26a0a0 100644
--- a/plugins/memory/openviking/__init__.py
+++ b/plugins/memory/openviking/__init__.py
@@ -45,6 +45,7 @@ from typing import Any, Callable, Dict, List, Optional, Set
 from urllib.parse import urlparse
 from urllib.request import url2pathname
 
+from agent.message_content import flatten_message_text
 from agent.memory_provider import MemoryProvider
 from agent.skill_commands import extract_user_instruction_from_skill_message
 from tools.registry import tool_error
@@ -2313,22 +2314,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
     @staticmethod
     def _message_text(content: Any) -> str:
         """Extract text from OpenAI-style string/list content."""
-        if isinstance(content, str):
-            return content
-        if isinstance(content, list):
-            chunks = []
-            for block in content:
-                if isinstance(block, str):
-                    chunks.append(block)
-                elif isinstance(block, dict):
-                    if block.get("type") == "text" and isinstance(block.get("text"), str):
-                        chunks.append(block["text"])
-                    elif isinstance(block.get("content"), str):
-                        chunks.append(block["content"])
-            return "\n".join(chunk for chunk in chunks if chunk)
-        if content is None:
-            return ""
-        return str(content)
+        return flatten_message_text(content)
 
     @classmethod
     def _message_matches_text(cls, message: Dict[str, Any], expected: Any) -> bool:
@@ -2460,8 +2446,11 @@ class OpenVikingMemoryProvider(MemoryProvider):
     def _messages_to_openviking_batch(
         cls,
         messages: List[Dict[str, Any]],
+        *,
+        assistant_peer_id: str = "",
     ) -> List[Dict[str, Any]]:
         """Convert Hermes canonical messages into OpenViking batch payloads."""
+        assistant_peer_id = str(assistant_peer_id or "").strip()
         tool_calls_by_id: Dict[str, Dict[str, Any]] = {}
         completed_tool_ids: set[str] = set()
         skipped_tool_ids: set[str] = set()
@@ -2493,10 +2482,16 @@ class OpenVikingMemoryProvider(MemoryProvider):
         payload_messages: List[Dict[str, Any]] = []
         pending_tool_parts: List[Dict[str, Any]] = []
 
+        def payload_message(role: str, parts: List[Dict[str, Any]]) -> Dict[str, Any]:
+            payload: Dict[str, Any] = {"role": role, "parts": parts}
+            if role == "assistant" and assistant_peer_id:
+                payload["peer_id"] = assistant_peer_id
+            return payload
+
         def flush_tool_parts() -> None:
             nonlocal pending_tool_parts
             if pending_tool_parts:
-                payload_messages.append({"role": "user", "parts": pending_tool_parts})
+                payload_messages.append(payload_message("assistant", pending_tool_parts))
                 pending_tool_parts = []
 
         for message in messages:
@@ -2552,7 +2547,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
                     })
 
             if parts:
-                payload_messages.append({"role": role, "parts": parts})
+                payload_messages.append(payload_message(role, parts))
 
         flush_tool_parts()
         return payload_messages
@@ -2584,7 +2579,10 @@ class OpenVikingMemoryProvider(MemoryProvider):
                 if message.get("role") == "user":
                     message["content"] = user_content
                     break
-        batch_messages = self._messages_to_openviking_batch(turn_messages)
+        batch_messages = self._messages_to_openviking_batch(
+            turn_messages,
+            assistant_peer_id=getattr(self, "_agent", _DEFAULT_AGENT),
+        )
 
         if _sync_trace_enabled():
             logger.info(
diff --git a/tests/agent/test_message_content.py b/tests/agent/test_message_content.py
new file mode 100644
index 00000000000..0207d63600b
--- /dev/null
+++ b/tests/agent/test_message_content.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+from types import SimpleNamespace
+
+from agent.message_content import flatten_message_text
+
+
+def test_flatten_message_text_accepts_chat_and_responses_text_parts():
+    content = [
+        {"type": "text", "text": "chat text"},
+        {"type": "input_text", "text": "user text"},
+        {"type": "output_text", "text": "assistant text"},
+        {"type": "summary_text", "text": "summary text"},
+    ]
+
+    assert flatten_message_text(content) == "chat text\nuser text\nassistant text\nsummary text"
+
+
+def test_flatten_message_text_accepts_object_parts():
+    content = [
+        SimpleNamespace(type="output_text", text="object text"),
+        {"content": "legacy content"},
+    ]
+
+    assert flatten_message_text(content) == "object text\nlegacy content"
diff --git a/tests/openviking_plugin/test_openviking.py b/tests/openviking_plugin/test_openviking.py
index ee5d1eb2373..3a743287672 100644
--- a/tests/openviking_plugin/test_openviking.py
+++ b/tests/openviking_plugin/test_openviking.py
@@ -330,7 +330,7 @@ class TestOpenVikingTurnConversion:
 
         batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
 
-        assert [message["role"] for message in batch] == ["user", "assistant", "user", "assistant"]
+        assert [message["role"] for message in batch] == ["user", "assistant", "assistant", "assistant"]
         assert batch[0]["parts"] == [
             {"type": "text", "text": "Please inspect the repository for assemble hooks."}
         ]
@@ -378,6 +378,7 @@ class TestOpenVikingTurnConversion:
 
         batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
 
+        assert batch[1]["role"] == "assistant"
         assert batch[1]["parts"] == [
             {
                 "type": "tool",
@@ -453,7 +454,7 @@ class TestOpenVikingTurnConversion:
 
         batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
 
-        assert [message["role"] for message in batch] == ["user", "user", "assistant"]
+        assert [message["role"] for message in batch] == ["user", "assistant", "assistant"]
         assert batch[1]["parts"] == [
             {
                 "type": "tool",
@@ -523,7 +524,7 @@ class TestOpenVikingTurnConversion:
 
         batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
 
-        assert [message["role"] for message in batch] == ["user", "user", "assistant"]
+        assert [message["role"] for message in batch] == ["user", "assistant", "assistant"]
         assert batch[1]["parts"] == [
             {
                 "type": "tool",
@@ -538,6 +539,35 @@ class TestOpenVikingTurnConversion:
         assert recall_tool_name not in batch_text
         assert "Old OpenViking memory content" not in batch_text
 
+    def test_messages_to_openviking_batch_preserves_responses_text_parts(self):
+        turn = [
+            {"role": "user", "content": [{"type": "input_text", "text": "hello"}]},
+            {"role": "assistant", "content": [{"type": "output_text", "text": "answer"}]},
+        ]
+
+        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
+
+        assert batch == [
+            {"role": "user", "parts": [{"type": "text", "text": "hello"}]},
+            {"role": "assistant", "parts": [{"type": "text", "text": "answer"}]},
+        ]
+
+    def test_messages_to_openviking_batch_adds_assistant_peer_id_when_requested(self):
+        turn = [
+            {"role": "user", "content": "hello"},
+            {"role": "assistant", "content": "answer"},
+        ]
+
+        batch = OpenVikingMemoryProvider._messages_to_openviking_batch(
+            turn,
+            assistant_peer_id="hermes",
+        )
+
+        assert batch == [
+            {"role": "user", "parts": [{"type": "text", "text": "hello"}]},
+            {"role": "assistant", "parts": [{"type": "text", "text": "answer"}], "peer_id": "hermes"},
+        ]
+
 
 class TestOpenVikingRead:
     def test_overview_read_normalizes_uri_and_unwraps_result(self):
diff --git a/tests/plugins/memory/test_openviking_provider.py b/tests/plugins/memory/test_openviking_provider.py
index 2863566b367..28f2d8e9d46 100644
--- a/tests/plugins/memory/test_openviking_provider.py
+++ b/tests/plugins/memory/test_openviking_provider.py
@@ -2195,6 +2195,78 @@ def test_sync_turn_retries_batch_write_with_fresh_client():
     )]
 
 
+def test_sync_turn_structured_messages_include_assistant_peer_id():
+    provider = OpenVikingMemoryProvider()
+    provider._client = MagicMock()
+    provider._endpoint = "http://test"
+    provider._api_key = ""
+    provider._account = "acct"
+    provider._user = "usr"
+    provider._agent = "hermes"
+    provider._session_id = "sid-structured"
+
+    captured = []
+
+    class StubClient:
+        def __init__(self, *a, **kw):
+            pass
+
+        def post(self, path, payload=None, **kwargs):
+            captured.append((path, payload))
+            return {}
+
+    import plugins.memory.openviking as _mod
+
+    real_client_cls = _mod._VikingClient
+    _mod._VikingClient = StubClient
+    messages = [
+        {"role": "user", "content": [{"type": "input_text", "text": "u"}]},
+        {
+            "role": "assistant",
+            "content": "Looking.",
+            "tool_calls": [
+                {
+                    "id": "call-1",
+                    "type": "function",
+                    "function": {"name": "shell_command", "arguments": json.dumps({"cmd": "pwd"})},
+                }
+            ],
+        },
+        {"role": "tool", "tool_call_id": "call-1", "name": "shell_command", "content": "ok"},
+        {"role": "assistant", "content": [{"type": "output_text", "text": "a"}]},
+    ]
+    try:
+        provider.sync_turn("u", "a", messages=messages)
+        assert provider._drain_writers("sid-structured", timeout=2.0)
+    finally:
+        _mod._VikingClient = real_client_cls
+
+    assert captured == [(
+        "/api/v1/sessions/sid-structured/messages/batch",
+        {
+            "messages": [
+                {"role": "user", "parts": [{"type": "text", "text": "u"}]},
+                {"role": "assistant", "parts": [{"type": "text", "text": "Looking."}], "peer_id": "hermes"},
+                {
+                    "role": "assistant",
+                    "parts": [
+                        {
+                            "type": "tool",
+                            "tool_id": "call-1",
+                            "tool_name": "shell_command",
+                            "tool_input": {"cmd": "pwd"},
+                            "tool_output": "ok",
+                            "tool_status": "completed",
+                        }
+                    ],
+                    "peer_id": "hermes",
+                },
+                {"role": "assistant", "parts": [{"type": "text", "text": "a"}], "peer_id": "hermes"},
+            ]
+        },
+    )]
+
+
 def test_sync_turn_noop_when_session_id_blank():
     provider = OpenVikingMemoryProvider()
     provider._client = MagicMock()
diff --git a/tests/run_agent/test_codex_app_server_integration.py b/tests/run_agent/test_codex_app_server_integration.py
index 14c058178b9..b0d2ec23861 100644
--- a/tests/run_agent/test_codex_app_server_integration.py
+++ b/tests/run_agent/test_codex_app_server_integration.py
@@ -12,7 +12,7 @@ Verifies that:
 
 from __future__ import annotations
 
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 import pytest
 
@@ -148,6 +148,17 @@ class TestRunConversationCodexPath:
                  and m.get("content") == "echo: hello"]
         assert final, f"expected final assistant message in {msgs}"
 
+    def test_projected_messages_are_synced_to_external_memory(self, fake_session):
+        agent = _make_codex_agent()
+        agent._memory_manager = MagicMock()
+        agent._memory_manager.build_system_prompt.return_value = ""
+
+        with patch.object(agent, "_spawn_background_review", return_value=None):
+            result = agent.run_conversation("hello")
+
+        agent._memory_manager.sync_all.assert_called_once()
+        assert agent._memory_manager.sync_all.call_args.kwargs["messages"] == result["messages"]
+
     def test_nudge_counters_tick(self, fake_session):
         """The skill nudge counter must accumulate tool_iterations across turns.
         The memory nudge counter is gated on memory being configured