From c7b7f92ec14a5c43deef844804f0bf6a7f2d992d Mon Sep 17 00:00:00 2001 From: Eurekaxun Date: Tue, 2 Jun 2026 14:33:12 +0800 Subject: [PATCH 1/7] fix(openviking): sync structured turns with tool parts --- plugins/memory/openviking/__init__.py | 339 +++++++++++++++++- tests/openviking_plugin/test_openviking.py | 274 ++++++++++++++ .../memory/test_openviking_provider.py | 47 ++- 3 files changed, 639 insertions(+), 21 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 7ebe6869a46..c7b05a4864c 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -70,6 +70,8 @@ _TIMEOUT = 30.0 _SESSION_DRAIN_TIMEOUT = 10.0 _DEFERRED_COMMIT_TIMEOUT = (_TIMEOUT * 2) + 5.0 _REMOTE_RESOURCE_PREFIXES = ("http://", "https://", "git@", "ssh://", "git://") +_SYNC_TRACE_ENV = "HERMES_OPENVIKING_SYNC_TRACE" +_OPENVIKING_RECALL_TOOL_NAMES = {"viking_search", "viking_read", "viking_browse"} # Maps the viking_remember `category` enum to a viking:// subdirectory. # Keep in sync with REMEMBER_SCHEMA.parameters.properties.category.enum. @@ -156,6 +158,18 @@ def _derive_openviking_user_text(content: Any) -> str: return extract_user_instruction_from_skill_message(content) or "" +def _sync_trace_enabled() -> bool: + return os.environ.get(_SYNC_TRACE_ENV, "").strip().lower() in {"1", "true", "yes", "on"} + + +def _preview(value: Any, limit: int = 160) -> str: + text = "" if value is None else str(value) + text = text.replace("\n", "\\n") + if len(text) > limit: + return text[:limit] + "..." + return text + + # --------------------------------------------------------------------------- # Process-level atexit safety net — ensures pending sessions are committed # even if shutdown_memory_provider is never called (e.g. gateway crash, @@ -2221,7 +2235,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _commit_session(self, sid: str, turn_count: int, *, context: str) -> bool: try: - self._client.post(f"/api/v1/sessions/{sid}/commit") + self._client.post( + f"/api/v1/sessions/{sid}/commit", + {"keep_recent_count": 0}, + ) self._mark_session_committed(sid) logger.info("OpenViking session %s committed %s (%d turns)", sid, context, turn_count) return True @@ -2293,7 +2310,261 @@ class OpenVikingMemoryProvider(MemoryProvider): with self._prefetch_lock: self._prefetch_result = "" - def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: + @staticmethod + def _message_text(content: Any) -> str: + """Extract text from OpenAI-style string/list content.""" + if isinstance(content, str): + return content + if isinstance(content, list): + chunks = [] + for block in content: + if isinstance(block, str): + chunks.append(block) + elif isinstance(block, dict): + if block.get("type") == "text" and isinstance(block.get("text"), str): + chunks.append(block["text"]) + elif isinstance(block.get("content"), str): + chunks.append(block["content"]) + return "\n".join(chunk for chunk in chunks if chunk) + if content is None: + return "" + return str(content) + + @classmethod + def _message_matches_text(cls, message: Dict[str, Any], expected: Any) -> bool: + expected_text = cls._message_text(expected).strip() + if not expected_text: + return False + actual_text = cls._message_text(message.get("content")).strip() + return actual_text == expected_text + + @classmethod + def _extract_current_turn_messages( + cls, + messages: Optional[List[Dict[str, Any]]], + user_content: str, + assistant_content: str, + ) -> List[Dict[str, Any]]: + """Slice the completed turn out of Hermes' full canonical transcript.""" + if not messages: + return [] + + end_idx: Optional[int] = None + if cls._message_text(assistant_content).strip(): + for idx in range(len(messages) - 1, -1, -1): + message = messages[idx] + if ( + isinstance(message, dict) + and message.get("role") == "assistant" + and cls._message_matches_text(message, assistant_content) + ): + end_idx = idx + break + if end_idx is None: + for idx in range(len(messages) - 1, -1, -1): + message = messages[idx] + if isinstance(message, dict) and message.get("role") == "assistant": + end_idx = idx + break + if end_idx is None: + end_idx = len(messages) - 1 + + start_idx: Optional[int] = None + if cls._message_text(user_content).strip(): + for idx in range(end_idx, -1, -1): + message = messages[idx] + if ( + isinstance(message, dict) + and message.get("role") == "user" + and cls._message_matches_text(message, user_content) + ): + start_idx = idx + break + if start_idx is None: + for idx in range(end_idx, -1, -1): + message = messages[idx] + if isinstance(message, dict) and message.get("role") == "user": + start_idx = idx + break + if start_idx is None: + return [] + + return [message for message in messages[start_idx : end_idx + 1] if isinstance(message, dict)] + + @staticmethod + def _tool_call_id(tool_call: Dict[str, Any]) -> str: + return str(tool_call.get("id") or tool_call.get("tool_call_id") or "") + + @staticmethod + def _tool_call_name(tool_call: Dict[str, Any]) -> str: + function = tool_call.get("function") + if isinstance(function, dict): + return str(function.get("name") or "") + return str(tool_call.get("name") or "") + + @staticmethod + def _is_openviking_recall_tool_name(tool_name: Any) -> bool: + return str(tool_name or "").strip().lower() in _OPENVIKING_RECALL_TOOL_NAMES + + @staticmethod + def _tool_call_input(tool_call: Dict[str, Any]) -> Dict[str, Any]: + function = tool_call.get("function") + raw_args: Any = None + if isinstance(function, dict): + raw_args = function.get("arguments") + if raw_args is None: + raw_args = tool_call.get("args") + if raw_args is None: + return {} + if isinstance(raw_args, dict): + return raw_args + if isinstance(raw_args, str): + if not raw_args.strip(): + return {} + try: + parsed = json.loads(raw_args) + except Exception: + return {"value": raw_args} + if isinstance(parsed, dict): + return parsed + return {"value": parsed} + return {"value": raw_args} + + @classmethod + def _tool_result_status(cls, message: Dict[str, Any]) -> str: + raw_status = str(message.get("status") or message.get("tool_status") or "").lower() + if raw_status in {"error", "failed", "failure"}: + return "error" + if raw_status in {"completed", "complete", "success", "succeeded"}: + return "completed" + + text = cls._message_text(message.get("content")).strip() + if text: + try: + parsed = json.loads(text) + except Exception: + parsed = None + if isinstance(parsed, dict): + status = str(parsed.get("status") or "").lower() + exit_code = parsed.get("exit_code") + if ( + status in {"error", "failed", "failure"} + or parsed.get("success") is False + or bool(parsed.get("error")) + or (isinstance(exit_code, int) and exit_code != 0) + ): + return "error" + return "completed" + + @classmethod + def _messages_to_openviking_batch( + cls, + messages: List[Dict[str, Any]], + ) -> List[Dict[str, Any]]: + """Convert Hermes canonical messages into OpenViking batch payloads.""" + tool_calls_by_id: Dict[str, Dict[str, Any]] = {} + completed_tool_ids: set[str] = set() + skipped_tool_ids: set[str] = set() + for message in messages: + if not isinstance(message, dict): + continue + if message.get("role") == "tool": + tool_id = str(message.get("tool_call_id") or message.get("id") or "") + if tool_id: + completed_tool_ids.add(tool_id) + if cls._is_openviking_recall_tool_name(message.get("name")): + skipped_tool_ids.add(tool_id) + continue + if message.get("role") != "assistant": + continue + for tool_call in message.get("tool_calls") or []: + if not isinstance(tool_call, dict): + continue + tool_id = cls._tool_call_id(tool_call) + tool_name = cls._tool_call_name(tool_call) + if tool_id: + tool_calls_by_id[tool_id] = { + "tool_name": tool_name, + "tool_input": cls._tool_call_input(tool_call), + } + if cls._is_openviking_recall_tool_name(tool_name): + skipped_tool_ids.add(tool_id) + + payload_messages: List[Dict[str, Any]] = [] + pending_tool_parts: List[Dict[str, Any]] = [] + + def flush_tool_parts() -> None: + nonlocal pending_tool_parts + if pending_tool_parts: + payload_messages.append({"role": "user", "parts": pending_tool_parts}) + pending_tool_parts = [] + + for message in messages: + if not isinstance(message, dict): + continue + + role = str(message.get("role") or "") + if role in {"system", "developer"}: + continue + + if role == "tool": + tool_id = str(message.get("tool_call_id") or message.get("id") or "") + prior_call = tool_calls_by_id.get(tool_id, {}) + tool_name = str(message.get("name") or prior_call.get("tool_name") or "") + if tool_id in skipped_tool_ids or cls._is_openviking_recall_tool_name(tool_name): + continue + tool_part = { + "type": "tool", + "tool_id": tool_id, + "tool_name": tool_name, + "tool_input": prior_call.get("tool_input", {}), + "tool_output": cls._message_text(message.get("content")), + "tool_status": cls._tool_result_status(message), + } + pending_tool_parts.append(tool_part) + continue + + if role not in {"user", "assistant"}: + continue + + flush_tool_parts() + parts: List[Dict[str, Any]] = [] + text = cls._message_text(message.get("content")) + if text: + parts.append({"type": "text", "text": text}) + + if role == "assistant": + for tool_call in message.get("tool_calls") or []: + if not isinstance(tool_call, dict): + continue + tool_id = cls._tool_call_id(tool_call) + tool_name = cls._tool_call_name(tool_call) + if tool_id in skipped_tool_ids or cls._is_openviking_recall_tool_name(tool_name): + continue + if tool_id in completed_tool_ids: + continue + parts.append({ + "type": "tool", + "tool_id": tool_id, + "tool_name": tool_name, + "tool_input": cls._tool_call_input(tool_call), + "tool_status": "pending", + }) + + if parts: + payload_messages.append({"role": role, "parts": parts}) + + flush_tool_parts() + return payload_messages + + def sync_turn( + self, + user_content: str, + assistant_content: str, + *, + session_id: str = "", + messages: Optional[List[Dict[str, Any]]] = None, + ) -> None: """Record the conversation turn in OpenViking's session (non-blocking).""" if not self._client: return @@ -2302,6 +2573,37 @@ class OpenVikingMemoryProvider(MemoryProvider): if not user_content: return + turn_messages = ( + self._extract_current_turn_messages(messages, user_content, assistant_content) + if messages is not None + else [] + ) + if turn_messages: + turn_messages = [dict(message) for message in turn_messages] + for message in turn_messages: + if message.get("role") == "user": + message["content"] = user_content + break + batch_messages = self._messages_to_openviking_batch(turn_messages) + + if _sync_trace_enabled(): + logger.info( + "OpenViking sync_turn trace: session_arg=%r cached_session=%r " + "messages_param_supported=true messages_present=%s message_count=%s " + "turn_message_count=%d batch_message_count=%d user_len=%d assistant_len=%d " + "user_preview=%r assistant_preview=%r", + session_id, + self._session_id, + messages is not None, + len(messages) if messages is not None else None, + len(turn_messages), + len(batch_messages), + len(str(user_content or "")), + len(str(assistant_content or "")), + _preview(user_content), + _preview(assistant_content), + ) + # Snapshot the sid and bump the turn counter atomically so a # concurrent on_session_switch/on_session_end can't interleave its # snapshot+reset between the read and the increment (lost turn) and so @@ -2313,24 +2615,39 @@ class OpenVikingMemoryProvider(MemoryProvider): self._turn_count += 1 def _sync(): - try: - client = self._new_client() + def _post_turn(client: _VikingClient) -> None: + if batch_messages: + payload = {"messages": batch_messages} + if _sync_trace_enabled(): + logger.info( + "OpenViking sync_turn trace: POST /api/v1/sessions/%s/messages/batch payload=%s", + sid, + json.dumps(payload, ensure_ascii=False), + ) + try: + client.post(f"/api/v1/sessions/{sid}/messages/batch", payload) + return + except Exception as batch_error: + logger.warning( + "OpenViking structured sync failed; falling back to text sync: %s", + batch_error, + ) + self._post_session_turn( client, sid, user_content[:4000], - assistant_content[:4000], + self._message_text(assistant_content)[:4000], ) + + try: + client = self._new_client() + _post_turn(client) except Exception as e: logger.debug("OpenViking sync_turn failed, reconnecting: %s", e) try: client = self._new_client() - self._post_session_turn( - client, - sid, - user_content[:4000], - assistant_content[:4000], - ) + _post_turn(client) except Exception as retry_error: logger.warning("OpenViking sync_turn failed: %s", retry_error) diff --git a/tests/openviking_plugin/test_openviking.py b/tests/openviking_plugin/test_openviking.py index f10fc502000..ee5d1eb2373 100644 --- a/tests/openviking_plugin/test_openviking.py +++ b/tests/openviking_plugin/test_openviking.py @@ -265,6 +265,280 @@ class TestOpenVikingSkillQuerySafety: assert RecordingVikingClient.calls == [] +class TestOpenVikingTurnConversion: + def test_extract_current_turn_anchors_on_latest_matching_user_and_assistant(self): + messages = [ + {"role": "user", "content": "Please inspect the repository for assemble hooks."}, + {"role": "assistant", "content": "Earlier answer."}, + {"role": "user", "content": "Please inspect the repository for assemble hooks."}, + { + "role": "assistant", + "content": "I will search the codebase.", + "tool_calls": [ + { + "id": "call_rg_1", + "type": "function", + "function": { + "name": "shell_command", + "arguments": json.dumps({"command": "rg assemble"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_rg_1", + "name": "shell_command", + "content": "agent/context_engine.py: no preassemble hook", + }, + {"role": "assistant", "content": "The current main does not expose assemble."}, + ] + + turn = OpenVikingMemoryProvider._extract_current_turn_messages( + messages, + "Please inspect the repository for assemble hooks.", + "The current main does not expose assemble.", + ) + + assert turn == messages[2:] + + def test_messages_to_openviking_batch_coalesces_tool_results(self): + turn = [ + {"role": "user", "content": "Please inspect the repository for assemble hooks."}, + { + "role": "assistant", + "content": "I will search the codebase.", + "tool_calls": [ + { + "id": "call_rg_1", + "type": "function", + "function": { + "name": "shell_command", + "arguments": json.dumps({"command": "rg assemble"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_rg_1", + "name": "shell_command", + "content": "agent/context_engine.py: no preassemble hook", + }, + {"role": "assistant", "content": "The current main does not expose assemble."}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert [message["role"] for message in batch] == ["user", "assistant", "user", "assistant"] + assert batch[0]["parts"] == [ + {"type": "text", "text": "Please inspect the repository for assemble hooks."} + ] + assert batch[1]["parts"] == [ + {"type": "text", "text": "I will search the codebase."} + ] + assert batch[2]["parts"] == [ + { + "type": "tool", + "tool_id": "call_rg_1", + "tool_name": "shell_command", + "tool_input": {"command": "rg assemble"}, + "tool_output": "agent/context_engine.py: no preassemble hook", + "tool_status": "completed", + } + ] + assert batch[3]["parts"] == [ + {"type": "text", "text": "The current main does not expose assemble."} + ] + + def test_messages_to_openviking_batch_marks_json_tool_error_results(self): + turn = [ + {"role": "user", "content": "Check the file."}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_read_1", + "type": "function", + "function": { + "name": "read_file", + "arguments": json.dumps({"path": "missing.md"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_read_1", + "name": "read_file", + "content": json.dumps({"error": "File not found", "exit_code": 1}), + }, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert batch[1]["parts"] == [ + { + "type": "tool", + "tool_id": "call_read_1", + "tool_name": "read_file", + "tool_input": {"path": "missing.md"}, + "tool_output": json.dumps({"error": "File not found", "exit_code": 1}), + "tool_status": "error", + } + ] + + def test_messages_to_openviking_batch_keeps_pending_tool_call_without_result(self): + turn = [ + {"role": "user", "content": "Start a long running check."}, + { + "role": "assistant", + "content": "Starting it now.", + "tool_calls": [ + { + "id": "call_long_1", + "type": "function", + "function": { + "name": "long_check", + "arguments": json.dumps({"target": "repo"}), + }, + } + ], + }, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert batch[1]["parts"] == [ + {"type": "text", "text": "Starting it now."}, + { + "type": "tool", + "tool_id": "call_long_1", + "tool_name": "long_check", + "tool_input": {"target": "repo"}, + "tool_status": "pending", + }, + ] + + def test_messages_to_openviking_batch_coalesces_adjacent_tool_results(self): + turn = [ + {"role": "user", "content": "Run both tools."}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_a", + "type": "function", + "function": { + "name": "first_tool", + "arguments": json.dumps({"x": 1}), + }, + }, + { + "id": "call_b", + "type": "function", + "function": { + "name": "second_tool", + "arguments": json.dumps({"y": 2}), + }, + }, + ], + }, + {"role": "tool", "tool_call_id": "call_a", "name": "first_tool", "content": "a"}, + {"role": "tool", "tool_call_id": "call_b", "name": "second_tool", "content": "b"}, + {"role": "assistant", "content": "Done."}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert [message["role"] for message in batch] == ["user", "user", "assistant"] + assert batch[1]["parts"] == [ + { + "type": "tool", + "tool_id": "call_a", + "tool_name": "first_tool", + "tool_input": {"x": 1}, + "tool_output": "a", + "tool_status": "completed", + }, + { + "type": "tool", + "tool_id": "call_b", + "tool_name": "second_tool", + "tool_input": {"y": 2}, + "tool_output": "b", + "tool_status": "completed", + }, + ] + + def test_messages_to_openviking_batch_skips_openviking_recall_tool_results(self): + for recall_tool_name in ("viking_search", "viking_read", "viking_browse"): + turn = [ + {"role": "user", "content": "What did we decide about context assembly?"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_recall_1", + "type": "function", + "function": { + "name": recall_tool_name, + "arguments": json.dumps({"query": "context assembly decision"}), + }, + }, + { + "id": "call_shell_1", + "type": "function", + "function": { + "name": "shell_command", + "arguments": json.dumps({"command": "rg preassemble"}), + }, + }, + ], + }, + { + "role": "tool", + "tool_call_id": "call_recall_1", + "name": recall_tool_name, + "content": json.dumps({ + "results": [ + { + "uri": "viking://user/hermes/memories/context", + "abstract": "Old OpenViking memory content", + } + ] + }), + }, + { + "role": "tool", + "tool_call_id": "call_shell_1", + "name": "shell_command", + "content": "plugins/memory/openviking/__init__.py", + }, + {"role": "assistant", "content": "We decided to keep sync_turn scoped to ingestion."}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert [message["role"] for message in batch] == ["user", "user", "assistant"] + assert batch[1]["parts"] == [ + { + "type": "tool", + "tool_id": "call_shell_1", + "tool_name": "shell_command", + "tool_input": {"command": "rg preassemble"}, + "tool_output": "plugins/memory/openviking/__init__.py", + "tool_status": "completed", + } + ] + batch_text = json.dumps(batch) + assert recall_tool_name not in batch_text + assert "Old OpenViking memory content" not in batch_text + + class TestOpenVikingRead: def test_overview_read_normalizes_uri_and_unwraps_result(self): provider = OpenVikingMemoryProvider() diff --git a/tests/plugins/memory/test_openviking_provider.py b/tests/plugins/memory/test_openviking_provider.py index 954385fa54e..2863566b367 100644 --- a/tests/plugins/memory/test_openviking_provider.py +++ b/tests/plugins/memory/test_openviking_provider.py @@ -1975,7 +1975,10 @@ def test_on_session_switch_commits_old_session_and_rotates_id(): provider.on_session_switch("new-sid", parent_session_id="old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -1998,7 +2001,10 @@ def test_on_session_switch_commits_pending_tokens_without_turn_count(): provider.on_session_switch("new-sid") provider._client.get.assert_called_once_with("/api/v1/sessions/old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -2051,7 +2057,10 @@ def test_on_session_switch_waits_for_inflight_sync_thread(): provider.on_session_switch("new-sid") assert join_calls, "expected on_session_switch to join the in-flight sync thread" - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) def test_on_session_switch_noop_on_empty_new_id(): @@ -2206,7 +2215,10 @@ def test_on_session_end_marks_session_clean_after_successful_commit(): provider.on_session_end([]) - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._turn_count == 0 @@ -2228,7 +2240,10 @@ def test_on_session_end_commits_pending_tokens_without_turn_count(): provider.on_session_end([]) provider._client.get.assert_called_once_with("/api/v1/sessions/old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) def test_end_then_switch_does_not_double_commit(): @@ -2241,7 +2256,10 @@ def test_end_then_switch_does_not_double_commit(): provider.on_session_switch("new-sid", parent_session_id="old-sid") # Exactly one commit call, on the OLD session, fired by on_session_end. - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -2253,7 +2271,10 @@ def test_end_then_switch_with_pending_tokens_does_not_double_commit(): provider.on_session_end([]) provider.on_session_switch("new-sid", parent_session_id="old-sid") - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) assert provider._session_id == "new-sid" assert provider._turn_count == 0 @@ -2400,7 +2421,10 @@ def test_on_session_switch_does_not_block_caller_on_slow_drain(): # Let the finalizer finish so it doesn't leak past the test. release_drain.set() assert provider._drain_finalizers(timeout=5.0) - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) def test_on_session_switch_defers_old_commit_to_finalizer_thread(): @@ -2415,7 +2439,7 @@ def test_on_session_switch_defers_old_commit_to_finalizer_thread(): committed = threading.Event() drain_timeouts = [] - def fake_post(path): + def fake_post(path, payload=None): committed.set() return {} @@ -2433,7 +2457,10 @@ def test_on_session_switch_defers_old_commit_to_finalizer_thread(): assert provider._turn_count == 0 # The old-session commit lands on the finalizer thread, not inline. assert committed.wait(timeout=5.0), "old session was not finalized off-thread" - provider._client.post.assert_called_once_with("/api/v1/sessions/old-sid/commit") + provider._client.post.assert_called_once_with( + "/api/v1/sessions/old-sid/commit", + {"keep_recent_count": 0}, + ) # The finalizer drains with the deferred (longer) budget, not inline 10s. assert drain_timeouts == [_DEFERRED_COMMIT_TIMEOUT] From d7cd0bc0863cda1a203f00422b1441ca2d9890ed Mon Sep 17 00:00:00 2001 From: Hao Zhe Date: Fri, 19 Jun 2026 13:42:36 +0800 Subject: [PATCH 2/7] fix(openviking): preserve structured sync attribution --- agent/codex_runtime.py | 1 + agent/message_content.py | 50 +++++++++++++ plugins/memory/openviking/__init__.py | 36 +++++----- tests/agent/test_message_content.py | 25 +++++++ tests/openviking_plugin/test_openviking.py | 36 +++++++++- .../memory/test_openviking_provider.py | 72 +++++++++++++++++++ .../test_codex_app_server_integration.py | 13 +++- 7 files changed, 210 insertions(+), 23 deletions(-) create mode 100644 agent/message_content.py create mode 100644 tests/agent/test_message_content.py diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py index 7f175fff97f..4ff67871934 100644 --- a/agent/codex_runtime.py +++ b/agent/codex_runtime.py @@ -290,6 +290,7 @@ def run_codex_app_server_turn( original_user_message=original_user_message, final_response=turn.final_text, interrupted=False, + messages=messages, ) except Exception: logger.debug("external memory sync raised", exc_info=True) diff --git a/agent/message_content.py b/agent/message_content.py new file mode 100644 index 00000000000..c42bf408550 --- /dev/null +++ b/agent/message_content.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + + +_NON_TEXT_PART_TYPES = {"image", "image_url", "input_image", "audio", "input_audio"} +_TEXT_KEYS = ("text", "content", "input_text", "output_text", "summary_text") + + +def _field(value: Any, key: str) -> Any: + if isinstance(value, Mapping): + return value.get(key) + return getattr(value, key, None) + + +def _text_from_part(part: Any) -> str: + if part is None: + return "" + if isinstance(part, str): + return part + + part_type = str(_field(part, "type") or "").strip().lower() + if part_type in _NON_TEXT_PART_TYPES: + return "" + + for key in _TEXT_KEYS: + text = _field(part, key) + if isinstance(text, str): + return text + return "" + + +def flatten_message_text(content: Any, *, sep: str = "\n") -> str: + """Return the visible text from common chat/Responses message content shapes.""" + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + chunks = [_text_from_part(part) for part in content] + return sep.join(chunk for chunk in chunks if chunk) + + text = _text_from_part(content) + if text: + return text + try: + return str(content) + except Exception: + return "" diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index c7b05a4864c..82f1f26a0a0 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -45,6 +45,7 @@ from typing import Any, Callable, Dict, List, Optional, Set from urllib.parse import urlparse from urllib.request import url2pathname +from agent.message_content import flatten_message_text from agent.memory_provider import MemoryProvider from agent.skill_commands import extract_user_instruction_from_skill_message from tools.registry import tool_error @@ -2313,22 +2314,7 @@ class OpenVikingMemoryProvider(MemoryProvider): @staticmethod def _message_text(content: Any) -> str: """Extract text from OpenAI-style string/list content.""" - if isinstance(content, str): - return content - if isinstance(content, list): - chunks = [] - for block in content: - if isinstance(block, str): - chunks.append(block) - elif isinstance(block, dict): - if block.get("type") == "text" and isinstance(block.get("text"), str): - chunks.append(block["text"]) - elif isinstance(block.get("content"), str): - chunks.append(block["content"]) - return "\n".join(chunk for chunk in chunks if chunk) - if content is None: - return "" - return str(content) + return flatten_message_text(content) @classmethod def _message_matches_text(cls, message: Dict[str, Any], expected: Any) -> bool: @@ -2460,8 +2446,11 @@ class OpenVikingMemoryProvider(MemoryProvider): def _messages_to_openviking_batch( cls, messages: List[Dict[str, Any]], + *, + assistant_peer_id: str = "", ) -> List[Dict[str, Any]]: """Convert Hermes canonical messages into OpenViking batch payloads.""" + assistant_peer_id = str(assistant_peer_id or "").strip() tool_calls_by_id: Dict[str, Dict[str, Any]] = {} completed_tool_ids: set[str] = set() skipped_tool_ids: set[str] = set() @@ -2493,10 +2482,16 @@ class OpenVikingMemoryProvider(MemoryProvider): payload_messages: List[Dict[str, Any]] = [] pending_tool_parts: List[Dict[str, Any]] = [] + def payload_message(role: str, parts: List[Dict[str, Any]]) -> Dict[str, Any]: + payload: Dict[str, Any] = {"role": role, "parts": parts} + if role == "assistant" and assistant_peer_id: + payload["peer_id"] = assistant_peer_id + return payload + def flush_tool_parts() -> None: nonlocal pending_tool_parts if pending_tool_parts: - payload_messages.append({"role": "user", "parts": pending_tool_parts}) + payload_messages.append(payload_message("assistant", pending_tool_parts)) pending_tool_parts = [] for message in messages: @@ -2552,7 +2547,7 @@ class OpenVikingMemoryProvider(MemoryProvider): }) if parts: - payload_messages.append({"role": role, "parts": parts}) + payload_messages.append(payload_message(role, parts)) flush_tool_parts() return payload_messages @@ -2584,7 +2579,10 @@ class OpenVikingMemoryProvider(MemoryProvider): if message.get("role") == "user": message["content"] = user_content break - batch_messages = self._messages_to_openviking_batch(turn_messages) + batch_messages = self._messages_to_openviking_batch( + turn_messages, + assistant_peer_id=getattr(self, "_agent", _DEFAULT_AGENT), + ) if _sync_trace_enabled(): logger.info( diff --git a/tests/agent/test_message_content.py b/tests/agent/test_message_content.py new file mode 100644 index 00000000000..0207d63600b --- /dev/null +++ b/tests/agent/test_message_content.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from types import SimpleNamespace + +from agent.message_content import flatten_message_text + + +def test_flatten_message_text_accepts_chat_and_responses_text_parts(): + content = [ + {"type": "text", "text": "chat text"}, + {"type": "input_text", "text": "user text"}, + {"type": "output_text", "text": "assistant text"}, + {"type": "summary_text", "text": "summary text"}, + ] + + assert flatten_message_text(content) == "chat text\nuser text\nassistant text\nsummary text" + + +def test_flatten_message_text_accepts_object_parts(): + content = [ + SimpleNamespace(type="output_text", text="object text"), + {"content": "legacy content"}, + ] + + assert flatten_message_text(content) == "object text\nlegacy content" diff --git a/tests/openviking_plugin/test_openviking.py b/tests/openviking_plugin/test_openviking.py index ee5d1eb2373..3a743287672 100644 --- a/tests/openviking_plugin/test_openviking.py +++ b/tests/openviking_plugin/test_openviking.py @@ -330,7 +330,7 @@ class TestOpenVikingTurnConversion: batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) - assert [message["role"] for message in batch] == ["user", "assistant", "user", "assistant"] + assert [message["role"] for message in batch] == ["user", "assistant", "assistant", "assistant"] assert batch[0]["parts"] == [ {"type": "text", "text": "Please inspect the repository for assemble hooks."} ] @@ -378,6 +378,7 @@ class TestOpenVikingTurnConversion: batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + assert batch[1]["role"] == "assistant" assert batch[1]["parts"] == [ { "type": "tool", @@ -453,7 +454,7 @@ class TestOpenVikingTurnConversion: batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) - assert [message["role"] for message in batch] == ["user", "user", "assistant"] + assert [message["role"] for message in batch] == ["user", "assistant", "assistant"] assert batch[1]["parts"] == [ { "type": "tool", @@ -523,7 +524,7 @@ class TestOpenVikingTurnConversion: batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) - assert [message["role"] for message in batch] == ["user", "user", "assistant"] + assert [message["role"] for message in batch] == ["user", "assistant", "assistant"] assert batch[1]["parts"] == [ { "type": "tool", @@ -538,6 +539,35 @@ class TestOpenVikingTurnConversion: assert recall_tool_name not in batch_text assert "Old OpenViking memory content" not in batch_text + def test_messages_to_openviking_batch_preserves_responses_text_parts(self): + turn = [ + {"role": "user", "content": [{"type": "input_text", "text": "hello"}]}, + {"role": "assistant", "content": [{"type": "output_text", "text": "answer"}]}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + assert batch == [ + {"role": "user", "parts": [{"type": "text", "text": "hello"}]}, + {"role": "assistant", "parts": [{"type": "text", "text": "answer"}]}, + ] + + def test_messages_to_openviking_batch_adds_assistant_peer_id_when_requested(self): + turn = [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "answer"}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch( + turn, + assistant_peer_id="hermes", + ) + + assert batch == [ + {"role": "user", "parts": [{"type": "text", "text": "hello"}]}, + {"role": "assistant", "parts": [{"type": "text", "text": "answer"}], "peer_id": "hermes"}, + ] + class TestOpenVikingRead: def test_overview_read_normalizes_uri_and_unwraps_result(self): diff --git a/tests/plugins/memory/test_openviking_provider.py b/tests/plugins/memory/test_openviking_provider.py index 2863566b367..28f2d8e9d46 100644 --- a/tests/plugins/memory/test_openviking_provider.py +++ b/tests/plugins/memory/test_openviking_provider.py @@ -2195,6 +2195,78 @@ def test_sync_turn_retries_batch_write_with_fresh_client(): )] +def test_sync_turn_structured_messages_include_assistant_peer_id(): + provider = OpenVikingMemoryProvider() + provider._client = MagicMock() + provider._endpoint = "http://test" + provider._api_key = "" + provider._account = "acct" + provider._user = "usr" + provider._agent = "hermes" + provider._session_id = "sid-structured" + + captured = [] + + class StubClient: + def __init__(self, *a, **kw): + pass + + def post(self, path, payload=None, **kwargs): + captured.append((path, payload)) + return {} + + import plugins.memory.openviking as _mod + + real_client_cls = _mod._VikingClient + _mod._VikingClient = StubClient + messages = [ + {"role": "user", "content": [{"type": "input_text", "text": "u"}]}, + { + "role": "assistant", + "content": "Looking.", + "tool_calls": [ + { + "id": "call-1", + "type": "function", + "function": {"name": "shell_command", "arguments": json.dumps({"cmd": "pwd"})}, + } + ], + }, + {"role": "tool", "tool_call_id": "call-1", "name": "shell_command", "content": "ok"}, + {"role": "assistant", "content": [{"type": "output_text", "text": "a"}]}, + ] + try: + provider.sync_turn("u", "a", messages=messages) + assert provider._drain_writers("sid-structured", timeout=2.0) + finally: + _mod._VikingClient = real_client_cls + + assert captured == [( + "/api/v1/sessions/sid-structured/messages/batch", + { + "messages": [ + {"role": "user", "parts": [{"type": "text", "text": "u"}]}, + {"role": "assistant", "parts": [{"type": "text", "text": "Looking."}], "peer_id": "hermes"}, + { + "role": "assistant", + "parts": [ + { + "type": "tool", + "tool_id": "call-1", + "tool_name": "shell_command", + "tool_input": {"cmd": "pwd"}, + "tool_output": "ok", + "tool_status": "completed", + } + ], + "peer_id": "hermes", + }, + {"role": "assistant", "parts": [{"type": "text", "text": "a"}], "peer_id": "hermes"}, + ] + }, + )] + + def test_sync_turn_noop_when_session_id_blank(): provider = OpenVikingMemoryProvider() provider._client = MagicMock() diff --git a/tests/run_agent/test_codex_app_server_integration.py b/tests/run_agent/test_codex_app_server_integration.py index 14c058178b9..b0d2ec23861 100644 --- a/tests/run_agent/test_codex_app_server_integration.py +++ b/tests/run_agent/test_codex_app_server_integration.py @@ -12,7 +12,7 @@ Verifies that: from __future__ import annotations -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -148,6 +148,17 @@ class TestRunConversationCodexPath: and m.get("content") == "echo: hello"] assert final, f"expected final assistant message in {msgs}" + def test_projected_messages_are_synced_to_external_memory(self, fake_session): + agent = _make_codex_agent() + agent._memory_manager = MagicMock() + agent._memory_manager.build_system_prompt.return_value = "" + + with patch.object(agent, "_spawn_background_review", return_value=None): + result = agent.run_conversation("hello") + + agent._memory_manager.sync_all.assert_called_once() + assert agent._memory_manager.sync_all.call_args.kwargs["messages"] == result["messages"] + def test_nudge_counters_tick(self, fake_session): """The skill nudge counter must accumulate tool_iterations across turns. The memory nudge counter is gated on memory being configured From 5a856bdfa355bb45330a23ecb63abdf9b810e865 Mon Sep 17 00:00:00 2001 From: Hao Zhe Date: Fri, 19 Jun 2026 15:38:25 +0800 Subject: [PATCH 3/7] chore(release): add OpenViking contributor attribution --- scripts/release.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/release.py b/scripts/release.py index 6c5d33ec3a1..4e5f8844439 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -1577,6 +1577,7 @@ AUTHOR_MAP = { "sunsky.lau@gmail.com": "liuhao1024", # PR #45494 salvage (claim session slot before auto-resume task; #45456) "andrewdmwalker@gmail.com": "capt-marbles", # PR #38440 salvage (resolve xAI OAuth credentials across profiles; #43589) "infinitycrew39@gmail.com": "infinitycrew39", # PR #47945 salvage (scope langfuse trace state by turn/request ids; #48292) + "eurekaxun@163.com": "huangxun375-stack", # PR #37251 / #48894 structured OpenViking sync } From fcac0f94d4844f904a6eaa8a2b667299408b9f92 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 19 Jun 2026 13:53:39 +0530 Subject: [PATCH 4/7] fix(openviking): guard empty tool_id in batch skip set; reuse env_var_enabled Two follow-up fixes on top of the cherry-picked structured-sync work: - _messages_to_openviking_batch only added a recall tool result's id to skipped_tool_ids when the id was non-empty. An empty tool_call_id (which the canonical transcript can carry; agent_runtime_helpers defaults it to "") poisoned the skip set with "", silently dropping any *other* tool result that also lacked an id. Move the recall-skip add inside the existing `if tool_id:` guard. Adds a regression test (mutation-checked: fails on pre-fix code, passes after). - _sync_trace_enabled() open-coded the canonical truthy-env check; reuse utils.env_var_enabled (byte-identical {1,true,yes,on} semantics). --- plugins/memory/openviking/__init__.py | 8 ++-- tests/openviking_plugin/test_openviking.py | 45 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 82f1f26a0a0..a57a60e67bd 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -49,7 +49,7 @@ from agent.message_content import flatten_message_text from agent.memory_provider import MemoryProvider from agent.skill_commands import extract_user_instruction_from_skill_message from tools.registry import tool_error -from utils import atomic_json_write +from utils import atomic_json_write, env_var_enabled logger = logging.getLogger(__name__) @@ -160,7 +160,7 @@ def _derive_openviking_user_text(content: Any) -> str: def _sync_trace_enabled() -> bool: - return os.environ.get(_SYNC_TRACE_ENV, "").strip().lower() in {"1", "true", "yes", "on"} + return env_var_enabled(_SYNC_TRACE_ENV) def _preview(value: Any, limit: int = 160) -> str: @@ -2461,8 +2461,8 @@ class OpenVikingMemoryProvider(MemoryProvider): tool_id = str(message.get("tool_call_id") or message.get("id") or "") if tool_id: completed_tool_ids.add(tool_id) - if cls._is_openviking_recall_tool_name(message.get("name")): - skipped_tool_ids.add(tool_id) + if cls._is_openviking_recall_tool_name(message.get("name")): + skipped_tool_ids.add(tool_id) continue if message.get("role") != "assistant": continue diff --git a/tests/openviking_plugin/test_openviking.py b/tests/openviking_plugin/test_openviking.py index 3a743287672..171e6abc8ac 100644 --- a/tests/openviking_plugin/test_openviking.py +++ b/tests/openviking_plugin/test_openviking.py @@ -539,6 +539,51 @@ class TestOpenVikingTurnConversion: assert recall_tool_name not in batch_text assert "Old OpenViking memory content" not in batch_text + def test_messages_to_openviking_batch_empty_tool_id_does_not_drop_other_results(self): + # A recall tool result that arrives with an empty tool_call_id must not + # poison the skip set with "" and silently drop unrelated tool results + # that also lack an id. Empty tool_call_id is reachable in the canonical + # transcript (agent_runtime_helpers defaults it to ""). + turn = [ + {"role": "user", "content": "What did we decide?"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "", + "type": "function", + "function": { + "name": "viking_search", + "arguments": json.dumps({"query": "decision"}), + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "", + "name": "viking_search", + "content": json.dumps({"results": ["recall stuff"]}), + }, + { + "role": "tool", + "tool_call_id": "", + "name": "shell_command", + "content": "important shell output", + }, + {"role": "assistant", "content": "done"}, + ] + + batch = OpenVikingMemoryProvider._messages_to_openviking_batch(turn) + + batch_text = json.dumps(batch) + # The unrelated (empty-id) shell result must survive. + assert "important shell output" in batch_text + # The recall tool result must still be excluded. + assert "recall stuff" not in batch_text + assert "viking_search" not in batch_text + def test_messages_to_openviking_batch_preserves_responses_text_parts(self): turn = [ {"role": "user", "content": [{"type": "input_text", "text": "hello"}]}, From 27a6e188c4b4bc66f52b321f055fe18aa866b545 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 19 Jun 2026 14:01:16 +0530 Subject: [PATCH 5/7] refactor(openviking): derive recall-tool name set from canonical schemas _OPENVIKING_RECALL_TOOL_NAMES hardcoded the three read-tool names as string literals, which can silently desync from the *_SCHEMA["name"] constants on a rename (the same drift the adjacent _CATEGORY_SUBDIR_MAP comment warns about). Derive the set from SEARCH/READ/BROWSE_SCHEMA["name"] instead. Write tools (viking_remember / viking_add_resource) remain intentionally excluded. Set contents are unchanged. --- plugins/memory/openviking/__init__.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index a57a60e67bd..95edaca47d8 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -72,7 +72,6 @@ _SESSION_DRAIN_TIMEOUT = 10.0 _DEFERRED_COMMIT_TIMEOUT = (_TIMEOUT * 2) + 5.0 _REMOTE_RESOURCE_PREFIXES = ("http://", "https://", "git@", "ssh://", "git://") _SYNC_TRACE_ENV = "HERMES_OPENVIKING_SYNC_TRACE" -_OPENVIKING_RECALL_TOOL_NAMES = {"viking_search", "viking_read", "viking_browse"} # Maps the viking_remember `category` enum to a viking:// subdirectory. # Keep in sync with REMEMBER_SCHEMA.parameters.properties.category.enum. @@ -503,6 +502,17 @@ ADD_RESOURCE_SCHEMA = { } +# Recall tools (read-only) whose results we never re-ingest into OpenViking — +# echoing recalled memory back into the session transcript would re-store it. +# Write tools (viking_remember / viking_add_resource) are intentionally NOT +# here. Derived from the canonical schema names so renames can't desync. +_OPENVIKING_RECALL_TOOL_NAMES = { + SEARCH_SCHEMA["name"], + READ_SCHEMA["name"], + BROWSE_SCHEMA["name"], +} + + def _zip_directory(dir_path: Path) -> Path: """Create a temporary zip file containing a directory tree.""" root = dir_path.resolve() From 2d4046c6de975eff194d6ebdfa4180e5ed86c422 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 19 Jun 2026 14:03:49 +0530 Subject: [PATCH 6/7] refactor(openviking): reuse pre-scanned tool_input for pending tool calls _messages_to_openviking_batch's pre-scan already parses and caches each tool call's arguments into tool_calls_by_id. The pending-tool-call branch re-parsed them via _tool_call_input(), a second parse and a second source of truth. Reuse the cached tool_input when the id was cached (non-empty), falling back to a parse only for the uncached empty-id case so arguments are never dropped. No behavior change. --- plugins/memory/openviking/__init__.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 95edaca47d8..9c1029d4a89 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -2548,11 +2548,20 @@ class OpenVikingMemoryProvider(MemoryProvider): continue if tool_id in completed_tool_ids: continue + # Reuse the tool_input parsed in the pre-scan when available + # (non-empty ids are cached); fall back to parsing for the + # uncached empty-id case so we never drop arguments. + prior_call = tool_calls_by_id.get(tool_id) if tool_id else None + tool_input = ( + prior_call["tool_input"] + if prior_call is not None + else cls._tool_call_input(tool_call) + ) parts.append({ "type": "tool", "tool_id": tool_id, "tool_name": tool_name, - "tool_input": cls._tool_call_input(tool_call), + "tool_input": tool_input, "tool_status": "pending", }) From be2c2beb96e578542b24bdb275071044a853ebbd Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 19 Jun 2026 14:05:40 +0530 Subject: [PATCH 7/7] refactor(openviking): name tool_status constants and alias sets The batch tool_status values ('completed'/'error'/'pending') and the inbound status alias sets were inline magic strings, duplicated across two checks in _tool_result_status. Hoist them to module-level constants (_TOOL_STATUS_* + _TOOL_STATUS_{ERROR,COMPLETED}_ALIASES) so the canonical wire values and the alias->canonical mapping live in one place. Emitted values are unchanged. --- plugins/memory/openviking/__init__.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 9c1029d4a89..b4d44be88af 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -512,6 +512,14 @@ _OPENVIKING_RECALL_TOOL_NAMES = { BROWSE_SCHEMA["name"], } +# Canonical tool_status values emitted in OpenViking batch tool parts. +_TOOL_STATUS_COMPLETED = "completed" +_TOOL_STATUS_ERROR = "error" +_TOOL_STATUS_PENDING = "pending" +# Inbound status aliases (from varied tool-result shapes) -> canonical above. +_TOOL_STATUS_ERROR_ALIASES = {"error", "failed", "failure"} +_TOOL_STATUS_COMPLETED_ALIASES = {"completed", "complete", "success", "succeeded"} + def _zip_directory(dir_path: Path) -> Path: """Create a temporary zip file containing a directory tree.""" @@ -2429,10 +2437,10 @@ class OpenVikingMemoryProvider(MemoryProvider): @classmethod def _tool_result_status(cls, message: Dict[str, Any]) -> str: raw_status = str(message.get("status") or message.get("tool_status") or "").lower() - if raw_status in {"error", "failed", "failure"}: - return "error" - if raw_status in {"completed", "complete", "success", "succeeded"}: - return "completed" + if raw_status in _TOOL_STATUS_ERROR_ALIASES: + return _TOOL_STATUS_ERROR + if raw_status in _TOOL_STATUS_COMPLETED_ALIASES: + return _TOOL_STATUS_COMPLETED text = cls._message_text(message.get("content")).strip() if text: @@ -2444,13 +2452,14 @@ class OpenVikingMemoryProvider(MemoryProvider): status = str(parsed.get("status") or "").lower() exit_code = parsed.get("exit_code") if ( - status in {"error", "failed", "failure"} + status in _TOOL_STATUS_ERROR_ALIASES or parsed.get("success") is False or bool(parsed.get("error")) or (isinstance(exit_code, int) and exit_code != 0) ): - return "error" - return "completed" + return _TOOL_STATUS_ERROR + + return _TOOL_STATUS_COMPLETED @classmethod def _messages_to_openviking_batch( @@ -2562,7 +2571,7 @@ class OpenVikingMemoryProvider(MemoryProvider): "tool_id": tool_id, "tool_name": tool_name, "tool_input": tool_input, - "tool_status": "pending", + "tool_status": _TOOL_STATUS_PENDING, }) if parts: