mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
fix(streaming): stop duplicating tool-call args from cumulative-resend providers (#35718)
DeepSeek / Baidu Qianfan stream tool-call arguments in cumulative mode:
each chunk resends the full arguments-so-far instead of the new fragment.
The stream accumulator blindly concatenated arg deltas with +=, turning
that into '{...}{...}{...}', which failed json.loads and got nuked to '{}'
— a silently corrupted tool call (#35592). Worse on multi-param tools
(search_files, session_search, memory replace) because longer args take
more chunks, giving more resend opportunities.
- Per-slot cumulative latch in the stream accumulator: a delta that is a
strict superset of the accumulated buffer marks the slot cumulative and
replaces (not appends); exact duplicates are dropped only after latching.
Incremental fragments are untouched (default += path).
- Backstop _collapse_repeated_json_arguments() in the repair pipeline
collapses pure identical-resend buffers (K exact repeats of a valid-JSON
unit) for providers that resend the complete object from chunk 1. Only
reached after json.loads already failed, so compliant single objects are
never touched.
Not a gateway or DeepSeek-model bug — any OpenAI-wire provider in
cumulative streaming mode is affected.
This commit is contained in:
parent
0ffbcbbe7d
commit
ca03486b6a
4 changed files with 295 additions and 1 deletions
|
|
@ -1750,6 +1750,12 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
||||||
# call starting at the same index and redirect it to a fresh slot.
|
# call starting at the same index and redirect it to a fresh slot.
|
||||||
_last_id_at_idx: dict = {} # raw_index -> last seen non-empty id
|
_last_id_at_idx: dict = {} # raw_index -> last seen non-empty id
|
||||||
_active_slot_by_idx: dict = {} # raw_index -> current slot in tool_calls_acc
|
_active_slot_by_idx: dict = {} # raw_index -> current slot in tool_calls_acc
|
||||||
|
# Per-slot latch: set once a slot is positively identified as a
|
||||||
|
# cumulative-resend stream (a delta that is a strict superset of the
|
||||||
|
# accumulated buffer). Until latched, deltas are appended normally;
|
||||||
|
# after latching, the buffer is replaced and exact-duplicate deltas
|
||||||
|
# are dropped. See the argument-accumulation block below (#35592).
|
||||||
|
_cumulative_args_slot: set = set()
|
||||||
finish_reason = None
|
finish_reason = None
|
||||||
model_name = None
|
model_name = None
|
||||||
role = "assistant"
|
role = "assistant"
|
||||||
|
|
@ -1867,7 +1873,44 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
||||||
# Vercel AI patterns) is immune to this.
|
# Vercel AI patterns) is immune to this.
|
||||||
entry["function"]["name"] = tc_delta.function.name
|
entry["function"]["name"] = tc_delta.function.name
|
||||||
if tc_delta.function.arguments:
|
if tc_delta.function.arguments:
|
||||||
entry["function"]["arguments"] += tc_delta.function.arguments
|
# Argument deltas are normally incremental
|
||||||
|
# fragments (OpenAI spec), so the default is to
|
||||||
|
# concatenate. But some OpenAI-compatible
|
||||||
|
# providers (DeepSeek / Baidu Qianfan, #35592)
|
||||||
|
# operate in *cumulative* mode: each chunk
|
||||||
|
# resends the full arguments-so-far instead of
|
||||||
|
# the new fragment. Blind += turns that into
|
||||||
|
# '{...}{...}{...}', corrupting the tool call.
|
||||||
|
#
|
||||||
|
# Detect cumulative mode per-slot: in cumulative
|
||||||
|
# mode the new delta is a superset that starts
|
||||||
|
# with everything accumulated so far (monotonic
|
||||||
|
# growth), and an exact resend equals it.
|
||||||
|
# Incremental fragments are JSON suffixes that do
|
||||||
|
# NOT restate the accumulated prefix, so this is
|
||||||
|
# unambiguous on the full buffer (not a partial
|
||||||
|
# per-chunk guess).
|
||||||
|
_new = tc_delta.function.arguments
|
||||||
|
_prev = entry["function"]["arguments"]
|
||||||
|
if not _prev:
|
||||||
|
entry["function"]["arguments"] = _new
|
||||||
|
elif len(_new) > len(_prev) and _new.startswith(_prev):
|
||||||
|
# Strict superset of the accumulated buffer —
|
||||||
|
# the unambiguous cumulative-resend signature.
|
||||||
|
# Latch the slot and replace (don't append).
|
||||||
|
_cumulative_args_slot.add(idx)
|
||||||
|
entry["function"]["arguments"] = _new
|
||||||
|
elif idx in _cumulative_args_slot and _new == _prev:
|
||||||
|
# Already a confirmed cumulative slot and this
|
||||||
|
# is an exact full resend — drop the duplicate.
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# Incremental fragment — normal append. Note
|
||||||
|
# an exact-equal delta on a NON-latched slot is
|
||||||
|
# treated as a real fragment, never silently
|
||||||
|
# dropped, so genuine incremental streams are
|
||||||
|
# untouched.
|
||||||
|
entry["function"]["arguments"] = _prev + _new
|
||||||
extra = getattr(tc_delta, "extra_content", None)
|
extra = getattr(tc_delta, "extra_content", None)
|
||||||
if extra is None and hasattr(tc_delta, "model_extra"):
|
if extra is None and hasattr(tc_delta, "model_extra"):
|
||||||
extra = (tc_delta.model_extra or {}).get("extra_content")
|
extra = (tc_delta.model_extra or {}).get("extra_content")
|
||||||
|
|
|
||||||
|
|
@ -182,6 +182,58 @@ def _escape_invalid_chars_in_json_strings(raw: str) -> str:
|
||||||
return "".join(out)
|
return "".join(out)
|
||||||
|
|
||||||
|
|
||||||
|
def _collapse_repeated_json_arguments(raw_stripped: str) -> str | None:
|
||||||
|
"""Collapse concatenated-duplicate tool-call argument JSON.
|
||||||
|
|
||||||
|
Some OpenAI-compatible streaming providers (DeepSeek / Baidu Qianfan
|
||||||
|
among them) resend the *full* cumulative ``arguments`` string in every
|
||||||
|
stream chunk instead of incremental fragments. Our stream accumulator
|
||||||
|
concatenates argument deltas with ``+=`` (correct for spec-compliant
|
||||||
|
incremental providers), so a cumulative-resend provider yields the
|
||||||
|
object repeated once per chunk:
|
||||||
|
|
||||||
|
'{"path":"x"}{"path":"x"}{"path":"x"}...'
|
||||||
|
|
||||||
|
A spec-compliant provider always produces exactly one valid JSON object
|
||||||
|
after concatenation, so this function is only ever reached for strings
|
||||||
|
that already fail ``json.loads`` (issue #35592).
|
||||||
|
|
||||||
|
Detection is unambiguous and operates on the fully-accumulated string,
|
||||||
|
not on partial per-chunk data: the input must be K>=2 *exact* repeats of
|
||||||
|
a unit substring that itself parses as valid JSON. Returns the single
|
||||||
|
collapsed unit (re-serialised compactly) when that holds, else ``None``
|
||||||
|
so the caller falls through to the generic repair passes.
|
||||||
|
|
||||||
|
Safety: a single object like ``{"command":{"command":"x"}}`` parses on
|
||||||
|
the first ``json.loads`` attempt in the caller and never reaches here;
|
||||||
|
even if it did, it is not an exact-repeat string, so this returns
|
||||||
|
``None`` and leaves it untouched.
|
||||||
|
"""
|
||||||
|
n = len(raw_stripped)
|
||||||
|
if n < 2:
|
||||||
|
return None
|
||||||
|
# The unit must divide the total length evenly for an exact K-repeat.
|
||||||
|
# Try the smallest plausible unit first (divisors of n, ascending),
|
||||||
|
# capping the repeat count so a pathological input can't blow up.
|
||||||
|
for unit_len in range(1, n // 2 + 1):
|
||||||
|
if n % unit_len != 0:
|
||||||
|
continue
|
||||||
|
k = n // unit_len
|
||||||
|
if k < 2:
|
||||||
|
continue
|
||||||
|
unit = raw_stripped[:unit_len]
|
||||||
|
if unit * k != raw_stripped:
|
||||||
|
continue
|
||||||
|
# The repeated unit must itself be valid JSON for this to be a
|
||||||
|
# cumulative-resend collapse and not a coincidental string repeat.
|
||||||
|
try:
|
||||||
|
parsed = json.loads(unit, strict=False)
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
continue
|
||||||
|
return json.dumps(parsed, separators=(",", ":"))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str:
|
def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str:
|
||||||
"""Attempt to repair malformed tool_call argument JSON.
|
"""Attempt to repair malformed tool_call argument JSON.
|
||||||
|
|
||||||
|
|
@ -220,6 +272,20 @@ def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str:
|
||||||
except (json.JSONDecodeError, TypeError, ValueError):
|
except (json.JSONDecodeError, TypeError, ValueError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Repair pass 1: collapse concatenated-duplicate JSON. Cumulative-resend
|
||||||
|
# streaming providers (DeepSeek / Baidu Qianfan, #35592) yield the full
|
||||||
|
# arguments object repeated once per stream chunk; our += accumulator
|
||||||
|
# turns that into '{...}{...}{...}'. Only reached when pass 0 above
|
||||||
|
# already failed, so spec-compliant single objects are never touched.
|
||||||
|
collapsed = _collapse_repeated_json_arguments(raw_stripped)
|
||||||
|
if collapsed is not None:
|
||||||
|
logger.warning(
|
||||||
|
"Collapsed cumulative-resend duplicate tool_call arguments for "
|
||||||
|
"%s (%d chars -> %d)",
|
||||||
|
tool_name, len(raw_stripped), len(collapsed),
|
||||||
|
)
|
||||||
|
return collapsed
|
||||||
|
|
||||||
# Attempt common JSON repairs
|
# Attempt common JSON repairs
|
||||||
fixed = raw_stripped
|
fixed = raw_stripped
|
||||||
# 1. Strip trailing commas before } or ]
|
# 1. Strip trailing commas before } or ]
|
||||||
|
|
@ -436,6 +502,7 @@ __all__ = [
|
||||||
"_sanitize_messages_surrogates",
|
"_sanitize_messages_surrogates",
|
||||||
"_escape_invalid_chars_in_json_strings",
|
"_escape_invalid_chars_in_json_strings",
|
||||||
"_repair_tool_call_arguments",
|
"_repair_tool_call_arguments",
|
||||||
|
"_collapse_repeated_json_arguments",
|
||||||
"_strip_non_ascii",
|
"_strip_non_ascii",
|
||||||
"_sanitize_messages_non_ascii",
|
"_sanitize_messages_non_ascii",
|
||||||
"_sanitize_tools_non_ascii",
|
"_sanitize_tools_non_ascii",
|
||||||
|
|
|
||||||
|
|
@ -140,3 +140,50 @@ class TestRepairToolCallArguments:
|
||||||
parsed = json.loads(result)
|
parsed = json.loads(result)
|
||||||
assert "line" in parsed["msg"]
|
assert "line" in parsed["msg"]
|
||||||
|
|
||||||
|
# -- Stage 1b: cumulative-resend duplicate collapse (#35592) --
|
||||||
|
|
||||||
|
def test_collapse_duplicate_object_2x(self):
|
||||||
|
"""Two concatenated identical objects collapse to one."""
|
||||||
|
raw = '{"path":"x"}{"path":"x"}'
|
||||||
|
result = _repair_tool_call_arguments(raw, "search_files")
|
||||||
|
assert json.loads(result) == {"path": "x"}
|
||||||
|
|
||||||
|
def test_collapse_duplicate_object_many(self):
|
||||||
|
"""K concatenated identical objects collapse to one."""
|
||||||
|
raw = '{"a":1,"b":2}' * 100
|
||||||
|
result = _repair_tool_call_arguments(raw, "t")
|
||||||
|
assert json.loads(result) == {"a": 1, "b": 2}
|
||||||
|
|
||||||
|
def test_collapse_nested_object_repeat(self):
|
||||||
|
"""Repeated nested-key object collapses correctly."""
|
||||||
|
raw = '{"command":{"command":"x"}}' * 3
|
||||||
|
result = _repair_tool_call_arguments(raw, "t")
|
||||||
|
assert json.loads(result) == {"command": {"command": "x"}}
|
||||||
|
|
||||||
|
def test_single_object_not_touched_by_collapse(self):
|
||||||
|
"""A clean single object never enters the collapse path."""
|
||||||
|
raw = '{"path": "x.py"}'
|
||||||
|
result = _repair_tool_call_arguments(raw, "t")
|
||||||
|
assert json.loads(result) == {"path": "x.py"}
|
||||||
|
|
||||||
|
def test_single_nested_object_not_corrupted(self):
|
||||||
|
"""Nested-key single object is NOT mistaken for a repeat (safety)."""
|
||||||
|
raw = '{"command":{"command":"x"}}'
|
||||||
|
result = _repair_tool_call_arguments(raw, "t")
|
||||||
|
assert json.loads(result) == {"command": {"command": "x"}}
|
||||||
|
|
||||||
|
def test_two_different_objects_not_collapsed(self):
|
||||||
|
"""Distinct concatenated objects are not a clean repeat — collapse
|
||||||
|
declines and the generic repair handles it (returns '{}')."""
|
||||||
|
from agent.message_sanitization import _collapse_repeated_json_arguments
|
||||||
|
assert _collapse_repeated_json_arguments('{"a":1}{"b":2}') is None
|
||||||
|
|
||||||
|
def test_collapse_helper_returns_none_for_valid_single(self):
|
||||||
|
from agent.message_sanitization import _collapse_repeated_json_arguments
|
||||||
|
assert _collapse_repeated_json_arguments('{"a":1}') is None
|
||||||
|
|
||||||
|
def test_collapse_helper_returns_none_for_repeated_non_json(self):
|
||||||
|
from agent.message_sanitization import _collapse_repeated_json_arguments
|
||||||
|
# 'abab' repeats 'ab' but 'ab' is not valid JSON.
|
||||||
|
assert _collapse_repeated_json_arguments('abab') is None
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -184,6 +184,143 @@ class TestStreamingAccumulator:
|
||||||
assert tc[0].function.name == "read_file"
|
assert tc[0].function.name == "read_file"
|
||||||
assert tc[0].function.arguments == '{"path": "x.py"}'
|
assert tc[0].function.arguments == '{"path": "x.py"}'
|
||||||
|
|
||||||
|
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||||
|
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||||
|
def test_args_not_duplicated_on_cumulative_growing_resend(self, mock_close, mock_create):
|
||||||
|
"""DeepSeek / Baidu Qianfan stream args in cumulative mode (#35592).
|
||||||
|
|
||||||
|
Each chunk resends the full arguments-so-far (monotonic growth)
|
||||||
|
instead of the new fragment. Blind += produced a duplicated
|
||||||
|
'{...}{...}' string that failed json.loads and got nuked to '{}'.
|
||||||
|
The per-slot cumulative latch must replace (not append) so the
|
||||||
|
final arguments are a single valid object.
|
||||||
|
"""
|
||||||
|
import json as _json
|
||||||
|
from run_agent import AIAgent
|
||||||
|
|
||||||
|
full = '{"pattern": "def handle", "path": "gateway", "output_mode": "content"}'
|
||||||
|
# Growing prefixes, then identical full resends at the tail.
|
||||||
|
steps = [full[:12], full[:30], full[:48], full, full, full]
|
||||||
|
chunks = [_make_stream_chunk(tool_calls=[
|
||||||
|
_make_tool_call_delta(index=0, tc_id="call_qf", name="search_files")
|
||||||
|
])]
|
||||||
|
for s in steps:
|
||||||
|
chunks.append(_make_stream_chunk(tool_calls=[
|
||||||
|
_make_tool_call_delta(index=0, tc_id="call_qf", name="search_files", arguments=s)
|
||||||
|
]))
|
||||||
|
chunks.append(_make_stream_chunk(finish_reason="tool_calls"))
|
||||||
|
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.chat.completions.create.return_value = iter(chunks)
|
||||||
|
mock_create.return_value = mock_client
|
||||||
|
|
||||||
|
agent = AIAgent(
|
||||||
|
api_key="test-key",
|
||||||
|
base_url="https://qianfan.baidubce.com/v2",
|
||||||
|
model="deepseek-v4-pro",
|
||||||
|
quiet_mode=True,
|
||||||
|
skip_context_files=True,
|
||||||
|
skip_memory=True,
|
||||||
|
)
|
||||||
|
agent.api_mode = "chat_completions"
|
||||||
|
agent._interrupt_requested = False
|
||||||
|
|
||||||
|
response = agent._interruptible_streaming_api_call({})
|
||||||
|
|
||||||
|
tc = response.choices[0].message.tool_calls
|
||||||
|
assert tc is not None and len(tc) == 1
|
||||||
|
assert _json.loads(tc[0].function.arguments) == _json.loads(full)
|
||||||
|
# Not flagged as truncated (which would set finish_reason='length').
|
||||||
|
assert response.choices[0].finish_reason == "tool_calls"
|
||||||
|
|
||||||
|
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||||
|
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||||
|
def test_args_identical_full_resend_collapsed_by_backstop(self, mock_close, mock_create):
|
||||||
|
"""Provider resends the COMPLETE object identically from chunk 1.
|
||||||
|
|
||||||
|
There is never a strict superset to latch on, so the per-chunk
|
||||||
|
guard appends — but the post-loop _collapse_repeated_json_arguments
|
||||||
|
backstop must collapse the K-repeat to one valid object (#35592).
|
||||||
|
"""
|
||||||
|
import json as _json
|
||||||
|
from run_agent import AIAgent
|
||||||
|
|
||||||
|
full = '{"action": "replace", "old_text": "a", "new_text": "b"}'
|
||||||
|
chunks = [_make_stream_chunk(tool_calls=[
|
||||||
|
_make_tool_call_delta(index=0, tc_id="call_id", name="memory")
|
||||||
|
])]
|
||||||
|
for _ in range(5):
|
||||||
|
chunks.append(_make_stream_chunk(tool_calls=[
|
||||||
|
_make_tool_call_delta(index=0, tc_id="call_id", name="memory", arguments=full)
|
||||||
|
]))
|
||||||
|
chunks.append(_make_stream_chunk(finish_reason="tool_calls"))
|
||||||
|
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.chat.completions.create.return_value = iter(chunks)
|
||||||
|
mock_create.return_value = mock_client
|
||||||
|
|
||||||
|
agent = AIAgent(
|
||||||
|
api_key="test-key",
|
||||||
|
base_url="https://qianfan.baidubce.com/v2",
|
||||||
|
model="deepseek-v4-pro",
|
||||||
|
quiet_mode=True,
|
||||||
|
skip_context_files=True,
|
||||||
|
skip_memory=True,
|
||||||
|
)
|
||||||
|
agent.api_mode = "chat_completions"
|
||||||
|
agent._interrupt_requested = False
|
||||||
|
|
||||||
|
response = agent._interruptible_streaming_api_call({})
|
||||||
|
|
||||||
|
tc = response.choices[0].message.tool_calls
|
||||||
|
assert tc is not None and len(tc) == 1
|
||||||
|
assert _json.loads(tc[0].function.arguments) == _json.loads(full)
|
||||||
|
|
||||||
|
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||||
|
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||||
|
def test_incremental_args_with_duplicate_leading_fragment(self, mock_close, mock_create):
|
||||||
|
"""Genuine incremental stream whose 2nd fragment equals the 1st.
|
||||||
|
|
||||||
|
Safety guard: an exact-equal delta on a NON-latched slot must be
|
||||||
|
treated as a real fragment and appended, never silently dropped.
|
||||||
|
Verifies the cumulative-resend fix does not corrupt nested-key
|
||||||
|
objects (the false-positive class flagged in review of #35592).
|
||||||
|
"""
|
||||||
|
from run_agent import AIAgent
|
||||||
|
|
||||||
|
# Fragments deliberately repeat the leading '{"command":' substring.
|
||||||
|
frags = ['{"command":', '{"command":', ' "x"}', '}']
|
||||||
|
chunks = [_make_stream_chunk(tool_calls=[
|
||||||
|
_make_tool_call_delta(index=0, tc_id="c3", name="t")
|
||||||
|
])]
|
||||||
|
for f in frags:
|
||||||
|
chunks.append(_make_stream_chunk(tool_calls=[
|
||||||
|
_make_tool_call_delta(index=0, tc_id="c3", name="t", arguments=f)
|
||||||
|
]))
|
||||||
|
chunks.append(_make_stream_chunk(finish_reason="tool_calls"))
|
||||||
|
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.chat.completions.create.return_value = iter(chunks)
|
||||||
|
mock_create.return_value = mock_client
|
||||||
|
|
||||||
|
agent = AIAgent(
|
||||||
|
api_key="test-key",
|
||||||
|
base_url="https://openrouter.ai/api/v1",
|
||||||
|
model="test/model",
|
||||||
|
quiet_mode=True,
|
||||||
|
skip_context_files=True,
|
||||||
|
skip_memory=True,
|
||||||
|
)
|
||||||
|
agent.api_mode = "chat_completions"
|
||||||
|
agent._interrupt_requested = False
|
||||||
|
|
||||||
|
response = agent._interruptible_streaming_api_call({})
|
||||||
|
|
||||||
|
tc = response.choices[0].message.tool_calls
|
||||||
|
assert tc is not None and len(tc) == 1
|
||||||
|
# All four fragments concatenated — nothing dropped.
|
||||||
|
assert tc[0].function.arguments == '{"command":{"command": "x"}}'
|
||||||
|
|
||||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||||
def test_tool_call_extra_content_preserved(self, mock_close, mock_create):
|
def test_tool_call_extra_content_preserved(self, mock_close, mock_create):
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue