hermes-agent/tests/plugins/test_langfuse_plugin.py
kshitij db84a78e61
fix(langfuse): complete observability fix — trace I/O, tool outputs, placeholder credentials (closes #22342, #22763) (#26320)
* fix(langfuse): reject placeholder credentials with one-shot warning

When operators leave HERMES_LANGFUSE_PUBLIC_KEY / HERMES_LANGFUSE_SECRET_KEY
at a template value like 'placeholder', 'test-key', or 'your-langfuse-key',
the Langfuse SDK silently accepts the credentials at construction time and
drops every trace at flush time. No warning, no error — just an empty
Langfuse dashboard the operator only notices hours later.

Add prefix-based validation in _get_langfuse() against the documented
'pk-lf-' / 'sk-lf-' prefixes that Langfuse always issues server-side.
Anything else fires a single warning naming the offending env var(s)
with a log-safe value preview (full string for short placeholders so the
operator knows which template they left in place; truncated for long
values so a real secret pasted into the wrong field never hits the log),
then short-circuits via the existing _INIT_FAILED cache so the warning
fires once per process, not once per hook invocation.

The check sits after the 'Langfuse is None' SDK-installed guard so hosts
without the optional langfuse SDK don't see misleading 'set real keys'
hints when the actionable fix is 'pip install langfuse'. Missing
credentials remains the documented opt-out path and stays silent — no
log noise for unconfigured installs.

Fixes #22763
Fixes #23823

* fix(langfuse): use actual API request messages for generation input

on_pre_llm_request previously used the messages kwarg alone, which
could be None when Hermes passes the payload via request_messages,
conversation_history, or user_message instead. Add _coerce_request_messages
to pick the first available list across all variants, falling back to a
synthetic user message. Generations now show the real outbound payload
rather than an empty input.

* fix(langfuse): record tool call outputs in traces

Tool observations showed input (arguments) but output was always
undefined. Root cause: when tool_call_id is empty, pre_tool_call stored
observations under a unique time-based key that post_tool_call could
never reconstruct, so every tool span was closed without output by the
_finish_trace sweep.

Fix pre/post matching by routing empty-tool_call_id tools through a
per-name FIFO queue (pending_tools_by_name) instead of the time-based
key. Tools with a tool_call_id continue to use the id-keyed dict.

Also:
 - Preserve OpenAI-style nested function shape in serialized tool calls
   so Langfuse renders name/arguments correctly
 - Keep name + tool_call_id on role:tool messages for proper pairing
 - Backfill tool results onto the matching turn_tool_calls entry so the
   generation's tool-call record carries the result alongside arguments
 - Coerce request messages from whichever field the runtime provides
   (request_messages, messages, conversation_history, user_message)

* fix(langfuse): salvage-review polish — drop dead is_first_turn, shallow-copy request_messages, real threaded FIFO test

Self-review of the combined #22345 + #23831 salvage surfaced three issues
worth fixing in the same PR rather than as follow-ups:

1. Drop is_first_turn from the pre_api_request hook. The boolean expression
   `not bool(conversation_history)` was wrong: conversation_history is
   reassigned to None mid-run after compression (5 sites in run_agent.py),
   so the value flips False -> True mid-conversation on every post-compression
   API call. The langfuse plugin never consumed it, so the kwarg was both
   misleading AND dead.

2. Replace copy.deepcopy(request_messages) with shallow list() copy. The
   pre_api_request hook contract discards return values (invoke_hook never
   writes back to api_kwargs), and the langfuse plugin's _serialize_messages
   already builds its own snapshot dicts via _safe_value. A deepcopy on every
   API call would walk every tool result and base64 image — significant
   overhead for no real isolation benefit. Shallow copy of the outer list
   protects against later mutations of api_messages without paying for the
   inner-dict walk.

3. Rename test_empty_tool_call_id_concurrent_fifo_order ->
   test_empty_tool_call_id_observations_are_fifo_within_tool_name and add a
   real test_threaded_post_calls_preserve_fifo_under_lock that spawns 8
   threads behind a barrier to actually exercise _STATE_LOCK on the
   pending_tools_by_name queue. The original test was sequential and only
   validated Python list semantics; this one validates the lock discipline.

4. Fix stale 'Cleared by reset_cache_for_tests()' comment on _INIT_FAILED —
   that function does not exist. Tests reload the module via sys.modules.pop
   + importlib.import_module instead.

Tests: 37 langfuse plugin tests pass, 658 plugin tests overall pass.

---------

Co-authored-by: xxxigm <tuancanhnguyen706@gmail.com>
Co-authored-by: Brian Conklin <brian@dralth.com>
2026-05-15 05:04:02 -07:00

706 lines
30 KiB
Python

"""Tests for the bundled observability/langfuse plugin."""
from __future__ import annotations
import importlib
import logging
import sys
from pathlib import Path
import pytest
import yaml
REPO_ROOT = Path(__file__).resolve().parents[2]
PLUGIN_DIR = REPO_ROOT / "plugins" / "observability" / "langfuse"
# ---------------------------------------------------------------------------
# Manifest + layout
# ---------------------------------------------------------------------------
class TestManifest:
def test_plugin_directory_exists(self):
assert PLUGIN_DIR.is_dir()
assert (PLUGIN_DIR / "plugin.yaml").exists()
assert (PLUGIN_DIR / "__init__.py").exists()
def test_manifest_fields(self):
data = yaml.safe_load((PLUGIN_DIR / "plugin.yaml").read_text())
assert data["name"] == "langfuse"
assert data["version"]
# All six hooks the plugin implements.
assert set(data["hooks"]) == {
"pre_api_request", "post_api_request",
"pre_llm_call", "post_llm_call",
"pre_tool_call", "post_tool_call",
}
# Required env vars are the user-facing HERMES_ prefixed keys.
assert "HERMES_LANGFUSE_PUBLIC_KEY" in data["requires_env"]
assert "HERMES_LANGFUSE_SECRET_KEY" in data["requires_env"]
# ---------------------------------------------------------------------------
# Plugin discovery: langfuse is opt-in (not loaded unless explicitly enabled).
# This guards against someone accidentally re-introducing a per-hook
# load_config() gate or making the plugin auto-load.
# ---------------------------------------------------------------------------
class TestDiscovery:
def test_plugin_is_discovered_as_standalone_opt_in(self, tmp_path, monkeypatch):
"""Scanner should find the plugin but NOT load it by default."""
from hermes_cli import plugins as plugins_mod
# Isolated HERMES_HOME so we don't read the developer's config.yaml.
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
manager = plugins_mod.PluginManager()
manager.discover_and_load()
# observability/langfuse appears in the plugin registry …
loaded = manager._plugins.get("observability/langfuse")
assert loaded is not None, "plugin not discovered"
# … but is not loaded (opt-in default → no config.yaml means nothing enabled)
assert loaded.enabled is False
assert "not enabled" in (loaded.error or "").lower()
# ---------------------------------------------------------------------------
# Runtime gate: _get_langfuse() returns None and caches _INIT_FAILED when
# credentials are missing. Guards against regressing toward the rejected
# per-hook load_config() design.
# ---------------------------------------------------------------------------
class TestRuntimeGate:
def _fresh_plugin(self):
"""Import the plugin module fresh (clears any cached client)."""
mod_name = "plugins.observability.langfuse"
sys.modules.pop(mod_name, None)
return importlib.import_module(mod_name)
def test_get_langfuse_returns_none_without_credentials(self, monkeypatch):
for k in (
"HERMES_LANGFUSE_PUBLIC_KEY", "HERMES_LANGFUSE_SECRET_KEY",
"LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY",
):
monkeypatch.delenv(k, raising=False)
langfuse_plugin = self._fresh_plugin()
assert langfuse_plugin._get_langfuse() is None
def test_get_langfuse_caches_failure_no_config_load(self, monkeypatch):
"""A miss must be cached — no per-hook config.yaml reads, no env re-reads."""
for k in (
"HERMES_LANGFUSE_PUBLIC_KEY", "HERMES_LANGFUSE_SECRET_KEY",
"LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY",
):
monkeypatch.delenv(k, raising=False)
langfuse_plugin = self._fresh_plugin()
# Prime the cache with one call.
assert langfuse_plugin._get_langfuse() is None
# Now block os.environ.get — a correctly-cached plugin must not
# touch env again.
import os
called = {"n": 0}
real_get = os.environ.get
def tracking_get(key, default=None):
if key.startswith(("HERMES_LANGFUSE_", "LANGFUSE_")):
called["n"] += 1
return real_get(key, default)
monkeypatch.setattr(os.environ, "get", tracking_get)
for _ in range(20):
assert langfuse_plugin._get_langfuse() is None
assert called["n"] == 0, (
f"_get_langfuse() re-read env {called['n']} times after cache miss — "
"it should short-circuit via _INIT_FAILED"
)
def test_get_langfuse_does_not_import_hermes_config(self, monkeypatch):
"""The plugin must not re-read config.yaml per hook."""
for k in (
"HERMES_LANGFUSE_PUBLIC_KEY", "HERMES_LANGFUSE_SECRET_KEY",
"LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY",
):
monkeypatch.delenv(k, raising=False)
# Drop any cached import of hermes_cli.config.
sys.modules.pop("hermes_cli.config", None)
langfuse_plugin = self._fresh_plugin()
for _ in range(20):
langfuse_plugin._get_langfuse()
assert "hermes_cli.config" not in sys.modules, (
"langfuse plugin imported hermes_cli.config — regression toward "
"the rejected per-hook load_config() design"
)
# ---------------------------------------------------------------------------
# Hooks are inert when the client is unavailable.
# ---------------------------------------------------------------------------
class TestHooksInert:
def test_hooks_noop_without_client(self, monkeypatch):
"""All 6 hooks must return without raising when _get_langfuse() is None."""
for k in (
"HERMES_LANGFUSE_PUBLIC_KEY", "HERMES_LANGFUSE_SECRET_KEY",
"LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY",
):
monkeypatch.delenv(k, raising=False)
sys.modules.pop("plugins.observability.langfuse", None)
import importlib
mod = importlib.import_module("plugins.observability.langfuse")
# Each hook should just return; no exceptions.
mod.on_pre_llm_call(task_id="t", session_id="s", messages=[{"role": "user", "content": "hi"}])
mod.on_pre_llm_request(task_id="t", session_id="s", api_call_count=1, request_messages=[])
mod.on_post_llm_call(task_id="t", session_id="s", api_call_count=1)
mod.on_pre_tool_call(tool_name="read_file", args={}, task_id="t", session_id="s")
mod.on_post_tool_call(tool_name="read_file", args={}, result="ok", task_id="t", session_id="s")
# ---------------------------------------------------------------------------
# Placeholder-credential guard (#23823).
#
# Regression coverage for the silent-failure bug: when an operator leaves
# HERMES_LANGFUSE_PUBLIC_KEY / SECRET_KEY at a template value like
# "placeholder", "test-key", or "your-langfuse-key", the SDK accepts the
# credentials at construction time (it does no server-side validation
# eagerly) but drops every trace at flush time, with no signal in the
# Hermes logs. The fix in `_get_langfuse()` validates the documented
# `pk-lf-` / `sk-lf-` prefix Langfuse always issues, surfaces a one-shot
# warning naming the offending env var(s), and short-circuits via the
# same `_INIT_FAILED` path used for missing credentials so subsequent
# hook invocations don't re-log.
# ---------------------------------------------------------------------------
class _FakeLangfuse:
"""Stand-in for the real :class:`langfuse.Langfuse` so tests don't
need the optional ``langfuse`` SDK installed. The plugin's runtime
gate refuses to proceed past ``if Langfuse is None`` when the SDK
is missing, which would short-circuit before the placeholder check
can fire. Patching ``plugin.Langfuse`` with this class lets the
placeholder validator exercise its full code path."""
instances: list["_FakeLangfuse"] = []
def __init__(self, **kwargs):
self.kwargs = kwargs
_FakeLangfuse.instances.append(self)
class TestPlaceholderKeyDetection:
LOGGER_NAME = "plugins.observability.langfuse"
def _fresh_plugin(self, monkeypatch=None):
mod_name = "plugins.observability.langfuse"
sys.modules.pop(mod_name, None)
mod = importlib.import_module(mod_name)
if monkeypatch is not None:
# Pretend the SDK is installed so `_get_langfuse()` actually
# reaches the placeholder check. Real SDK calls are never
# made because the placeholder/missing-credentials paths
# return before constructing a client.
_FakeLangfuse.instances.clear()
monkeypatch.setattr(mod, "Langfuse", _FakeLangfuse, raising=False)
return mod
@staticmethod
def _clear_env(monkeypatch):
for k in (
"HERMES_LANGFUSE_PUBLIC_KEY", "HERMES_LANGFUSE_SECRET_KEY",
"LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY",
):
monkeypatch.delenv(k, raising=False)
# -- helper unit tests (no SDK stub needed: these don't go through
# _get_langfuse, they exercise the pure-Python helpers directly) ------
def test_redact_key_preview_empty(self, monkeypatch):
self._clear_env(monkeypatch)
plugin = self._fresh_plugin()
assert plugin._redact_key_preview("") == "<empty>"
def test_redact_key_preview_short_value_echoed(self, monkeypatch):
"""Short placeholder strings are echoed in full so the operator
can see exactly which template they forgot to replace."""
self._clear_env(monkeypatch)
plugin = self._fresh_plugin()
assert plugin._redact_key_preview("placeholder") == "'placeholder'"
assert plugin._redact_key_preview("test-key") == "'test-key'"
def test_redact_key_preview_long_value_truncated(self, monkeypatch):
"""If an operator pasted a real secret into the wrong env var the
preview must NOT echo it in full — only the leading 6 chars."""
self._clear_env(monkeypatch)
plugin = self._fresh_plugin()
result = plugin._redact_key_preview("sk-lf-abcdefghijklmnop")
assert "abcdefghij" not in result
assert result.startswith("'sk-lf-")
assert result.endswith("...'")
def test_validate_langfuse_key_accepts_documented_prefix(self, monkeypatch):
self._clear_env(monkeypatch)
plugin = self._fresh_plugin()
assert plugin._validate_langfuse_key(
"HERMES_LANGFUSE_PUBLIC_KEY", "pk-lf-real-public-xyz"
) is None
assert plugin._validate_langfuse_key(
"HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz"
) is None
def test_validate_langfuse_key_rejects_wrong_prefix(self, monkeypatch):
self._clear_env(monkeypatch)
plugin = self._fresh_plugin()
msg = plugin._validate_langfuse_key(
"HERMES_LANGFUSE_PUBLIC_KEY", "placeholder"
)
assert msg is not None
assert "HERMES_LANGFUSE_PUBLIC_KEY" in msg
assert "pk-lf-" in msg
def test_validate_langfuse_key_unknown_name_passes(self, monkeypatch):
"""Defensive: an env var with no registered prefix is trusted."""
self._clear_env(monkeypatch)
plugin = self._fresh_plugin()
assert plugin._validate_langfuse_key("HERMES_LANGFUSE_BASE_URL", "anything") is None
# -- end-to-end _get_langfuse() behaviour --------------------------------
# These tests pass `monkeypatch` to _fresh_plugin() so the helper can
# stub out `Langfuse` (the optional SDK). Without that, every call
# short-circuits at `if Langfuse is None` before reaching the
# placeholder validator — masking the very behaviour we're testing.
def test_placeholder_public_key_warns_and_skips(self, monkeypatch, caplog):
self._clear_env(monkeypatch)
monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder")
monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz")
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
assert plugin._get_langfuse() is None
text = caplog.text
assert "HERMES_LANGFUSE_PUBLIC_KEY" in text
assert "'placeholder'" in text
assert "pk-lf-" in text
# The valid secret value must NOT appear (the var NAME does, in
# the "or unset ..." hint, but the value preview shouldn't).
assert "'sk-lf-" not in text
# Never constructed the SDK client — short-circuited before that.
assert _FakeLangfuse.instances == []
def test_placeholder_secret_key_warns_and_skips(self, monkeypatch, caplog):
self._clear_env(monkeypatch)
monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "pk-lf-real-public-xyz")
monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "test-key")
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
assert plugin._get_langfuse() is None
text = caplog.text
assert "HERMES_LANGFUSE_SECRET_KEY" in text
assert "'test-key'" in text
assert "sk-lf-" in text
# The valid public value must NOT appear.
assert "'pk-lf-" not in text
assert _FakeLangfuse.instances == []
def test_both_placeholders_one_warning_with_both_keys(self, monkeypatch, caplog):
self._clear_env(monkeypatch)
monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder")
monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "placeholder")
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
assert plugin._get_langfuse() is None
warnings = [r for r in caplog.records if r.levelname == "WARNING"
and r.name == self.LOGGER_NAME]
assert len(warnings) == 1, (
f"Expected a single combined warning; got {len(warnings)}:\n"
+ "\n".join(r.getMessage() for r in warnings)
)
text = warnings[0].getMessage()
assert "HERMES_LANGFUSE_PUBLIC_KEY" in text
assert "HERMES_LANGFUSE_SECRET_KEY" in text
def test_repeated_calls_do_not_re_warn(self, monkeypatch, caplog):
"""The cached ``_INIT_FAILED`` sentinel must short-circuit
subsequent calls so each hook invocation isn't a fresh log
line — otherwise a busy gateway will spam the operator's
terminal."""
self._clear_env(monkeypatch)
monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder")
monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "placeholder")
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
for _ in range(15):
assert plugin._get_langfuse() is None
warnings = [r for r in caplog.records if r.levelname == "WARNING"
and r.name == self.LOGGER_NAME]
assert len(warnings) == 1, (
f"Warning fired {len(warnings)} times across 15 calls; "
"expected 1 (cached via _INIT_FAILED)"
)
@pytest.mark.parametrize("placeholder", [
"placeholder",
"test-key",
"your-langfuse-key",
"change-me",
"xxx",
"dummy-key-here",
"<your-key>",
"REPLACE_ME",
])
def test_common_placeholders_detected(self, monkeypatch, caplog, placeholder):
"""A grab-bag of values that real-world ``.env.example`` templates
use as stand-ins. Any of them in either key must trip the guard."""
self._clear_env(monkeypatch)
monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", placeholder)
monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz")
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
assert plugin._get_langfuse() is None
assert "HERMES_LANGFUSE_PUBLIC_KEY" in caplog.text
def test_legacy_LANGFUSE_PUBLIC_KEY_also_validated(self, monkeypatch, caplog):
"""The plugin reads both the canonical HERMES_-prefixed env var and
the legacy bare ``LANGFUSE_PUBLIC_KEY``. The validator must run on
whichever value ``_get_langfuse()`` actually consumed."""
self._clear_env(monkeypatch)
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "placeholder")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz")
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
assert plugin._get_langfuse() is None
# Warning names the canonical user-facing env var (the bare
# LANGFUSE_PUBLIC_KEY is a backwards-compat alias for the
# HERMES_-prefixed one — operators set the HERMES_-prefixed one).
assert "HERMES_LANGFUSE_PUBLIC_KEY" in caplog.text
assert "'placeholder'" in caplog.text
def test_missing_credentials_still_skip_silently(self, monkeypatch, caplog):
"""Missing-creds is the documented opt-out path (operator hasn't
configured the plugin yet) — it must remain SILENT. Regression
guard against the placeholder validator accidentally running on
empty values and re-introducing log noise for unconfigured
installs."""
self._clear_env(monkeypatch)
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
assert plugin._get_langfuse() is None
warnings = [r for r in caplog.records if r.levelname == "WARNING"
and r.name == self.LOGGER_NAME]
assert warnings == []
def test_sdk_not_installed_still_skips_silently(self, monkeypatch, caplog):
"""If the langfuse SDK isn't installed at all, the placeholder
check should never run — there's nothing the operator can do
about a credential mismatch when the package is missing, and
re-warning here would dilute the actually-actionable SDK-missing
signal upstream. The ``Langfuse is None`` guard at the top of
``_get_langfuse`` already handles this; this test pins that
behaviour."""
self._clear_env(monkeypatch)
monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder")
monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "placeholder")
# NO monkeypatch on Langfuse here — falls back to whatever the
# plugin imported at module load (None if SDK absent).
plugin = self._fresh_plugin()
monkeypatch.setattr(plugin, "Langfuse", None, raising=False)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
assert plugin._get_langfuse() is None
warnings = [r for r in caplog.records if r.levelname == "WARNING"
and r.name == self.LOGGER_NAME]
assert warnings == []
def test_valid_prefixes_do_not_trigger_placeholder_warning(self, monkeypatch, caplog):
"""Real Langfuse keys (``pk-lf-…`` / ``sk-lf-…``) must pass the
guard and proceed to SDK init. We stub the SDK constructor with
a recording fake so the assertion can confirm BOTH that the
placeholder warning didn't fire AND that the client was actually
constructed — the latter is the success signal the bug report
wanted."""
self._clear_env(monkeypatch)
monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "pk-lf-real-public-xyz")
monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz")
plugin = self._fresh_plugin(monkeypatch)
with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME):
client = plugin._get_langfuse()
assert isinstance(client, _FakeLangfuse)
assert client.kwargs["public_key"] == "pk-lf-real-public-xyz"
assert client.kwargs["secret_key"] == "sk-lf-real-secret-xyz"
assert "placeholders" not in caplog.text.lower(), (
f"Valid Langfuse keys tripped the placeholder guard: {caplog.text!r}"
)
class TestRequestMessageCoercion:
def test_prefers_request_messages_then_messages_then_history_then_user_message(self):
sys.modules.pop("plugins.observability.langfuse", None)
mod = importlib.import_module("plugins.observability.langfuse")
assert mod._coerce_request_messages(
request_messages=[{"role": "system", "content": "s"}],
messages=[{"role": "user", "content": "m"}],
conversation_history=[{"role": "user", "content": "h"}],
user_message="u",
) == [{"role": "system", "content": "s"}]
assert mod._coerce_request_messages(
messages=[{"role": "user", "content": "m"}],
conversation_history=[{"role": "user", "content": "h"}],
user_message="u",
) == [{"role": "user", "content": "m"}]
assert mod._coerce_request_messages(
conversation_history=[{"role": "user", "content": "h"}],
user_message="u",
) == [{"role": "user", "content": "h"}]
assert mod._coerce_request_messages(user_message="u") == [{"role": "user", "content": "u"}]
class TestToolCallOutputBackfill:
def test_post_tool_call_backfills_matching_turn_tool_call_output(self, monkeypatch):
sys.modules.pop("plugins.observability.langfuse", None)
mod = importlib.import_module("plugins.observability.langfuse")
observation = object()
state = mod.TraceState(trace_id="trace-1", root_ctx=None, root_span=None)
state.tools["call-1"] = observation
state.turn_tool_calls.append({
"id": "call-1",
"type": "function",
"name": "web_extract",
"arguments": '{"urls": ["https://example.com"]}',
"function": {
"name": "web_extract",
"arguments": '{"urls": ["https://example.com"]}',
},
})
task_key = mod._trace_key("task-1", "session-1")
monkeypatch.setitem(mod._TRACE_STATE, task_key, state)
ended = {}
def fake_end_observation(obs, *, output=None, metadata=None, usage_details=None, cost_details=None):
ended["observation"] = obs
ended["output"] = output
ended["metadata"] = metadata
monkeypatch.setattr(mod, "_end_observation", fake_end_observation)
mod.on_post_tool_call(
tool_name="web_extract",
args={"urls": ["https://example.com"]},
result='{"results": [{"url": "https://example.com", "content": "Example Domain"}]}',
task_id="task-1",
session_id="session-1",
tool_call_id="call-1",
)
assert ended["observation"] is observation
assert state.turn_tool_calls[0]["output"] == ended["output"]
assert state.turn_tool_calls[0]["function"]["output"] == ended["output"]
assert state.turn_tool_calls[0]["output"] == {
"results": [{"url": "https://example.com", "content": "Example Domain"}]
}
def test_serialize_messages_keeps_tool_name_and_call_id(self):
sys.modules.pop("plugins.observability.langfuse", None)
mod = importlib.import_module("plugins.observability.langfuse")
messages = [{
"role": "tool",
"name": "web_extract",
"tool_call_id": "call-1",
"content": '{"ok": true}',
}]
assert mod._serialize_messages(messages) == [{
"role": "tool",
"name": "web_extract",
"tool_call_id": "call-1",
"content": {"ok": True},
}]
def test_serialize_tool_calls_emits_openai_style_function_shape(self):
sys.modules.pop("plugins.observability.langfuse", None)
mod = importlib.import_module("plugins.observability.langfuse")
class _Fn:
name = "web_extract"
arguments = '{"urls": ["https://example.com"]}'
class _ToolCall:
id = "call-1"
type = "function"
function = _Fn()
assert mod._serialize_tool_calls([_ToolCall()]) == [{
"id": "call-1",
"type": "function",
"name": "web_extract",
"arguments": '{"urls": ["https://example.com"]}',
"function": {
"name": "web_extract",
"arguments": '{"urls": ["https://example.com"]}',
},
}]
class TestToolObservationKeying:
"""Tests for pre/post tool_call observation matching when tool_call_id is absent."""
def _make_mod(self):
sys.modules.pop("plugins.observability.langfuse", None)
return importlib.import_module("plugins.observability.langfuse")
def test_empty_tool_call_id_single_tool_sets_output(self, monkeypatch):
mod = self._make_mod()
obs = object()
state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None)
state.pending_tools_by_name.setdefault("my_tool", []).append(obs)
task_key = mod._trace_key("task-1", "sess-1")
monkeypatch.setitem(mod._TRACE_STATE, task_key, state)
ended = {}
def fake_end(o, *, output=None, metadata=None, **kw):
ended["obs"] = o
ended["output"] = output
monkeypatch.setattr(mod, "_end_observation", fake_end)
mod.on_post_tool_call(
tool_name="my_tool",
args={},
result='{"ok": true}',
task_id="task-1",
session_id="sess-1",
tool_call_id="",
)
assert ended["obs"] is obs
assert ended["output"] == {"ok": True}
assert state.pending_tools_by_name.get("my_tool") is None
def test_empty_tool_call_id_observations_are_fifo_within_tool_name(self, monkeypatch):
"""Two queued observations are consumed in FIFO order so the first
post hook gets the first observation's output, not the second.
Sequential-on-one-thread coverage; the real concurrent case is
guarded by ``_STATE_LOCK`` around every read-modify-write on
``pending_tools_by_name`` and is exercised in
``test_threaded_post_calls_preserve_fifo_under_lock`` below.
"""
mod = self._make_mod()
obs_a, obs_b = object(), object()
state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None)
state.pending_tools_by_name["web_extract"] = [obs_a, obs_b]
task_key = mod._trace_key("task-1", "sess-1")
monkeypatch.setitem(mod._TRACE_STATE, task_key, state)
calls = []
def fake_end(o, *, output=None, metadata=None, **kw):
calls.append((o, output))
monkeypatch.setattr(mod, "_end_observation", fake_end)
mod.on_post_tool_call(
tool_name="web_extract", args={}, result='{"val": "a"}',
task_id="task-1", session_id="sess-1", tool_call_id="",
)
mod.on_post_tool_call(
tool_name="web_extract", args={}, result='{"val": "b"}',
task_id="task-1", session_id="sess-1", tool_call_id="",
)
assert calls[0] == (obs_a, {"val": "a"})
assert calls[1] == (obs_b, {"val": "b"})
assert state.pending_tools_by_name.get("web_extract") is None
def test_threaded_post_calls_preserve_fifo_under_lock(self, monkeypatch):
"""The actual concurrency contract: when 8 threads race to drain
the pending queue, no observation is consumed twice and none is
lost. Validates ``_STATE_LOCK`` discipline, not Python list
semantics."""
import threading
mod = self._make_mod()
n = 8
observations = [object() for _ in range(n)]
state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None)
state.pending_tools_by_name["web_extract"] = list(observations)
task_key = mod._trace_key("task-thr", "sess-thr")
monkeypatch.setitem(mod._TRACE_STATE, task_key, state)
recorded: list = []
lock = threading.Lock()
def fake_end(o, *, output=None, metadata=None, **kw):
with lock:
recorded.append(o)
monkeypatch.setattr(mod, "_end_observation", fake_end)
barrier = threading.Barrier(n)
def worker():
barrier.wait()
mod.on_post_tool_call(
tool_name="web_extract", args={}, result='{"ok": true}',
task_id="task-thr", session_id="sess-thr", tool_call_id="",
)
threads = [threading.Thread(target=worker) for _ in range(n)]
for t in threads:
t.start()
for t in threads:
t.join()
# Every observation was consumed exactly once; queue is empty.
assert len(recorded) == n
assert set(map(id, recorded)) == set(map(id, observations))
assert state.pending_tools_by_name.get("web_extract") is None
def test_explicit_tool_call_id_uses_tools_dict(self, monkeypatch):
"""When tool_call_id is present, pending_tools_by_name is not touched."""
mod = self._make_mod()
obs = object()
state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None)
state.tools["call-99"] = obs
task_key = mod._trace_key("task-1", "sess-1")
monkeypatch.setitem(mod._TRACE_STATE, task_key, state)
ended = {}
def fake_end(o, *, output=None, metadata=None, **kw):
ended["obs"] = o
ended["output"] = output
monkeypatch.setattr(mod, "_end_observation", fake_end)
mod.on_post_tool_call(
tool_name="my_tool", args={}, result='{"status": "done"}',
task_id="task-1", session_id="sess-1", tool_call_id="call-99",
)
assert ended["obs"] is obs
assert ended["output"] == {"status": "done"}
assert not state.tools