From db84a78e618bf973ffc403ed2e1f8162f2591daa Mon Sep 17 00:00:00 2001 From: kshitij <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 15 May 2026 05:04:02 -0700 Subject: [PATCH] =?UTF-8?q?fix(langfuse):=20complete=20observability=20fix?= =?UTF-8?q?=20=E2=80=94=20trace=20I/O,=20tool=20outputs,=20placeholder=20c?= =?UTF-8?q?redentials=20(closes=20#22342,=20#22763)=20(#26320)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(langfuse): reject placeholder credentials with one-shot warning When operators leave HERMES_LANGFUSE_PUBLIC_KEY / HERMES_LANGFUSE_SECRET_KEY at a template value like 'placeholder', 'test-key', or 'your-langfuse-key', the Langfuse SDK silently accepts the credentials at construction time and drops every trace at flush time. No warning, no error — just an empty Langfuse dashboard the operator only notices hours later. Add prefix-based validation in _get_langfuse() against the documented 'pk-lf-' / 'sk-lf-' prefixes that Langfuse always issues server-side. Anything else fires a single warning naming the offending env var(s) with a log-safe value preview (full string for short placeholders so the operator knows which template they left in place; truncated for long values so a real secret pasted into the wrong field never hits the log), then short-circuits via the existing _INIT_FAILED cache so the warning fires once per process, not once per hook invocation. The check sits after the 'Langfuse is None' SDK-installed guard so hosts without the optional langfuse SDK don't see misleading 'set real keys' hints when the actionable fix is 'pip install langfuse'. Missing credentials remains the documented opt-out path and stays silent — no log noise for unconfigured installs. Fixes #22763 Fixes #23823 * fix(langfuse): use actual API request messages for generation input on_pre_llm_request previously used the messages kwarg alone, which could be None when Hermes passes the payload via request_messages, conversation_history, or user_message instead. Add _coerce_request_messages to pick the first available list across all variants, falling back to a synthetic user message. Generations now show the real outbound payload rather than an empty input. * fix(langfuse): record tool call outputs in traces Tool observations showed input (arguments) but output was always undefined. Root cause: when tool_call_id is empty, pre_tool_call stored observations under a unique time-based key that post_tool_call could never reconstruct, so every tool span was closed without output by the _finish_trace sweep. Fix pre/post matching by routing empty-tool_call_id tools through a per-name FIFO queue (pending_tools_by_name) instead of the time-based key. Tools with a tool_call_id continue to use the id-keyed dict. Also: - Preserve OpenAI-style nested function shape in serialized tool calls so Langfuse renders name/arguments correctly - Keep name + tool_call_id on role:tool messages for proper pairing - Backfill tool results onto the matching turn_tool_calls entry so the generation's tool-call record carries the result alongside arguments - Coerce request messages from whichever field the runtime provides (request_messages, messages, conversation_history, user_message) * fix(langfuse): salvage-review polish — drop dead is_first_turn, shallow-copy request_messages, real threaded FIFO test Self-review of the combined #22345 + #23831 salvage surfaced three issues worth fixing in the same PR rather than as follow-ups: 1. Drop is_first_turn from the pre_api_request hook. The boolean expression `not bool(conversation_history)` was wrong: conversation_history is reassigned to None mid-run after compression (5 sites in run_agent.py), so the value flips False -> True mid-conversation on every post-compression API call. The langfuse plugin never consumed it, so the kwarg was both misleading AND dead. 2. Replace copy.deepcopy(request_messages) with shallow list() copy. The pre_api_request hook contract discards return values (invoke_hook never writes back to api_kwargs), and the langfuse plugin's _serialize_messages already builds its own snapshot dicts via _safe_value. A deepcopy on every API call would walk every tool result and base64 image — significant overhead for no real isolation benefit. Shallow copy of the outer list protects against later mutations of api_messages without paying for the inner-dict walk. 3. Rename test_empty_tool_call_id_concurrent_fifo_order -> test_empty_tool_call_id_observations_are_fifo_within_tool_name and add a real test_threaded_post_calls_preserve_fifo_under_lock that spawns 8 threads behind a barrier to actually exercise _STATE_LOCK on the pending_tools_by_name queue. The original test was sequential and only validated Python list semantics; this one validates the lock discipline. 4. Fix stale 'Cleared by reset_cache_for_tests()' comment on _INIT_FAILED — that function does not exist. Tests reload the module via sys.modules.pop + importlib.import_module instead. Tests: 37 langfuse plugin tests pass, 658 plugin tests overall pass. --------- Co-authored-by: xxxigm Co-authored-by: Brian Conklin --- plugins/observability/langfuse/__init__.py | 168 ++++++- run_agent.py | 16 + tests/plugins/test_langfuse_plugin.py | 538 ++++++++++++++++++++- tests/run_agent/test_run_agent.py | 5 +- 4 files changed, 705 insertions(+), 22 deletions(-) diff --git a/plugins/observability/langfuse/__init__.py b/plugins/observability/langfuse/__init__.py index 9c9583261a6..8516030fb01 100644 --- a/plugins/observability/langfuse/__init__.py +++ b/plugins/observability/langfuse/__init__.py @@ -47,6 +47,7 @@ class TraceState: root_span: Any generations: Dict[str, Any] = field(default_factory=dict) tools: Dict[str, Any] = field(default_factory=dict) + pending_tools_by_name: Dict[str, list] = field(default_factory=dict) turn_tool_calls: list[dict[str, Any]] = field(default_factory=list) last_updated_at: float = field(default_factory=time.time) @@ -58,6 +59,17 @@ _READ_FILE_LINE_RE = re.compile(r"^\s*(\d+)\|(.*)$") _READ_FILE_HEAD_LINES = 25 _READ_FILE_TAIL_LINES = 15 +# Langfuse-issued keys always carry these prefixes (cloud or self-hosted — +# the prefix is baked into the server-side issuance flow, not a UI hint). +# Anything else (`placeholder`, `test-key`, `your-langfuse-key`, etc.) is a +# leftover template value and would cause the SDK to silently accept the +# credentials at construction time but drop every trace at flush time. +# See #23823 — the silent-failure bug this guard fixes. +_LANGFUSE_KEY_PREFIXES: Dict[str, str] = { + "HERMES_LANGFUSE_PUBLIC_KEY": "pk-lf-", + "HERMES_LANGFUSE_SECRET_KEY": "sk-lf-", +} + def _env(name: str, default: str = "") -> str: return os.environ.get(name, default).strip() @@ -82,10 +94,49 @@ def _debug(message: str) -> None: # Sentinel: "_get_langfuse() has tried and failed". Lets us short-circuit # every subsequent hook call without re-checking env vars or re-attempting -# SDK init. Cleared by reset_cache_for_tests(). +# SDK init. Tests clear this by reloading the module via +# ``sys.modules.pop(...) + importlib.import_module(...)`` rather than via a +# dedicated reset function. Runtime callers cannot reset the cache; if an +# operator fixes a misconfigured credential they must restart the process. _INIT_FAILED = object() +def _redact_key_preview(value: str) -> str: + """Return a brief, log-safe preview of a credential value. + + Keeps enough characters to disambiguate common placeholders + (``placeholder``, ``test-key``, ``your-key``) without echoing a + real secret in full if an operator pasted one into the wrong env + var. Used only for the once-per-process placeholder-detection + warning in :func:`_get_langfuse`. + """ + if not value: + return "" + if len(value) <= 12: + return repr(value) + return repr(value[:6] + "...") + + +def _validate_langfuse_key(env_name: str, value: str) -> Optional[str]: + """Return an error message if ``value`` is not a real Langfuse key. + + Returns ``None`` when the value matches the documented Langfuse + prefix for ``env_name``, or when no prefix is registered for the + name (in which case we trust the operator). When validation + fails the returned string is suitable for direct inclusion in a + single log line — it names the env var and shows a safe preview. + """ + expected = _LANGFUSE_KEY_PREFIXES.get(env_name, "") + if not expected: + return None + if value.startswith(expected): + return None + return ( + f"{env_name}={_redact_key_preview(value)} " + f"(expected {expected!r} prefix)" + ) + + def _get_langfuse() -> Optional[Langfuse]: """Return a cached Langfuse client, or ``None`` if unavailable. @@ -111,6 +162,33 @@ def _get_langfuse() -> Optional[Langfuse]: _LANGFUSE_CLIENT = _INIT_FAILED return None + # Reject placeholder credentials with a one-shot warning so the + # operator sees the misconfiguration instead of silently shipping a + # broken observability stack (#23823). The SDK does not validate + # keys at construction time — it queues traces in memory and only + # discovers the auth failure when the background flush thread tries + # to post them, by which point the warning is buried under whatever + # else the process is logging. Catch it here, surface it once, and + # short-circuit via the same _INIT_FAILED path as the empty case. + placeholder_issues = [ + msg + for msg in ( + _validate_langfuse_key("HERMES_LANGFUSE_PUBLIC_KEY", public_key), + _validate_langfuse_key("HERMES_LANGFUSE_SECRET_KEY", secret_key), + ) + if msg + ] + if placeholder_issues: + logger.warning( + "Langfuse plugin: credentials look like placeholders, traces will " + "NOT be emitted (%s). Set real Langfuse keys (pk-lf-... / sk-lf-...) " + "or unset HERMES_LANGFUSE_PUBLIC_KEY / HERMES_LANGFUSE_SECRET_KEY to " + "silence this warning.", + "; ".join(placeholder_issues), + ) + _LANGFUSE_CLIENT = _INIT_FAILED + return None + base_url = _env("HERMES_LANGFUSE_BASE_URL") or _env("LANGFUSE_BASE_URL") or "https://cloud.langfuse.com" environment = _env("HERMES_LANGFUSE_ENV") or _env("LANGFUSE_ENV") release = _env("HERMES_LANGFUSE_RELEASE") or _env("LANGFUSE_RELEASE") @@ -328,6 +406,21 @@ def _extract_last_user_message(messages: Any) -> Any: return None +def _coerce_request_messages( + *, + request_messages: Any = None, + messages: Any = None, + conversation_history: Any = None, + user_message: Any = None, +) -> list[dict[str, Any]]: + for candidate in (request_messages, messages, conversation_history): + if isinstance(candidate, list): + return candidate + if user_message is None: + return [] + return [{"role": "user", "content": user_message}] + + def _serialize_messages(messages: Any) -> list[dict[str, Any]]: if not isinstance(messages, list): return [] @@ -343,8 +436,11 @@ def _serialize_messages(messages: Any) -> list[dict[str, Any]]: parse_json_strings=(role == "tool"), ), } - if role == "tool" and message.get("tool_call_id"): - item["tool_call_id"] = message.get("tool_call_id") + if role == "tool": + if message.get("tool_call_id"): + item["tool_call_id"] = message.get("tool_call_id") + if message.get("name"): + item["name"] = _safe_value(message.get("name")) if message.get("tool_calls"): item["tool_calls"] = _safe_value(message.get("tool_calls"), parse_json_strings=True) serialized.append(item) @@ -359,15 +455,16 @@ def _serialize_tool_calls(tool_calls: Any) -> list[dict[str, Any]]: fn = getattr(tool_call, "function", None) name = getattr(fn, "name", None) if fn else None arguments = getattr(fn, "arguments", None) if fn else None - if isinstance(arguments, str): - try: - arguments = json.loads(arguments) - except Exception: - pass + safe_arguments = _safe_value(arguments, parse_json_strings=False) serialized.append({ "id": getattr(tool_call, "id", None), + "type": getattr(tool_call, "type", None) or "function", "name": name, - "arguments": _safe_value(arguments, parse_json_strings=True), + "arguments": safe_arguments, + "function": { + "name": name, + "arguments": safe_arguments, + }, }) return serialized @@ -564,6 +661,9 @@ def _finish_trace(task_key: str, *, output: Any = None) -> None: _end_observation(observation) for observation in state.tools.values(): _end_observation(observation) + for queue in state.pending_tools_by_name.values(): + for observation in queue: + _end_observation(observation) final_output = _merge_trace_output(output, state) if final_output is not None: state.root_span.set_trace_io(output=final_output) @@ -636,6 +736,7 @@ def on_pre_llm_request( base_url: str = "", api_mode: str = "", api_call_count: int = 0, + request_messages: Any = None, messages: Any = None, turn_type: str = "user", message_count: int = 0, @@ -643,12 +744,21 @@ def on_pre_llm_request( approx_input_tokens: int = 0, request_char_count: int = 0, max_tokens: Any = None, + conversation_history: Any = None, + user_message: Any = None, **_: Any, ) -> None: client = _get_langfuse() if client is None: return + input_messages = _coerce_request_messages( + request_messages=request_messages, + messages=messages, + conversation_history=conversation_history, + user_message=user_message, + ) + task_key = _trace_key(task_id, session_id) req_key = _request_key(api_call_count) @@ -663,7 +773,7 @@ def on_pre_llm_request( provider=provider, model=model, api_mode=api_mode, - messages=messages, + messages=input_messages, client=client, ) _TRACE_STATE[task_key] = state @@ -676,7 +786,7 @@ def on_pre_llm_request( client=client, name=f"LLM call {api_call_count}", as_type="generation", - input_value=_serialize_messages(messages), + input_value=_serialize_messages(input_messages), metadata={ "provider": provider, "platform": platform, @@ -815,13 +925,12 @@ def on_pre_tool_call(*, tool_name: str = "", args: Any = None, task_id: str = "" return task_key = _trace_key(task_id, session_id) - tool_key = tool_call_id or f"{tool_name}:{time.time_ns()}" with _STATE_LOCK: state = _TRACE_STATE.get(task_key) if state is None: return - state.tools[tool_key] = _start_child_observation( + observation = _start_child_observation( state, client=client, name=f"Tool: {tool_name}", @@ -829,22 +938,29 @@ def on_pre_tool_call(*, tool_name: str = "", args: Any = None, task_id: str = "" input_value=_safe_value(args), metadata={"tool_name": tool_name, "tool_call_id": tool_call_id}, ) + if tool_call_id: + state.tools[tool_call_id] = observation + else: + state.pending_tools_by_name.setdefault(tool_name, []).append(observation) def on_post_tool_call(*, tool_name: str = "", args: Any = None, result: Any = None, task_id: str = "", session_id: str = "", tool_call_id: str = "", **_: Any) -> None: task_key = _trace_key(task_id, session_id) - tool_key = tool_call_id or "" observation = None with _STATE_LOCK: state = _TRACE_STATE.get(task_key) if state is None: return - if tool_key: - observation = state.tools.pop(tool_key, None) - elif state.tools: - _, observation = state.tools.popitem() + if tool_call_id: + observation = state.tools.pop(tool_call_id, None) + if observation is None: + queue = state.pending_tools_by_name.get(tool_name) + if queue: + observation = queue.pop(0) + if not queue: + state.pending_tools_by_name.pop(tool_name, None) if observation is None: return @@ -854,10 +970,24 @@ def on_post_tool_call(*, tool_name: str = "", args: Any = None, result: Any = No else: result_value = result result_value = _normalize_payload(result_value, tool_name=tool_name, args=args) + safe_result_value = _safe_value(result_value, parse_json_strings=True) + + # Backfill so the generation's tool_call record carries the result alongside arguments. + if tool_call_id: + with _STATE_LOCK: + state = _TRACE_STATE.get(task_key) + if state is not None: + for tool_call in reversed(state.turn_tool_calls): + if tool_call.get("id") == tool_call_id: + tool_call["output"] = safe_result_value + function_payload = tool_call.get("function") + if isinstance(function_payload, dict): + function_payload["output"] = safe_result_value + break _end_observation( observation, - output=_safe_value(result_value, parse_json_strings=True), + output=safe_result_value, metadata={"tool_name": tool_name, "args": _safe_value(args, parse_json_strings=True)}, ) diff --git a/run_agent.py b/run_agent.py index 18ca03bd512..a4df8749777 100644 --- a/run_agent.py +++ b/run_agent.py @@ -12668,16 +12668,30 @@ class AIAgent: try: from hermes_cli.plugins import invoke_hook as _invoke_hook + request_messages = api_kwargs.get("messages") + if not isinstance(request_messages, list): + request_messages = api_kwargs.get("input") + if not isinstance(request_messages, list): + request_messages = api_messages + # Shallow-copy the outer list so plugins that retain the + # reference for async snapshotting don't observe later + # mutations of api_messages. The inner dicts are not + # mutated by the agent loop, so a shallow copy is + # sufficient; a deepcopy would walk every tool result + # and base64 image on every API call. _invoke_hook( "pre_api_request", task_id=effective_task_id, session_id=self.session_id or "", + user_message=original_user_message, + conversation_history=list(messages), platform=self.platform or "", model=self.model, provider=self.provider, base_url=self.base_url, api_mode=self.api_mode, api_call_count=api_call_count, + request_messages=list(request_messages) if isinstance(request_messages, list) else [], message_count=len(api_messages), tool_count=len(self.tools or []), approx_input_tokens=approx_tokens, @@ -14582,7 +14596,9 @@ class AIAgent: finish_reason=finish_reason, message_count=len(api_messages), response_model=getattr(response, "model", None), + response=response, usage=self._usage_summary_for_api_request_hook(response), + assistant_message=assistant_message, assistant_content_chars=len(_assistant_text), assistant_tool_call_count=len(_assistant_tool_calls), ) diff --git a/tests/plugins/test_langfuse_plugin.py b/tests/plugins/test_langfuse_plugin.py index 6d9fcce38ee..313d2e94a72 100644 --- a/tests/plugins/test_langfuse_plugin.py +++ b/tests/plugins/test_langfuse_plugin.py @@ -2,6 +2,7 @@ from __future__ import annotations import importlib +import logging import sys from pathlib import Path @@ -164,7 +165,542 @@ class TestHooksInert: # Each hook should just return; no exceptions. mod.on_pre_llm_call(task_id="t", session_id="s", messages=[{"role": "user", "content": "hi"}]) - mod.on_pre_llm_request(task_id="t", session_id="s", api_call_count=1, messages=[]) + mod.on_pre_llm_request(task_id="t", session_id="s", api_call_count=1, request_messages=[]) mod.on_post_llm_call(task_id="t", session_id="s", api_call_count=1) mod.on_pre_tool_call(tool_name="read_file", args={}, task_id="t", session_id="s") mod.on_post_tool_call(tool_name="read_file", args={}, result="ok", task_id="t", session_id="s") + + +# --------------------------------------------------------------------------- +# Placeholder-credential guard (#23823). +# +# Regression coverage for the silent-failure bug: when an operator leaves +# HERMES_LANGFUSE_PUBLIC_KEY / SECRET_KEY at a template value like +# "placeholder", "test-key", or "your-langfuse-key", the SDK accepts the +# credentials at construction time (it does no server-side validation +# eagerly) but drops every trace at flush time, with no signal in the +# Hermes logs. The fix in `_get_langfuse()` validates the documented +# `pk-lf-` / `sk-lf-` prefix Langfuse always issues, surfaces a one-shot +# warning naming the offending env var(s), and short-circuits via the +# same `_INIT_FAILED` path used for missing credentials so subsequent +# hook invocations don't re-log. +# --------------------------------------------------------------------------- + + +class _FakeLangfuse: + """Stand-in for the real :class:`langfuse.Langfuse` so tests don't + need the optional ``langfuse`` SDK installed. The plugin's runtime + gate refuses to proceed past ``if Langfuse is None`` when the SDK + is missing, which would short-circuit before the placeholder check + can fire. Patching ``plugin.Langfuse`` with this class lets the + placeholder validator exercise its full code path.""" + + instances: list["_FakeLangfuse"] = [] + + def __init__(self, **kwargs): + self.kwargs = kwargs + _FakeLangfuse.instances.append(self) + + +class TestPlaceholderKeyDetection: + LOGGER_NAME = "plugins.observability.langfuse" + + def _fresh_plugin(self, monkeypatch=None): + mod_name = "plugins.observability.langfuse" + sys.modules.pop(mod_name, None) + mod = importlib.import_module(mod_name) + if monkeypatch is not None: + # Pretend the SDK is installed so `_get_langfuse()` actually + # reaches the placeholder check. Real SDK calls are never + # made because the placeholder/missing-credentials paths + # return before constructing a client. + _FakeLangfuse.instances.clear() + monkeypatch.setattr(mod, "Langfuse", _FakeLangfuse, raising=False) + return mod + + @staticmethod + def _clear_env(monkeypatch): + for k in ( + "HERMES_LANGFUSE_PUBLIC_KEY", "HERMES_LANGFUSE_SECRET_KEY", + "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", + ): + monkeypatch.delenv(k, raising=False) + + # -- helper unit tests (no SDK stub needed: these don't go through + # _get_langfuse, they exercise the pure-Python helpers directly) ------ + + def test_redact_key_preview_empty(self, monkeypatch): + self._clear_env(monkeypatch) + plugin = self._fresh_plugin() + assert plugin._redact_key_preview("") == "" + + def test_redact_key_preview_short_value_echoed(self, monkeypatch): + """Short placeholder strings are echoed in full so the operator + can see exactly which template they forgot to replace.""" + self._clear_env(monkeypatch) + plugin = self._fresh_plugin() + assert plugin._redact_key_preview("placeholder") == "'placeholder'" + assert plugin._redact_key_preview("test-key") == "'test-key'" + + def test_redact_key_preview_long_value_truncated(self, monkeypatch): + """If an operator pasted a real secret into the wrong env var the + preview must NOT echo it in full — only the leading 6 chars.""" + self._clear_env(monkeypatch) + plugin = self._fresh_plugin() + result = plugin._redact_key_preview("sk-lf-abcdefghijklmnop") + assert "abcdefghij" not in result + assert result.startswith("'sk-lf-") + assert result.endswith("...'") + + def test_validate_langfuse_key_accepts_documented_prefix(self, monkeypatch): + self._clear_env(monkeypatch) + plugin = self._fresh_plugin() + assert plugin._validate_langfuse_key( + "HERMES_LANGFUSE_PUBLIC_KEY", "pk-lf-real-public-xyz" + ) is None + assert plugin._validate_langfuse_key( + "HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz" + ) is None + + def test_validate_langfuse_key_rejects_wrong_prefix(self, monkeypatch): + self._clear_env(monkeypatch) + plugin = self._fresh_plugin() + msg = plugin._validate_langfuse_key( + "HERMES_LANGFUSE_PUBLIC_KEY", "placeholder" + ) + assert msg is not None + assert "HERMES_LANGFUSE_PUBLIC_KEY" in msg + assert "pk-lf-" in msg + + def test_validate_langfuse_key_unknown_name_passes(self, monkeypatch): + """Defensive: an env var with no registered prefix is trusted.""" + self._clear_env(monkeypatch) + plugin = self._fresh_plugin() + assert plugin._validate_langfuse_key("HERMES_LANGFUSE_BASE_URL", "anything") is None + + # -- end-to-end _get_langfuse() behaviour -------------------------------- + # These tests pass `monkeypatch` to _fresh_plugin() so the helper can + # stub out `Langfuse` (the optional SDK). Without that, every call + # short-circuits at `if Langfuse is None` before reaching the + # placeholder validator — masking the very behaviour we're testing. + + def test_placeholder_public_key_warns_and_skips(self, monkeypatch, caplog): + self._clear_env(monkeypatch) + monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder") + monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz") + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + assert plugin._get_langfuse() is None + text = caplog.text + assert "HERMES_LANGFUSE_PUBLIC_KEY" in text + assert "'placeholder'" in text + assert "pk-lf-" in text + # The valid secret value must NOT appear (the var NAME does, in + # the "or unset ..." hint, but the value preview shouldn't). + assert "'sk-lf-" not in text + # Never constructed the SDK client — short-circuited before that. + assert _FakeLangfuse.instances == [] + + def test_placeholder_secret_key_warns_and_skips(self, monkeypatch, caplog): + self._clear_env(monkeypatch) + monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "pk-lf-real-public-xyz") + monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "test-key") + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + assert plugin._get_langfuse() is None + text = caplog.text + assert "HERMES_LANGFUSE_SECRET_KEY" in text + assert "'test-key'" in text + assert "sk-lf-" in text + # The valid public value must NOT appear. + assert "'pk-lf-" not in text + assert _FakeLangfuse.instances == [] + + def test_both_placeholders_one_warning_with_both_keys(self, monkeypatch, caplog): + self._clear_env(monkeypatch) + monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder") + monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "placeholder") + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + assert plugin._get_langfuse() is None + warnings = [r for r in caplog.records if r.levelname == "WARNING" + and r.name == self.LOGGER_NAME] + assert len(warnings) == 1, ( + f"Expected a single combined warning; got {len(warnings)}:\n" + + "\n".join(r.getMessage() for r in warnings) + ) + text = warnings[0].getMessage() + assert "HERMES_LANGFUSE_PUBLIC_KEY" in text + assert "HERMES_LANGFUSE_SECRET_KEY" in text + + def test_repeated_calls_do_not_re_warn(self, monkeypatch, caplog): + """The cached ``_INIT_FAILED`` sentinel must short-circuit + subsequent calls so each hook invocation isn't a fresh log + line — otherwise a busy gateway will spam the operator's + terminal.""" + self._clear_env(monkeypatch) + monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder") + monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "placeholder") + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + for _ in range(15): + assert plugin._get_langfuse() is None + warnings = [r for r in caplog.records if r.levelname == "WARNING" + and r.name == self.LOGGER_NAME] + assert len(warnings) == 1, ( + f"Warning fired {len(warnings)} times across 15 calls; " + "expected 1 (cached via _INIT_FAILED)" + ) + + @pytest.mark.parametrize("placeholder", [ + "placeholder", + "test-key", + "your-langfuse-key", + "change-me", + "xxx", + "dummy-key-here", + "", + "REPLACE_ME", + ]) + def test_common_placeholders_detected(self, monkeypatch, caplog, placeholder): + """A grab-bag of values that real-world ``.env.example`` templates + use as stand-ins. Any of them in either key must trip the guard.""" + self._clear_env(monkeypatch) + monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", placeholder) + monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz") + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + assert plugin._get_langfuse() is None + assert "HERMES_LANGFUSE_PUBLIC_KEY" in caplog.text + + def test_legacy_LANGFUSE_PUBLIC_KEY_also_validated(self, monkeypatch, caplog): + """The plugin reads both the canonical HERMES_-prefixed env var and + the legacy bare ``LANGFUSE_PUBLIC_KEY``. The validator must run on + whichever value ``_get_langfuse()`` actually consumed.""" + self._clear_env(monkeypatch) + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "placeholder") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz") + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + assert plugin._get_langfuse() is None + # Warning names the canonical user-facing env var (the bare + # LANGFUSE_PUBLIC_KEY is a backwards-compat alias for the + # HERMES_-prefixed one — operators set the HERMES_-prefixed one). + assert "HERMES_LANGFUSE_PUBLIC_KEY" in caplog.text + assert "'placeholder'" in caplog.text + + def test_missing_credentials_still_skip_silently(self, monkeypatch, caplog): + """Missing-creds is the documented opt-out path (operator hasn't + configured the plugin yet) — it must remain SILENT. Regression + guard against the placeholder validator accidentally running on + empty values and re-introducing log noise for unconfigured + installs.""" + self._clear_env(monkeypatch) + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + assert plugin._get_langfuse() is None + warnings = [r for r in caplog.records if r.levelname == "WARNING" + and r.name == self.LOGGER_NAME] + assert warnings == [] + + def test_sdk_not_installed_still_skips_silently(self, monkeypatch, caplog): + """If the langfuse SDK isn't installed at all, the placeholder + check should never run — there's nothing the operator can do + about a credential mismatch when the package is missing, and + re-warning here would dilute the actually-actionable SDK-missing + signal upstream. The ``Langfuse is None`` guard at the top of + ``_get_langfuse`` already handles this; this test pins that + behaviour.""" + self._clear_env(monkeypatch) + monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "placeholder") + monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "placeholder") + # NO monkeypatch on Langfuse here — falls back to whatever the + # plugin imported at module load (None if SDK absent). + plugin = self._fresh_plugin() + monkeypatch.setattr(plugin, "Langfuse", None, raising=False) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + assert plugin._get_langfuse() is None + warnings = [r for r in caplog.records if r.levelname == "WARNING" + and r.name == self.LOGGER_NAME] + assert warnings == [] + + def test_valid_prefixes_do_not_trigger_placeholder_warning(self, monkeypatch, caplog): + """Real Langfuse keys (``pk-lf-…`` / ``sk-lf-…``) must pass the + guard and proceed to SDK init. We stub the SDK constructor with + a recording fake so the assertion can confirm BOTH that the + placeholder warning didn't fire AND that the client was actually + constructed — the latter is the success signal the bug report + wanted.""" + self._clear_env(monkeypatch) + monkeypatch.setenv("HERMES_LANGFUSE_PUBLIC_KEY", "pk-lf-real-public-xyz") + monkeypatch.setenv("HERMES_LANGFUSE_SECRET_KEY", "sk-lf-real-secret-xyz") + plugin = self._fresh_plugin(monkeypatch) + with caplog.at_level(logging.WARNING, logger=self.LOGGER_NAME): + client = plugin._get_langfuse() + assert isinstance(client, _FakeLangfuse) + assert client.kwargs["public_key"] == "pk-lf-real-public-xyz" + assert client.kwargs["secret_key"] == "sk-lf-real-secret-xyz" + assert "placeholders" not in caplog.text.lower(), ( + f"Valid Langfuse keys tripped the placeholder guard: {caplog.text!r}" + ) + + +class TestRequestMessageCoercion: + def test_prefers_request_messages_then_messages_then_history_then_user_message(self): + sys.modules.pop("plugins.observability.langfuse", None) + mod = importlib.import_module("plugins.observability.langfuse") + + assert mod._coerce_request_messages( + request_messages=[{"role": "system", "content": "s"}], + messages=[{"role": "user", "content": "m"}], + conversation_history=[{"role": "user", "content": "h"}], + user_message="u", + ) == [{"role": "system", "content": "s"}] + assert mod._coerce_request_messages( + messages=[{"role": "user", "content": "m"}], + conversation_history=[{"role": "user", "content": "h"}], + user_message="u", + ) == [{"role": "user", "content": "m"}] + assert mod._coerce_request_messages( + conversation_history=[{"role": "user", "content": "h"}], + user_message="u", + ) == [{"role": "user", "content": "h"}] + assert mod._coerce_request_messages(user_message="u") == [{"role": "user", "content": "u"}] + + +class TestToolCallOutputBackfill: + def test_post_tool_call_backfills_matching_turn_tool_call_output(self, monkeypatch): + sys.modules.pop("plugins.observability.langfuse", None) + mod = importlib.import_module("plugins.observability.langfuse") + + observation = object() + state = mod.TraceState(trace_id="trace-1", root_ctx=None, root_span=None) + state.tools["call-1"] = observation + state.turn_tool_calls.append({ + "id": "call-1", + "type": "function", + "name": "web_extract", + "arguments": '{"urls": ["https://example.com"]}', + "function": { + "name": "web_extract", + "arguments": '{"urls": ["https://example.com"]}', + }, + }) + + task_key = mod._trace_key("task-1", "session-1") + monkeypatch.setitem(mod._TRACE_STATE, task_key, state) + + ended = {} + + def fake_end_observation(obs, *, output=None, metadata=None, usage_details=None, cost_details=None): + ended["observation"] = obs + ended["output"] = output + ended["metadata"] = metadata + + monkeypatch.setattr(mod, "_end_observation", fake_end_observation) + + mod.on_post_tool_call( + tool_name="web_extract", + args={"urls": ["https://example.com"]}, + result='{"results": [{"url": "https://example.com", "content": "Example Domain"}]}', + task_id="task-1", + session_id="session-1", + tool_call_id="call-1", + ) + + assert ended["observation"] is observation + assert state.turn_tool_calls[0]["output"] == ended["output"] + assert state.turn_tool_calls[0]["function"]["output"] == ended["output"] + assert state.turn_tool_calls[0]["output"] == { + "results": [{"url": "https://example.com", "content": "Example Domain"}] + } + + def test_serialize_messages_keeps_tool_name_and_call_id(self): + sys.modules.pop("plugins.observability.langfuse", None) + mod = importlib.import_module("plugins.observability.langfuse") + + messages = [{ + "role": "tool", + "name": "web_extract", + "tool_call_id": "call-1", + "content": '{"ok": true}', + }] + + assert mod._serialize_messages(messages) == [{ + "role": "tool", + "name": "web_extract", + "tool_call_id": "call-1", + "content": {"ok": True}, + }] + + def test_serialize_tool_calls_emits_openai_style_function_shape(self): + sys.modules.pop("plugins.observability.langfuse", None) + mod = importlib.import_module("plugins.observability.langfuse") + + class _Fn: + name = "web_extract" + arguments = '{"urls": ["https://example.com"]}' + + class _ToolCall: + id = "call-1" + type = "function" + function = _Fn() + + assert mod._serialize_tool_calls([_ToolCall()]) == [{ + "id": "call-1", + "type": "function", + "name": "web_extract", + "arguments": '{"urls": ["https://example.com"]}', + "function": { + "name": "web_extract", + "arguments": '{"urls": ["https://example.com"]}', + }, + }] + + +class TestToolObservationKeying: + """Tests for pre/post tool_call observation matching when tool_call_id is absent.""" + + def _make_mod(self): + sys.modules.pop("plugins.observability.langfuse", None) + return importlib.import_module("plugins.observability.langfuse") + + def test_empty_tool_call_id_single_tool_sets_output(self, monkeypatch): + mod = self._make_mod() + obs = object() + state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None) + state.pending_tools_by_name.setdefault("my_tool", []).append(obs) + + task_key = mod._trace_key("task-1", "sess-1") + monkeypatch.setitem(mod._TRACE_STATE, task_key, state) + + ended = {} + + def fake_end(o, *, output=None, metadata=None, **kw): + ended["obs"] = o + ended["output"] = output + + monkeypatch.setattr(mod, "_end_observation", fake_end) + + mod.on_post_tool_call( + tool_name="my_tool", + args={}, + result='{"ok": true}', + task_id="task-1", + session_id="sess-1", + tool_call_id="", + ) + + assert ended["obs"] is obs + assert ended["output"] == {"ok": True} + assert state.pending_tools_by_name.get("my_tool") is None + + def test_empty_tool_call_id_observations_are_fifo_within_tool_name(self, monkeypatch): + """Two queued observations are consumed in FIFO order so the first + post hook gets the first observation's output, not the second. + + Sequential-on-one-thread coverage; the real concurrent case is + guarded by ``_STATE_LOCK`` around every read-modify-write on + ``pending_tools_by_name`` and is exercised in + ``test_threaded_post_calls_preserve_fifo_under_lock`` below. + """ + mod = self._make_mod() + obs_a, obs_b = object(), object() + state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None) + state.pending_tools_by_name["web_extract"] = [obs_a, obs_b] + + task_key = mod._trace_key("task-1", "sess-1") + monkeypatch.setitem(mod._TRACE_STATE, task_key, state) + + calls = [] + + def fake_end(o, *, output=None, metadata=None, **kw): + calls.append((o, output)) + + monkeypatch.setattr(mod, "_end_observation", fake_end) + + mod.on_post_tool_call( + tool_name="web_extract", args={}, result='{"val": "a"}', + task_id="task-1", session_id="sess-1", tool_call_id="", + ) + mod.on_post_tool_call( + tool_name="web_extract", args={}, result='{"val": "b"}', + task_id="task-1", session_id="sess-1", tool_call_id="", + ) + + assert calls[0] == (obs_a, {"val": "a"}) + assert calls[1] == (obs_b, {"val": "b"}) + assert state.pending_tools_by_name.get("web_extract") is None + + def test_threaded_post_calls_preserve_fifo_under_lock(self, monkeypatch): + """The actual concurrency contract: when 8 threads race to drain + the pending queue, no observation is consumed twice and none is + lost. Validates ``_STATE_LOCK`` discipline, not Python list + semantics.""" + import threading + + mod = self._make_mod() + n = 8 + observations = [object() for _ in range(n)] + state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None) + state.pending_tools_by_name["web_extract"] = list(observations) + + task_key = mod._trace_key("task-thr", "sess-thr") + monkeypatch.setitem(mod._TRACE_STATE, task_key, state) + + recorded: list = [] + lock = threading.Lock() + + def fake_end(o, *, output=None, metadata=None, **kw): + with lock: + recorded.append(o) + + monkeypatch.setattr(mod, "_end_observation", fake_end) + + barrier = threading.Barrier(n) + + def worker(): + barrier.wait() + mod.on_post_tool_call( + tool_name="web_extract", args={}, result='{"ok": true}', + task_id="task-thr", session_id="sess-thr", tool_call_id="", + ) + + threads = [threading.Thread(target=worker) for _ in range(n)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Every observation was consumed exactly once; queue is empty. + assert len(recorded) == n + assert set(map(id, recorded)) == set(map(id, observations)) + assert state.pending_tools_by_name.get("web_extract") is None + + def test_explicit_tool_call_id_uses_tools_dict(self, monkeypatch): + """When tool_call_id is present, pending_tools_by_name is not touched.""" + mod = self._make_mod() + obs = object() + state = mod.TraceState(trace_id="t", root_ctx=None, root_span=None) + state.tools["call-99"] = obs + + task_key = mod._trace_key("task-1", "sess-1") + monkeypatch.setitem(mod._TRACE_STATE, task_key, state) + + ended = {} + + def fake_end(o, *, output=None, metadata=None, **kw): + ended["obs"] = o + ended["output"] = output + + monkeypatch.setattr(mod, "_end_observation", fake_end) + + mod.on_post_tool_call( + tool_name="my_tool", args={}, result='{"status": "done"}', + task_id="task-1", session_id="sess-1", tool_call_id="call-99", + ) + + assert ended["obs"] is obs + assert ended["output"] == {"status": "done"} + assert not state.tools + diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index dadb7b31cce..c493f91509a 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -2524,8 +2524,9 @@ class TestRunConversation: assert [call["api_call_count"] for call in pre_request_calls] == [1, 2] assert [call["api_call_count"] for call in post_request_calls] == [1, 2] assert all(call["session_id"] == agent.session_id for call in pre_request_calls) - assert all("message_count" in c and "messages" not in c for c in pre_request_calls) - assert all("usage" in c and "response" not in c for c in post_request_calls) + assert all("message_count" in c and isinstance(c.get("request_messages"), list) for c in pre_request_calls) + assert any(msg.get("role") == "user" and msg.get("content") == "search something" for msg in pre_request_calls[0]["request_messages"]) + assert all("usage" in c and "response" in c and "assistant_message" in c for c in post_request_calls) def test_content_with_tool_calls_stays_silent_for_non_cli_quiet_mode(self, agent): self._setup_agent(agent)