fix(openviking): preserve structured sync attribution

This commit is contained in:
Hao Zhe 2026-06-19 13:42:36 +08:00
parent c7b7f92ec1
commit d7cd0bc086
7 changed files with 210 additions and 23 deletions

View file

@ -290,6 +290,7 @@ def run_codex_app_server_turn(
original_user_message=original_user_message,
final_response=turn.final_text,
interrupted=False,
messages=messages,
)
except Exception:
logger.debug("external memory sync raised", exc_info=True)

50
agent/message_content.py Normal file
View file

@ -0,0 +1,50 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
# Content-part "type" values that carry no visible text; _text_from_part
# returns "" for a part whose stripped, lowercased type is one of these.
_NON_TEXT_PART_TYPES = {"image", "image_url", "input_image", "audio", "input_audio"}
# Field names probed, in this order, for a part's text; the first one whose
# value is a str is used.
_TEXT_KEYS = ("text", "content", "input_text", "output_text", "summary_text")
def _field(value: Any, key: str) -> Any:
if isinstance(value, Mapping):
return value.get(key)
return getattr(value, key, None)
def _text_from_part(part: Any) -> str:
    """Return the text carried by a single content part, or ``""`` if none.

    ``None`` gives ``""`` and a plain string is returned unchanged. For any
    other part, its ``type`` field is stripped and lowercased; parts whose
    type is listed in ``_NON_TEXT_PART_TYPES`` give ``""``. Otherwise the
    fields named in ``_TEXT_KEYS`` are probed in order and the first value
    that is a ``str`` is returned.
    """
    if part is None:
        return ""
    if isinstance(part, str):
        return part

    kind = str(_field(part, "type") or "").strip().lower()
    if kind in _NON_TEXT_PART_TYPES:
        return ""

    # Lazy generator: lookups stop at the first string-valued field.
    candidates = (_field(part, name) for name in _TEXT_KEYS)
    return next((value for value in candidates if isinstance(value, str)), "")
def flatten_message_text(content: Any, *, sep: str = "\n") -> str:
    """Return the visible text from common chat/Responses message content shapes.

    Args:
        content: ``None``, a string, a list of content parts, or a single
            content part (mapping or attribute-style object).
        sep: Separator placed between the non-empty texts of list parts.

    Returns:
        ``""`` for ``None``; the string itself for ``str`` input; for a list,
        the non-empty part texts joined by ``sep``. Any other value is treated
        as a single part; if that yields no text, ``str(content)`` is
        returned, or ``""`` when stringification raises.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Empty chunks are dropped so the separator never doubles up.
        return sep.join(filter(None, map(_text_from_part, content)))

    extracted = _text_from_part(content)
    if extracted:
        return extracted

    # Last resort: fall back to the value's own string form.
    try:
        return str(content)
    except Exception:
        return ""

View file

@ -45,6 +45,7 @@ from typing import Any, Callable, Dict, List, Optional, Set
from urllib.parse import urlparse
from urllib.request import url2pathname
from agent.message_content import flatten_message_text
from agent.memory_provider import MemoryProvider
from agent.skill_commands import extract_user_instruction_from_skill_message
from tools.registry import tool_error
@ -2313,22 +2314,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
@staticmethod
def _message_text(content: Any) -> str:
"""Extract text from OpenAI-style string/list content."""
if isinstance(content, str):
return content
if isinstance(content, list):
chunks = []
for block in content:
if isinstance(block, str):
chunks.append(block)
elif isinstance(block, dict):
if block.get("type") == "text" and isinstance(block.get("text"), str):
chunks.append(block["text"])
elif isinstance(block.get("content"), str):
chunks.append(block["content"])
return "\n".join(chunk for chunk in chunks if chunk)
if content is None:
return ""
return str(content)
return flatten_message_text(content)
@classmethod
def _message_matches_text(cls, message: Dict[str, Any], expected: Any) -> bool:
@ -2460,8 +2446,11 @@ class OpenVikingMemoryProvider(MemoryProvider):
def _messages_to_openviking_batch(
cls,
messages: List[Dict[str, Any]],
*,
assistant_peer_id: str = "",
) -> List[Dict[str, Any]]:
"""Convert Hermes canonical messages into OpenViking batch payloads."""
assistant_peer_id = str(assistant_peer_id or "").strip()
tool_calls_by_id: Dict[str, Dict[str, Any]] = {}
completed_tool_ids: set[str] = set()
skipped_tool_ids: set[str] = set()
@ -2493,10 +2482,16 @@ class OpenVikingMemoryProvider(MemoryProvider):
payload_messages: List[Dict[str, Any]] = []
pending_tool_parts: List[Dict[str, Any]] = []
def payload_message(role: str, parts: List[Dict[str, Any]]) -> Dict[str, Any]:
payload: Dict[str, Any] = {"role": role, "parts": parts}
if role == "assistant" and assistant_peer_id:
payload["peer_id"] = assistant_peer_id
return payload
def flush_tool_parts() -> None:
nonlocal pending_tool_parts
if pending_tool_parts:
payload_messages.append({"role": "user", "parts": pending_tool_parts})
payload_messages.append(payload_message("assistant", pending_tool_parts))
pending_tool_parts = []
for message in messages:
@ -2552,7 +2547,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
})
if parts:
payload_messages.append({"role": role, "parts": parts})
payload_messages.append(payload_message(role, parts))
flush_tool_parts()
return payload_messages
@ -2584,7 +2579,10 @@ class OpenVikingMemoryProvider(MemoryProvider):
if message.get("role") == "user":
message["content"] = user_content
break
batch_messages = self._messages_to_openviking_batch(turn_messages)
batch_messages = self._messages_to_openviking_batch(
turn_messages,
assistant_peer_id=getattr(self, "_agent", _DEFAULT_AGENT),
)
if _sync_trace_enabled():
logger.info(

View file

@ -0,0 +1,25 @@
from __future__ import annotations
from types import SimpleNamespace
from agent.message_content import flatten_message_text
def test_flatten_message_text_accepts_chat_and_responses_text_parts():
    """Parts typed text/input_text/output_text/summary_text are joined by newlines."""
    typed_texts = (
        ("text", "chat text"),
        ("input_text", "user text"),
        ("output_text", "assistant text"),
        ("summary_text", "summary text"),
    )
    parts = [{"type": kind, "text": text} for kind, text in typed_texts]

    flattened = flatten_message_text(parts)

    assert flattened == "chat text\nuser text\nassistant text\nsummary text"
def test_flatten_message_text_accepts_object_parts():
    """Attribute-style parts and dicts holding only ``content`` both yield text."""
    object_part = SimpleNamespace(type="output_text", text="object text")
    legacy_part = {"content": "legacy content"}

    flattened = flatten_message_text([object_part, legacy_part])

    assert flattened == "object text\nlegacy content"

View file

@ -330,7 +330,7 @@ class TestOpenVikingTurnConversion:
batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
assert [message["role"] for message in batch] == ["user", "assistant", "user", "assistant"]
assert [message["role"] for message in batch] == ["user", "assistant", "assistant", "assistant"]
assert batch[0]["parts"] == [
{"type": "text", "text": "Please inspect the repository for assemble hooks."}
]
@ -378,6 +378,7 @@ class TestOpenVikingTurnConversion:
batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
assert batch[1]["role"] == "assistant"
assert batch[1]["parts"] == [
{
"type": "tool",
@ -453,7 +454,7 @@ class TestOpenVikingTurnConversion:
batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
assert [message["role"] for message in batch] == ["user", "user", "assistant"]
assert [message["role"] for message in batch] == ["user", "assistant", "assistant"]
assert batch[1]["parts"] == [
{
"type": "tool",
@ -523,7 +524,7 @@ class TestOpenVikingTurnConversion:
batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn)
assert [message["role"] for message in batch] == ["user", "user", "assistant"]
assert [message["role"] for message in batch] == ["user", "assistant", "assistant"]
assert batch[1]["parts"] == [
{
"type": "tool",
@ -538,6 +539,35 @@ class TestOpenVikingTurnConversion:
assert recall_tool_name not in batch_text
assert "Old OpenViking memory content" not in batch_text
def test_messages_to_openviking_batch_preserves_responses_text_parts(self):
    """input_text/output_text content parts are converted to plain text parts."""
    user_message = {"role": "user", "content": [{"type": "input_text", "text": "hello"}]}
    assistant_message = {
        "role": "assistant",
        "content": [{"type": "output_text", "text": "answer"}],
    }

    converted = OpenVikingMemoryProvider._messages_to_openviking_batch(
        [user_message, assistant_message]
    )

    expected = [
        {"role": "user", "parts": [{"type": "text", "text": "hello"}]},
        {"role": "assistant", "parts": [{"type": "text", "text": "answer"}]},
    ]
    assert converted == expected
def test_messages_to_openviking_batch_adds_assistant_peer_id_when_requested(self):
    """With assistant_peer_id given, only the assistant payload gains peer_id."""
    conversation = [
        {"role": "user", "content": "hello"},
        {"role": "assistant", "content": "answer"},
    ]

    converted = OpenVikingMemoryProvider._messages_to_openviking_batch(
        conversation,
        assistant_peer_id="hermes",
    )

    expected_user = {"role": "user", "parts": [{"type": "text", "text": "hello"}]}
    expected_assistant = {
        "role": "assistant",
        "parts": [{"type": "text", "text": "answer"}],
        "peer_id": "hermes",
    }
    assert converted == [expected_user, expected_assistant]
class TestOpenVikingRead:
def test_overview_read_normalizes_uri_and_unwraps_result(self):

View file

@ -2195,6 +2195,78 @@ def test_sync_turn_retries_batch_write_with_fresh_client():
)]
def test_sync_turn_structured_messages_include_assistant_peer_id():
    """sync_turn posts one batch whose assistant messages all carry peer_id.

    The module-level ``_VikingClient`` is temporarily replaced by a recording
    stub so the path and payload handed to ``post`` can be compared with the
    expected batch; the real class is restored in ``finally``.
    """
    import plugins.memory.openviking as openviking_module

    provider = OpenVikingMemoryProvider()
    provider_state = {
        "_client": MagicMock(),
        "_endpoint": "http://test",
        "_api_key": "",
        "_account": "acct",
        "_user": "usr",
        "_agent": "hermes",
        "_session_id": "sid-structured",
    }
    for attribute, value in provider_state.items():
        setattr(provider, attribute, value)

    posted = []

    class RecordingClient:
        """Stand-in client that records each ``post`` call instead of sending it."""

        def __init__(self, *a, **kw):
            pass

        def post(self, path, payload=None, **kwargs):
            posted.append((path, payload))
            return {}

    tool_call = {
        "id": "call-1",
        "type": "function",
        "function": {"name": "shell_command", "arguments": json.dumps({"cmd": "pwd"})},
    }
    messages = [
        {"role": "user", "content": [{"type": "input_text", "text": "u"}]},
        {"role": "assistant", "content": "Looking.", "tool_calls": [tool_call]},
        {"role": "tool", "tool_call_id": "call-1", "name": "shell_command", "content": "ok"},
        {"role": "assistant", "content": [{"type": "output_text", "text": "a"}]},
    ]

    original_client_cls = openviking_module._VikingClient
    openviking_module._VikingClient = RecordingClient
    try:
        provider.sync_turn("u", "a", messages=messages)
        assert provider._drain_writers("sid-structured", timeout=2.0)
    finally:
        openviking_module._VikingClient = original_client_cls

    # The tool result is expected as an assistant-role message with a tool part.
    expected_tool_part = {
        "type": "tool",
        "tool_id": "call-1",
        "tool_name": "shell_command",
        "tool_input": {"cmd": "pwd"},
        "tool_output": "ok",
        "tool_status": "completed",
    }
    expected_messages = [
        {"role": "user", "parts": [{"type": "text", "text": "u"}]},
        {
            "role": "assistant",
            "parts": [{"type": "text", "text": "Looking."}],
            "peer_id": "hermes",
        },
        {"role": "assistant", "parts": [expected_tool_part], "peer_id": "hermes"},
        {
            "role": "assistant",
            "parts": [{"type": "text", "text": "a"}],
            "peer_id": "hermes",
        },
    ]
    expected_call = (
        "/api/v1/sessions/sid-structured/messages/batch",
        {"messages": expected_messages},
    )
    assert posted == [expected_call]
def test_sync_turn_noop_when_session_id_blank():
provider = OpenVikingMemoryProvider()
provider._client = MagicMock()

View file

@ -12,7 +12,7 @@ Verifies that:
from __future__ import annotations
from unittest.mock import patch
from unittest.mock import MagicMock, patch
import pytest
@ -148,6 +148,17 @@ class TestRunConversationCodexPath:
and m.get("content") == "echo: hello"]
assert final, f"expected final assistant message in {msgs}"
def test_projected_messages_are_synced_to_external_memory(self, fake_session):
    """run_conversation calls sync_all once with the messages it returns."""
    agent = _make_codex_agent()
    memory_manager = MagicMock()
    memory_manager.build_system_prompt.return_value = ""
    agent._memory_manager = memory_manager

    # Background review is stubbed out for the duration of the conversation.
    with patch.object(agent, "_spawn_background_review", return_value=None):
        result = agent.run_conversation("hello")

    memory_manager.sync_all.assert_called_once()
    sync_kwargs = memory_manager.sync_all.call_args.kwargs
    assert sync_kwargs["messages"] == result["messages"]
def test_nudge_counters_tick(self, fake_session):
"""The skill nudge counter must accumulate tool_iterations across
turns. The memory nudge counter is gated on memory being configured