diff --git a/agent/moa_loop.py b/agent/moa_loop.py index 5fe984e7fd2..9be4f1c1081 100644 --- a/agent/moa_loop.py +++ b/agent/moa_loop.py @@ -26,6 +26,16 @@ logger = logging.getLogger(__name__) # opening dozens of sockets at once. _MAX_REFERENCE_WORKERS = 8 +# Per-tool-result character budget for the advisory reference view. Tool +# results can be huge (a full diff, a 5000-line file dump); replaying them +# verbatim per reference per tool-loop step would blow the reference model's +# context window and cost. We keep the agent's *actions* (tool calls) in full — +# they are cheap, high-signal, and tell the reference what the agent did — but +# preview each tool *result* head+tail so the reference still sees what came +# back without replaying megabytes. The acting aggregator always gets the full, +# untrimmed transcript; this budget only shapes the advisory copy. +_REFERENCE_TOOL_RESULT_BUDGET = 4000 + # System prompt prepended to every reference-model call. References are # advisory — they do NOT act, call tools, or own the task. Without this # framing a reference receives the bare trimmed conversation and assumes it is @@ -192,56 +202,140 @@ def _run_references_parallel( return [r for r in results if r is not None] -def _reference_messages(messages: list[dict[str, Any]]) -> list[dict[str, Any]]: - """Build an advisory-safe view of the conversation for reference models. +def _truncate_tool_result(text: str, budget: int = _REFERENCE_TOOL_RESULT_BUDGET) -> str: + """Head+tail preview of a tool result for the advisory view. - Reference calls are advisory: they never call tools and never emit the - ``tool_calls`` the main model did. Replaying the full transcript verbatim - (a) re-bills the ~8K-token Hermes system prompt per reference per - iteration and (b) risks 400s from strict providers (Mistral, Fireworks) - that reject orphan ``tool`` messages or ``tool_calls`` the reference never - produced. We keep only the user/assistant *text* turns, dropping the - system prompt, any ``tool``-role messages, and any ``tool_calls`` payloads. - - The trimmed view MUST end with a ``user`` turn. An advisory reference call - answers the latest user input; it must never end with an ``assistant`` - turn, which Anthropic (and OpenRouter→Anthropic) interpret as an assistant - *prefill* the model should continue — some models (e.g. Claude Opus 4.8) - reject prefill outright with ``400 ... must end with a user message``. This - is the common mid-tool-loop case: the last assistant turn carried - interleaved reasoning text plus tool calls, so its text survives the trim - while the following ``tool`` result is dropped, leaving a trailing - assistant turn. We strip any trailing assistant turns so the reference sees - a user message last. + Keeps the first and last halves of the budget with a ``[... N chars + omitted ...]`` marker between them, so a reference sees both how the result + started and how it ended without replaying the whole payload. """ - trimmed: list[dict[str, Any]] = [] + if not text or len(text) <= budget: + return text + half = budget // 2 + omitted = len(text) - 2 * half + return f"{text[:half]}\n[... {omitted} chars omitted ...]\n{text[-half:]}" + + +def _render_tool_calls(tool_calls: Any) -> str: + """Render an assistant turn's tool_calls as readable text lines. + + The advisory view cannot carry real ``tool_calls`` payloads (strict + providers reject tool_calls the reference never produced), so the agent's + actions are flattened to text the reference can read and reason about. + """ + lines: list[str] = [] + for tc in tool_calls or []: + fn = (tc.get("function") or {}) if isinstance(tc, dict) else {} + name = fn.get("name") or (tc.get("name") if isinstance(tc, dict) else "") or "tool" + args = fn.get("arguments") + if isinstance(args, str): + args_text = args + elif args is not None: + try: + import json + + args_text = json.dumps(args, ensure_ascii=False) + except Exception: + args_text = str(args) + else: + args_text = "" + lines.append(f"[called tool: {name}({args_text})]" if args_text else f"[called tool: {name}]") + return "\n".join(lines) + + +def _reference_messages(messages: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Build an advisory view of the conversation for reference models. + + A reference gives an INFORMED judgement on the current state, so it must + see what the agent actually did — its tool calls AND the tool results that + came back — not just the agent's narration. We therefore preserve the whole + conversation flow, but flatten it into clean user/assistant *text* turns: + + - system prompt: dropped (8K of Hermes boilerplate, not advisory signal). + - assistant turns: kept; any ``tool_calls`` are rendered inline as + ``[called tool: name(args)]`` text lines appended to the turn's text. + - ``tool``-role results: NOT dropped. Each is folded (head+tail preview, + see ``_truncate_tool_result``) into the *preceding* assistant turn as a + ``[tool result: ...]`` block, so the reference sees what came back. + + This emits ZERO ``tool``-role messages and ZERO ``tool_calls`` arrays — only + plain user/assistant text — so strict providers (Mistral, Fireworks) that + reject orphan tool messages / unproduced tool_calls don't 400, while the + reference still has the full picture. + + The view MUST end with a ``user`` turn. Anthropic (and OpenRouter→Anthropic) + interpret a trailing assistant turn as an assistant *prefill* to continue, + and no-prefill models (e.g. Claude Opus 4.8) reject it with + ``400 ... must end with a user message``. Rather than DELETE the agent's + latest context to satisfy that (which would blind the reference to the + current state), we APPEND a synthetic user turn asking the reference to + judge the state above. End-on-user is satisfied and no context is lost. + + The acting aggregator always receives the full, untrimmed transcript; this + function only shapes the disposable advisory copy. + """ + advisory_instruction = ( + "[The conversation above is the current state of the task. Give your " + "most intelligent judgement: what is going on, what should happen next, " + "what risks or mistakes you see, and how the acting agent should " + "proceed.]" + ) + + rendered: list[dict[str, Any]] = [] + last_user_content: str | None = None for msg in messages: role = msg.get("role") - if role not in ("user", "assistant"): - # Drop system prompt and tool-result messages. - continue content = msg.get("content") - if not isinstance(content, str): - # Skip non-text (multimodal/tool-call-only) assistant turns. - if not content: - continue text = content if isinstance(content, str) else "" - if role == "assistant" and not text.strip(): - # Assistant turn that was purely tool calls — nothing advisory. + + if role == "system": continue - trimmed.append({"role": role, "content": text}) - # Advisory calls must end on a user turn (no assistant prefill). Drop any - # trailing assistant turns left by the tool-loop trim above. - while trimmed and trimmed[-1].get("role") == "assistant": - trimmed.pop() - if not trimmed: - # Degenerate case (e.g. first turn was stripped, or the view trimmed - # down to assistant-only): fall back to the latest user turn so the - # reference still has something to answer. + if role == "user": + if text.strip(): + last_user_content = text + rendered.append({"role": "user", "content": text}) + elif role == "assistant": + parts: list[str] = [] + if text.strip(): + parts.append(text.strip()) + calls_text = _render_tool_calls(msg.get("tool_calls")) + if calls_text: + parts.append(calls_text) + # Empty assistant turns (no text, no calls) carry nothing advisory. + if parts: + rendered.append({"role": "assistant", "content": "\n".join(parts)}) + elif role == "tool": + # Fold the tool result into the preceding assistant turn as text so + # the reference sees what came back, without emitting a tool-role + # message a reference never produced. + result_text = _truncate_tool_result(text) + block = f"[tool result: {result_text}]" + if rendered and rendered[-1].get("role") == "assistant": + rendered[-1]["content"] = rendered[-1]["content"] + "\n" + block + else: + # No assistant turn to attach to (e.g. a leading tool result); + # keep it as advisory context on its own assistant-role line. + rendered.append({"role": "assistant", "content": block}) + # Any other role is ignored. + + # End on a user turn: append a synthetic advisory request rather than + # deleting the agent's latest assistant context. This satisfies Anthropic's + # no-trailing-assistant-prefill rule while preserving full state. + if rendered and rendered[-1].get("role") == "assistant": + rendered.append({"role": "user", "content": advisory_instruction}) + elif rendered and rendered[-1].get("role") == "user": + # Already ends on a user turn (fresh user prompt, no agent action yet). + # Leave it — the reference answers that prompt directly. + pass + + if not rendered: + # Degenerate case: nothing rendered. Fall back to the latest user turn. + if last_user_content is not None: + return [{"role": "user", "content": last_user_content}] for msg in reversed(messages): if msg.get("role") == "user" and isinstance(msg.get("content"), str): return [{"role": "user", "content": msg["content"]}] - return trimmed + return rendered @@ -348,15 +442,16 @@ class MoAChatCompletions: # "moa.aggregating" kwargs: aggregator (label), ref_count # Never raises into the model call — display is best-effort. self.reference_callback = reference_callback - # Turn-scoped reference cache. The agent loop calls create() once per - # tool-loop iteration, but references are advisory for the whole turn: - # the advisory message view (_reference_messages) is identical across - # iterations (it strips tool/tool_call turns) until a new user message - # arrives. Re-running references every iteration would multiply their - # API cost by the tool-loop depth AND re-emit the same blocks to the - # display on every iteration. So cache outputs keyed by the advisory - # view's signature and reuse them — running and showing references once - # per user turn. + # State-scoped reference cache. The agent loop calls create() once per + # tool-loop iteration; references should re-run whenever the task STATE + # advances — i.e. on every new user message AND every new tool result — + # so each reference judges the latest state. The advisory view + # (_reference_messages) now renders tool calls + results as text, so its + # signature changes on every new tool response; the cache key is that + # signature, so a new tool result is a cache MISS (references re-run) + # while a redundant create() call with identical state is a HIT (no + # re-run, no re-emit). This gives "fire on every user/tool response" + # for free, without re-firing on a pure no-op re-call. self._ref_cache_key: tuple | None = None self._ref_cache_outputs: list[tuple[str, str]] = [] diff --git a/tests/run_agent/test_moa_loop_mode.py b/tests/run_agent/test_moa_loop_mode.py index b07a8156281..3e4b8de93a9 100644 --- a/tests/run_agent/test_moa_loop_mode.py +++ b/tests/run_agent/test_moa_loop_mode.py @@ -216,7 +216,15 @@ def test_moa_slot_runtime_falls_back_on_resolution_error(monkeypatch): assert "api_key" not in rt -def test_reference_messages_strips_system_and_tool_history(): +def test_reference_messages_drops_system_but_renders_tools_as_text(): + """System prompt is dropped, but tool calls + results are RENDERED as text. + + A reference must see what the agent did (tool calls) and what came back + (tool results) to give an informed judgement — so neither is stripped. They + are flattened to text so the view carries zero tool-role messages / no + tool_calls arrays (strict providers reject those), while the reference + still has the full picture. The view ends on a user turn. + """ from agent.moa_loop import _reference_messages messages = [ @@ -231,30 +239,31 @@ def test_reference_messages_strips_system_and_tool_history(): {"role": "assistant", "content": "here is my answer"}, ] - trimmed = _reference_messages(messages) + view = _reference_messages(messages) - # System prompt, tool-call-only assistant turn, and tool result are gone. - assert all(m["role"] in ("user", "assistant") for m in trimmed) - assert all("tool_calls" not in m for m in trimmed) - # The advisory view must end on a user turn — a trailing assistant turn is - # treated by Anthropic as an assistant prefill (400 on no-prefill models). - # The only kept user turn here is the prompt, so the trailing assistant - # answer is stripped. - assert trimmed == [ - {"role": "user", "content": "do the thing"}, - ] + # Wire-format safety: only user/assistant text, no tool roles / tool_calls. + assert all(m["role"] in ("user", "assistant") for m in view) + assert all("tool_calls" not in m for m in view) + # System prompt is gone. + assert all("huge hermes system prompt" not in m["content"] for m in view) + # The agent's action and the tool result are PRESERVED as text. + joined = "\n".join(m["content"] for m in view) + assert "[called tool: f(" in joined + assert "[tool result: tool result]" in joined + assert "here is my answer" in joined + # Ends on a user turn (advisory request appended after the final assistant). + assert view[-1]["role"] == "user" def test_reference_messages_ends_with_user_not_assistant_prefill(): """Advisory reference views must never end on an assistant turn. - Mid-tool-loop, the last assistant turn carries interleaved reasoning text - plus tool calls; its text survives the trim while the following tool result - is dropped, leaving a trailing assistant turn. Anthropic (and - OpenRouter→Anthropic) treat that as an assistant prefill the model should - continue, and no-prefill models (e.g. Claude Opus 4.8) reject it with - ``400 ... must end with a user message``. The trim must drop trailing - assistant turns while preserving intervening ones. + Mid-tool-loop the conversation ends on an assistant/tool exchange. Anthropic + (and OpenRouter→Anthropic) treat a trailing assistant turn as an assistant + prefill to continue, and no-prefill models (e.g. Claude Opus 4.8) reject it + with ``400 ... must end with a user message``. We append a synthetic user + turn asking for judgement rather than DELETING the agent's latest context — + the reference must still see the current state to advise on it. """ from agent.moa_loop import _reference_messages @@ -267,20 +276,58 @@ def test_reference_messages_ends_with_user_not_assistant_prefill(): "content": "let me reason then call a tool", "tool_calls": [{"id": "c1", "function": {"name": "f", "arguments": "{}"}}], }, - {"role": "tool", "tool_call_id": "c1", "content": "tool result"}, + {"role": "tool", "tool_call_id": "c1", "content": "the tool output"}, ] - trimmed = _reference_messages(messages) + view = _reference_messages(messages) - assert trimmed, "advisory view should not be empty" - assert trimmed[-1]["role"] == "user" - # Intervening assistant context is preserved; only the trailing one drops. - assert trimmed == [ + assert view, "advisory view should not be empty" + assert view[-1]["role"] == "user" + joined = "\n".join(m["content"] for m in view) + # The agent's latest action and its result are preserved, not dropped. + assert "let me reason then call a tool" in joined + assert "[called tool: f(" in joined + assert "[tool result: the tool output]" in joined + # Earlier context preserved too. + assert "q1" in joined and "a1" in joined and "q2 current" in joined + + +def test_reference_messages_truncates_large_tool_results(): + """Large tool results are previewed head+tail, not replayed verbatim.""" + from agent.moa_loop import _REFERENCE_TOOL_RESULT_BUDGET, _reference_messages + + huge = "A" * (_REFERENCE_TOOL_RESULT_BUDGET * 3) + messages = [ + {"role": "user", "content": "q"}, + { + "role": "assistant", + "content": "", + "tool_calls": [{"id": "c1", "function": {"name": "f", "arguments": "{}"}}], + }, + {"role": "tool", "tool_call_id": "c1", "content": huge}, + ] + + view = _reference_messages(messages) + joined = "\n".join(m["content"] for m in view) + assert "chars omitted" in joined + # The folded result is far smaller than the raw payload. + assert len(joined) < len(huge) + + +def test_reference_messages_fresh_user_turn_ends_on_that_user(): + """A fresh user prompt with no agent action yet ends on that user turn.""" + from agent.moa_loop import _reference_messages + + messages = [ + {"role": "system", "content": "sys"}, {"role": "user", "content": "q1"}, {"role": "assistant", "content": "a1"}, {"role": "user", "content": "q2 current"}, ] + view = _reference_messages(messages) + assert view[-1] == {"role": "user", "content": "q2 current"} + def test_run_reference_prepends_advisory_system_prompt(monkeypatch): """Each reference call gets the advisory-role system prompt first. @@ -345,19 +392,31 @@ moa: messages=[ {"role": "system", "content": "system prompt"}, {"role": "user", "content": "question"}, - {"role": "tool", "tool_call_id": "x", "content": "leftover"}, + { + "role": "assistant", + "content": "checking", + "tool_calls": [{"id": "x", "function": {"name": "lookup", "arguments": "{}"}}], + }, + {"role": "tool", "tool_call_id": "x", "content": "tool output"}, ], tools=[{"type": "function"}], ) ref_call = next(c for c in calls if c["task"] == "moa_reference") - # Reference gets the advisory-role system prompt first, then user turns - # only — never the agent's own system prompt or tool-role messages. ref_msgs = ref_call["messages"] + # Advisory-role system prompt first; the agent's own system prompt is gone. assert ref_msgs[0]["role"] == "system" assert "reference advisor" in ref_msgs[0]["content"].lower() - assert "huge hermes system prompt" not in ref_msgs[0]["content"] - assert all(m["role"] == "user" for m in ref_msgs[1:]) + assert "system prompt" not in ref_msgs[0]["content"] + # No tool-role messages and no tool_calls arrays leak to the reference. + assert all(m["role"] in ("system", "user", "assistant") for m in ref_msgs) + assert all("tool_calls" not in m for m in ref_msgs) + # The agent's action + tool result ARE preserved, rendered as text. + joined = "\n".join(m["content"] for m in ref_msgs[1:]) + assert "[called tool: lookup(" in joined + assert "[tool result: tool output]" in joined + # Ends on a user turn (advisory request after the final assistant block). + assert ref_msgs[-1]["role"] == "user" assert ref_call.get("tools") in (None, []) # Aggregator still receives the original messages + tool schema. agg_call = next(c for c in calls if c["task"] == "moa_aggregator") @@ -506,12 +565,13 @@ def test_moa_facade_emits_reference_then_aggregating(monkeypatch, tmp_path): assert agg_events[0][1]["ref_count"] == 2 -def test_moa_facade_caches_references_within_a_turn(monkeypatch, tmp_path): - """References run + emit ONCE per user turn, not per tool-loop iteration. +def test_moa_facade_reruns_references_on_new_tool_result(monkeypatch, tmp_path): + """References re-run when a new tool result advances the task state. - The agent loop calls create() once per iteration; the advisory message - view is identical across iterations (tool/tool_call turns are stripped), - so re-running references would multiply their cost and re-spam the display. + The agent loop calls create() once per tool-loop iteration. References must + judge the LATEST state, so a new tool result is a cache MISS and re-runs the + references — but a redundant create() call with the SAME state is a cache + HIT (no re-run, no re-emit), so we don't fire on a pure no-op re-call. """ home = tmp_path / ".hermes" _ref_config(home) @@ -533,24 +593,22 @@ def test_moa_facade_caches_references_within_a_turn(monkeypatch, tmp_path): facade = MoAChatCompletions("review", reference_callback=lambda ev, **kw: events.append(ev)) base_msgs = [{"role": "user", "content": "do the thing"}] - # Iteration 1: model emits a tool call. + # Iteration 1: fresh user turn — references run (2 models). facade.create(messages=base_msgs, tools=[{"type": "function"}]) - # Iteration 2: same turn — a tool result was appended, but the advisory - # view (which strips tool turns) is unchanged, so references must be reused. - facade.create( - messages=base_msgs - + [ - {"role": "assistant", "content": "", "tool_calls": [{"id": "c1", "function": {"name": "f", "arguments": "{}"}}]}, - {"role": "tool", "tool_call_id": "c1", "content": "result"}, - ], - tools=[{"type": "function"}], - ) + after_tool = base_msgs + [ + {"role": "assistant", "content": "", "tool_calls": [{"id": "c1", "function": {"name": "f", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "result"}, + ] + # Iteration 2: a NEW tool result advanced the state → references re-run. + facade.create(messages=after_tool, tools=[{"type": "function"}]) + # Iteration 3: identical state (no new tool/user input) → cache hit, no re-run. + facade.create(messages=after_tool, tools=[{"type": "function"}]) - # 2 reference models, run once total (not once per iteration). - assert len(ref_runs) == 2 - # Reference blocks emitted once (2 reference events + 1 aggregating). - assert events.count("moa.reference") == 2 - assert events.count("moa.aggregating") == 1 + # 2 models × 2 distinct states (fresh turn + new tool result) = 4 runs. + # The redundant 3rd call adds none. + assert len(ref_runs) == 4 + assert events.count("moa.reference") == 4 + assert events.count("moa.aggregating") == 2 def test_moa_facade_reruns_references_on_new_turn(monkeypatch, tmp_path):