hermes-agent/agent/moa_loop.py
Teknium 1fa44180b0
fix(moa): advisory references end on a user turn + get a reference-role system prompt (#54007)
* fix(moa): reference advisory view must end with a user turn

MoA reference calls failed with Anthropic models that don't support
assistant prefill (e.g. Claude Opus 4.8): '400 ... must end with a user
message'. The advisory view built by _reference_messages() kept the last
assistant turn's text while dropping the following tool result, leaving a
trailing assistant turn — which Anthropic (and OpenRouter->Anthropic)
interpret as an assistant prefill to continue. References are advisory and
must end on the user turn they answer.

Strip trailing assistant turns from the advisory view (preserving
intervening ones). Update the existing test that encoded the buggy shape
and add a mid-tool-loop regression test.

* feat(moa): give reference models an advisory-role system prompt

Reference models received the bare trimmed conversation with no role
framing, so they assumed they were the acting agent and refused ("I can't
access repositories/URLs from here") or tried to call tools they don't have.

Prepend a dedicated advisory system prompt to every reference call: the
model is an analyst, not the actor — it cannot execute, should not
apologize for lacking tools, and should reason about the presented state to
advise the aggregator/orchestrator on approach, next steps, tool-use
strategy, risks, and anything the acting agent missed. Its output is private
guidance for the aggregator, not a user-facing answer.
2026-06-27 22:52:25 -07:00

490 lines
23 KiB
Python

"""Mixture-of-Agents runtime helpers for /moa turns.
The slash command is deliberately not a model tool. It marks one user turn as
MoA-enabled; the normal Hermes agent loop still owns tool calling and turn
termination, while this module gathers reference-model context before each model
iteration.
"""
from __future__ import annotations
import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from agent.auxiliary_client import call_llm
from agent.transports import get_transport
logger = logging.getLogger(__name__)
# Upper bound on concurrent reference-model calls. References are independent
# advisory calls (no tools, no inter-dependence), so we fan them out the same
# way delegate_task runs a batch: all in flight at once, results collected when
# every reference finishes. Presets rarely list more than a handful of
# references; this cap just protects against a pathologically large preset
# opening dozens of sockets at once.
_MAX_REFERENCE_WORKERS = 8
# System prompt prepended to every reference-model call. References are
# advisory — they do NOT act, call tools, or own the task. Without this
# framing a reference receives the bare trimmed conversation and assumes it is
# the acting agent: it then refuses ("I can't access repositories / URLs from
# here") or tries to call tools it doesn't have. The prompt reframes the model
# as an analyst whose job is to reason about the presented state and hand its
# best thinking to the aggregator/orchestrator that will actually act.
_REFERENCE_SYSTEM_PROMPT = (
"You are a reference advisor in a Mixture of Agents (MoA) process. You are "
"NOT the acting agent and you do NOT execute anything: you cannot call "
"tools, run commands, browse, or access files, repositories, or URLs, and "
"you should not try to or apologize for being unable to. A separate "
"aggregator/orchestrator model holds those capabilities and will take the "
"actual actions.\n\n"
"The conversation below is the current state of a task handled by that "
"acting agent. Your job is to give your most intelligent analysis of that "
"state: understand the goal, reason about the problem, and advise on what "
"to do next. Surface the best approach, concrete next steps and tool-use "
"strategy, likely pitfalls and risks, and anything the acting agent may "
"have missed or gotten wrong. Assume any referenced files, URLs, or "
"systems exist and reason about them from the context given rather than "
"asking for access.\n\n"
"Respond with your advice directly — no preamble, no disclaimers about "
"tools or access. Your response is private guidance handed to the "
"aggregator, not an answer shown to the user."
)
def _slot_label(slot: dict[str, str]) -> str:
return f"{slot.get('provider', '').strip()}:{slot.get('model', '').strip()}"
def _slot_runtime(slot: dict[str, str]) -> dict[str, Any]:
"""Resolve a reference/aggregator slot to real runtime call kwargs.
A MoA slot is just a model selection — it must be called the same way any
model is called elsewhere, not through a bare ``call_llm(provider=...,
model=...)`` that leaves base_url/api_key/api_mode unresolved and lets the
auxiliary auto-detector guess. We route the slot's provider through
``resolve_runtime_provider`` (the canonical provider→api_mode/base_url/
api_key resolver the CLI, gateway, and delegate_task all use), so the slot
gets its provider's real API surface — e.g. MiniMax → anthropic_messages,
GPT-5/o-series → max_completion_tokens, custom endpoints → their base_url.
Returns the kwargs to pass through to ``call_llm`` (provider/model plus the
resolved base_url/api_key when available). Falls back to the bare
provider/model on any resolution error so a misconfigured slot still
attempts the call rather than aborting the whole MoA turn.
"""
provider = str(slot.get("provider") or "").strip()
model = str(slot.get("model") or "").strip()
out: dict[str, Any] = {"provider": provider, "model": model}
try:
from hermes_cli.runtime_provider import resolve_runtime_provider
rt = resolve_runtime_provider(requested=provider, target_model=model)
resolved_provider = str(rt.get("provider") or provider).strip().lower()
# call_llm treats an explicit base_url as a custom endpoint. That is
# correct for ordinary OpenAI-compatible targets, but wrong for OAuth /
# adapter-backed providers whose provider branch adds auth headers and
# request-shape adapters. Keep those providers identified by name.
if resolved_provider in {"openai-codex", "xai-oauth"}:
return out
# Pass the resolved endpoint through so call_llm builds the request for
# the provider's actual API surface instead of auto-detecting. base_url
# routes call_llm to the right adapter (incl. anthropic_messages mode);
# api_key is the resolved credential for that provider.
if rt.get("base_url"):
out["base_url"] = rt["base_url"]
if rt.get("api_key"):
out["api_key"] = rt["api_key"]
except Exception as exc: # pragma: no cover - defensive
logger.debug("MoA slot runtime resolution failed for %s: %s", _slot_label(slot), exc)
return out
def _run_reference(
slot: dict[str, str],
ref_messages: list[dict[str, Any]],
*,
temperature: float | None = None,
max_tokens: int | None = None,
) -> tuple[str, str]:
"""Call one reference model and return ``(label, text)``.
The slot is resolved to its provider's real runtime (via ``_slot_runtime``)
and called through the same ``call_llm`` request-building path any model
uses, so per-model wire-format handling (anthropic_messages,
max_completion_tokens, fixed/forbidden temperature) applies identically to
a reference as it would if that model were the acting model. MoA imposes no
cap of its own (``max_tokens`` defaults to ``None`` → omitted → the model's
real maximum); ``temperature`` is only the user's configured preset value,
which call_llm may still override per model.
Never raises: a failed reference becomes a labelled note so the aggregator
can still act with partial context. Designed to run inside a thread pool —
``call_llm`` is synchronous/blocking, so threads (not asyncio) are the right
concurrency primitive, mirroring ``delegate_task``'s batch fan-out.
"""
label = _slot_label(slot)
try:
# Prepend the advisory-role system prompt so the reference understands
# it is analyzing state for an aggregator, not acting on the task. The
# trimmed view (_reference_messages) already strips the agent's own
# system prompt, so this is the only system message the reference sees.
messages = [{"role": "system", "content": _REFERENCE_SYSTEM_PROMPT}, *ref_messages]
response = call_llm(
task="moa_reference",
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
**_slot_runtime(slot),
)
return label, _extract_text(response) or "(empty response)"
except Exception as exc:
logger.warning("MoA reference model %s failed: %s", label, exc)
return label, f"[failed: {exc}]"
def _run_references_parallel(
reference_models: list[dict[str, str]],
ref_messages: list[dict[str, Any]],
*,
temperature: float | None = None,
max_tokens: int | None = None,
) -> list[tuple[str, str]]:
"""Fan out all reference models in parallel, returning outputs in order.
Like ``delegate_task``'s batch mode, every reference is dispatched at once
and we block until all of them finish before handing the joined results to
the aggregator. Output order matches ``reference_models`` so the
``Reference {idx}`` labelling stays stable. MoA presets that reference
another MoA preset are skipped here (recursion guard) with a labelled note.
"""
if not reference_models:
return []
results: list[tuple[str, str] | None] = [None] * len(reference_models)
futures = {}
workers = min(_MAX_REFERENCE_WORKERS, len(reference_models))
with ThreadPoolExecutor(max_workers=workers) as executor:
for idx, slot in enumerate(reference_models):
if slot.get("provider") == "moa":
results[idx] = (
_slot_label(slot),
"[skipped: MoA presets cannot recursively reference MoA]",
)
continue
futures[
executor.submit(
_run_reference,
slot,
ref_messages,
temperature=temperature,
max_tokens=max_tokens,
)
] = idx
# Collect every reference before returning — the aggregator needs the
# complete set, so there is no early-exit / first-completed path here.
for future, idx in futures.items():
results[idx] = future.result()
return [r for r in results if r is not None]
def _reference_messages(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Build an advisory-safe view of the conversation for reference models.
Reference calls are advisory: they never call tools and never emit the
``tool_calls`` the main model did. Replaying the full transcript verbatim
(a) re-bills the ~8K-token Hermes system prompt per reference per
iteration and (b) risks 400s from strict providers (Mistral, Fireworks)
that reject orphan ``tool`` messages or ``tool_calls`` the reference never
produced. We keep only the user/assistant *text* turns, dropping the
system prompt, any ``tool``-role messages, and any ``tool_calls`` payloads.
The trimmed view MUST end with a ``user`` turn. An advisory reference call
answers the latest user input; it must never end with an ``assistant``
turn, which Anthropic (and OpenRouter→Anthropic) interpret as an assistant
*prefill* the model should continue — some models (e.g. Claude Opus 4.8)
reject prefill outright with ``400 ... must end with a user message``. This
is the common mid-tool-loop case: the last assistant turn carried
interleaved reasoning text plus tool calls, so its text survives the trim
while the following ``tool`` result is dropped, leaving a trailing
assistant turn. We strip any trailing assistant turns so the reference sees
a user message last.
"""
trimmed: list[dict[str, Any]] = []
for msg in messages:
role = msg.get("role")
if role not in ("user", "assistant"):
# Drop system prompt and tool-result messages.
continue
content = msg.get("content")
if not isinstance(content, str):
# Skip non-text (multimodal/tool-call-only) assistant turns.
if not content:
continue
text = content if isinstance(content, str) else ""
if role == "assistant" and not text.strip():
# Assistant turn that was purely tool calls — nothing advisory.
continue
trimmed.append({"role": role, "content": text})
# Advisory calls must end on a user turn (no assistant prefill). Drop any
# trailing assistant turns left by the tool-loop trim above.
while trimmed and trimmed[-1].get("role") == "assistant":
trimmed.pop()
if not trimmed:
# Degenerate case (e.g. first turn was stripped, or the view trimmed
# down to assistant-only): fall back to the latest user turn so the
# reference still has something to answer.
for msg in reversed(messages):
if msg.get("role") == "user" and isinstance(msg.get("content"), str):
return [{"role": "user", "content": msg["content"]}]
return trimmed
def _extract_text(response: Any) -> str:
try:
transport = get_transport("chat_completions")
if transport is None:
raise RuntimeError("chat_completions transport unavailable")
normalized = transport.normalize_response(response)
text = (normalized.content or "").strip()
if text:
return text
except Exception:
pass
try:
content = response.choices[0].message.content
return (content or "").strip()
except Exception:
return ""
def aggregate_moa_context(
*,
user_prompt: str,
api_messages: list[dict[str, Any]],
reference_models: list[dict[str, str]],
aggregator: dict[str, str],
temperature: float = 0.6,
aggregator_temperature: float = 0.4,
max_tokens: int | None = None,
) -> str:
"""Run configured reference models and synthesize their advice.
Failures are returned as model-specific notes instead of aborting the normal
agent loop; the main model can still act with partial context.
``max_tokens`` is ``None`` by default: MoA does not cap reference or
aggregator output, so each model uses its own maximum. ``call_llm`` omits
the parameter entirely when it is ``None`` (see its docstring), which also
sidesteps providers that reject ``max_tokens`` outright. A hardcoded cap
here previously truncated long aggregator syntheses.
"""
reference_outputs: list[tuple[str, str]] = []
ref_messages = _reference_messages(api_messages)
reference_outputs = _run_references_parallel(
reference_models,
ref_messages,
temperature=temperature,
max_tokens=max_tokens,
)
joined = "\n\n".join(
f"Reference {idx}{label}:\n{text}"
for idx, (label, text) in enumerate(reference_outputs, start=1)
)
synth_prompt = (
"You are the aggregator in a Mixture of Agents process. Synthesize the "
"reference responses into concise, actionable guidance for the main "
"Hermes agent. Focus on next steps, tool-use strategy, risks, and any "
"disagreements. Do not answer the user directly unless that is all that "
"is needed; produce context the main agent should use in its normal loop.\n\n"
f"Original user prompt:\n{user_prompt}\n\n"
f"Reference responses:\n{joined}"
)
agg_label = _slot_label(aggregator)
try:
response = call_llm(
task="moa_aggregator",
messages=[{"role": "user", "content": synth_prompt}],
temperature=aggregator_temperature,
max_tokens=max_tokens,
**_slot_runtime(aggregator),
)
synthesis = _extract_text(response)
except Exception as exc:
logger.warning("MoA aggregator model %s failed: %s", agg_label, exc)
synthesis = ""
if not synthesis:
synthesis = joined
return (
"[Mixture of Agents context — use this as private guidance for the "
"normal Hermes agent loop. You may call tools, continue reasoning, or "
"finish normally.]\n"
f"Aggregator: {agg_label}\n"
f"References: {', '.join(_slot_label(slot) for slot in reference_models)}\n\n"
f"{synthesis.strip()}"
)
class MoAChatCompletions:
"""OpenAI-chat-compatible facade where the aggregator is the acting model."""
def __init__(self, preset_name: str, reference_callback: Any = None):
self.preset_name = preset_name or "default"
# Optional display hook. Called as reference outputs become available so
# frontends can show each reference model's answer as a labelled block
# before the aggregator acts. Signature:
# reference_callback(event, **kwargs)
# where event is one of:
# "moa.reference" kwargs: index, count, label, text
# "moa.aggregating" kwargs: aggregator (label), ref_count
# Never raises into the model call — display is best-effort.
self.reference_callback = reference_callback
# Turn-scoped reference cache. The agent loop calls create() once per
# tool-loop iteration, but references are advisory for the whole turn:
# the advisory message view (_reference_messages) is identical across
# iterations (it strips tool/tool_call turns) until a new user message
# arrives. Re-running references every iteration would multiply their
# API cost by the tool-loop depth AND re-emit the same blocks to the
# display on every iteration. So cache outputs keyed by the advisory
# view's signature and reuse them — running and showing references once
# per user turn.
self._ref_cache_key: tuple | None = None
self._ref_cache_outputs: list[tuple[str, str]] = []
def _emit(self, event: str, **kwargs: Any) -> None:
cb = self.reference_callback
if cb is None:
return
try:
cb(event, **kwargs)
except Exception as exc: # pragma: no cover - display must never break the turn
logger.debug("MoA reference_callback failed for %s: %s", event, exc)
def create(self, **api_kwargs: Any) -> Any:
from hermes_cli.config import load_config
from hermes_cli.moa_config import resolve_moa_preset
preset = resolve_moa_preset(load_config().get("moa") or {}, self.preset_name)
messages = list(api_kwargs.get("messages") or [])
reference_models = preset.get("reference_models") or []
aggregator = preset.get("aggregator") or {}
# MoA does not cap reference or aggregator output: each model uses its
# own maximum. Passing max_tokens=None makes call_llm omit the parameter
# (it never caps by default), so a long aggregator synthesis is never
# truncated and providers that reject max_tokens don't 400.
temperature = float(preset.get("reference_temperature", 0.6) or 0.6)
aggregator_temperature = float(preset.get("aggregator_temperature", api_kwargs.get("temperature") or 0.4) or 0.4)
# When the preset is disabled, skip the reference fan-out and let the
# configured aggregator act alone — it is the preset's acting model, so
# a disabled MoA preset is simply "use the aggregator directly."
if not preset.get("enabled", True):
reference_models = []
reference_outputs: list[tuple[str, str]] = []
ref_messages = _reference_messages(messages)
# Turn-scoped cache: only run + display references when the advisory
# view changed (i.e. a new user turn). Within one turn the agent loop
# calls create() once per tool iteration with the same advisory view;
# reuse the cached outputs and skip both the re-run and the re-emit.
_sig = hashlib.sha256(
"\u0000".join(
f"{m.get('role')}:{m.get('content')}" for m in ref_messages
).encode("utf-8", "replace")
).hexdigest()
_cache_key = (self.preset_name, _sig, tuple(_slot_label(s) for s in reference_models))
_refs_from_cache = _cache_key == self._ref_cache_key and bool(self._ref_cache_outputs)
if _refs_from_cache:
reference_outputs = list(self._ref_cache_outputs)
else:
reference_outputs = _run_references_parallel(
reference_models,
ref_messages,
temperature=temperature,
max_tokens=None,
)
self._ref_cache_key = _cache_key
self._ref_cache_outputs = list(reference_outputs)
# Surface each reference model's answer to the display BEFORE the
# aggregator acts — once per turn (only on the iteration that
# actually ran them). The user sees one labelled block per
# reference (rendered like a thinking block) so the MoA process is
# visible rather than a silent pause. Best-effort: never blocks the
# turn.
_ref_count = len(reference_outputs)
for _idx, (_label, _text) in enumerate(reference_outputs, start=1):
self._emit(
"moa.reference",
index=_idx,
count=_ref_count,
label=_label,
text=_text,
)
if _ref_count:
self._emit(
"moa.aggregating",
aggregator=_slot_label(aggregator),
ref_count=_ref_count,
)
agg_messages = [dict(m) for m in messages]
if reference_outputs:
joined = "\n\n".join(
f"Reference {idx}{label}:\n{text}"
for idx, (label, text) in enumerate(reference_outputs, start=1)
)
guidance = (
"[Mixture of Agents reference context]\n"
f"Preset: {self.preset_name}\n"
f"Aggregator/acting model: {_slot_label(aggregator)}\n"
f"References: {', '.join(label for label, _ in reference_outputs)}\n\n"
"Use the reference responses below as private context. You are the aggregator and acting model: "
"answer the user directly or call tools as needed.\n\n"
f"{joined}"
)
for msg in reversed(agg_messages):
if msg.get("role") == "user" and isinstance(msg.get("content"), str):
msg["content"] = msg["content"] + "\n\n" + guidance
break
else:
agg_messages.append({"role": "user", "content": guidance})
if aggregator.get("provider") == "moa":
raise RuntimeError("MoA aggregator cannot be another MoA preset")
agg_kwargs = dict(api_kwargs)
agg_kwargs["messages"] = agg_messages
# The aggregator is the acting model. Resolve its slot to the provider's
# real runtime (base_url/api_key/api_mode) and call it through the same
# request-building path any model uses — so per-model wire-format
# handling (anthropic_messages, max_completion_tokens, fixed/forbidden
# temperature) applies identically to it. MoA imposes no output cap:
# max_tokens is passed through from the caller (normally None → omitted
# → the model's real maximum). The preset's old hardcoded 4096 default
# is gone — it truncated long syntheses.
return call_llm(
task="moa_aggregator",
messages=agg_messages,
temperature=aggregator_temperature,
max_tokens=agg_kwargs.get("max_tokens"),
tools=agg_kwargs.get("tools"),
extra_body=agg_kwargs.get("extra_body"),
**_slot_runtime(aggregator),
)
class MoAClient:
def __init__(self, preset_name: str, reference_callback: Any = None):
self.chat = type("_MoAChat", (), {})()
self.chat.completions = MoAChatCompletions(preset_name, reference_callback=reference_callback)