fix(langfuse): complete observability fix — trace I/O, tool outputs, placeholder credentials (closes #22342, #22763) (#26320)

* fix(langfuse): reject placeholder credentials with one-shot warning

When operators leave HERMES_LANGFUSE_PUBLIC_KEY / HERMES_LANGFUSE_SECRET_KEY
at a template value like 'placeholder', 'test-key', or 'your-langfuse-key',
the Langfuse SDK silently accepts the credentials at construction time and
drops every trace at flush time. No warning, no error — just an empty
Langfuse dashboard the operator only notices hours later.

Add prefix-based validation in _get_langfuse() against the documented
'pk-lf-' / 'sk-lf-' prefixes that Langfuse always issues server-side.
Anything else fires a single warning naming the offending env var(s)
with a log-safe value preview (full string for short placeholders so the
operator knows which template they left in place; truncated for long
values so a real secret pasted into the wrong field never hits the log),
then short-circuits via the existing _INIT_FAILED cache so the warning
fires once per process, not once per hook invocation.

The check sits after the 'Langfuse is None' SDK-installed guard so hosts
without the optional langfuse SDK don't see misleading 'set real keys'
hints when the actionable fix is 'pip install langfuse'. Missing
credentials remains the documented opt-out path and stays silent — no
log noise for unconfigured installs.

Fixes #22763
Fixes #23823

* fix(langfuse): use actual API request messages for generation input

on_pre_llm_request previously used the messages kwarg alone, which
could be None when Hermes passes the payload via request_messages,
conversation_history, or user_message instead. Add _coerce_request_messages
to pick the first available list across all variants, falling back to a
synthetic user message. Generations now show the real outbound payload
rather than an empty input.

* fix(langfuse): record tool call outputs in traces

Tool observations showed input (arguments) but output was always
undefined. Root cause: when tool_call_id is empty, pre_tool_call stored
observations under a unique time-based key that post_tool_call could
never reconstruct, so every tool span was closed without output by the
_finish_trace sweep.

Fix pre/post matching by routing empty-tool_call_id tools through a
per-name FIFO queue (pending_tools_by_name) instead of the time-based
key. Tools with a tool_call_id continue to use the id-keyed dict.

Also:
 - Preserve OpenAI-style nested function shape in serialized tool calls
   so Langfuse renders name/arguments correctly
 - Keep name + tool_call_id on role:tool messages for proper pairing
 - Backfill tool results onto the matching turn_tool_calls entry so the
   generation's tool-call record carries the result alongside arguments
 - Coerce request messages from whichever field the runtime provides
   (request_messages, messages, conversation_history, user_message)

* fix(langfuse): salvage-review polish — drop dead is_first_turn, shallow-copy request_messages, real threaded FIFO test

Self-review of the combined #22345 + #23831 salvage surfaced three issues
worth fixing in the same PR rather than as follow-ups:

1. Drop is_first_turn from the pre_api_request hook. The boolean expression
   `not bool(conversation_history)` was wrong: conversation_history is
   reassigned to None mid-run after compression (5 sites in run_agent.py),
   so the value flips False -> True mid-conversation on every post-compression
   API call. The langfuse plugin never consumed it, so the kwarg was both
   misleading AND dead.

2. Replace copy.deepcopy(request_messages) with shallow list() copy. The
   pre_api_request hook contract discards return values (invoke_hook never
   writes back to api_kwargs), and the langfuse plugin's _serialize_messages
   already builds its own snapshot dicts via _safe_value. A deepcopy on every
   API call would walk every tool result and base64 image — significant
   overhead for no real isolation benefit. Shallow copy of the outer list
   protects against later mutations of api_messages without paying for the
   inner-dict walk.

3. Rename test_empty_tool_call_id_concurrent_fifo_order ->
   test_empty_tool_call_id_observations_are_fifo_within_tool_name and add a
   real test_threaded_post_calls_preserve_fifo_under_lock that spawns 8
   threads behind a barrier to actually exercise _STATE_LOCK on the
   pending_tools_by_name queue. The original test was sequential and only
   validated Python list semantics; this one validates the lock discipline.

4. Fix stale 'Cleared by reset_cache_for_tests()' comment on _INIT_FAILED —
   that function does not exist. Tests reload the module via sys.modules.pop
   + importlib.import_module instead.

Tests: 37 langfuse plugin tests pass, 658 plugin tests overall pass.

---------

Co-authored-by: xxxigm <tuancanhnguyen706@gmail.com>
Co-authored-by: Brian Conklin <brian@dralth.com>
This commit is contained in:
kshitij 2026-05-15 05:04:02 -07:00 committed by GitHub
parent f199cd9f84
commit db84a78e61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 705 additions and 22 deletions

View file

@ -47,6 +47,7 @@ class TraceState:
root_span: Any
generations: Dict[str, Any] = field(default_factory=dict)
tools: Dict[str, Any] = field(default_factory=dict)
pending_tools_by_name: Dict[str, list] = field(default_factory=dict)
turn_tool_calls: list[dict[str, Any]] = field(default_factory=list)
last_updated_at: float = field(default_factory=time.time)
@ -58,6 +59,17 @@ _READ_FILE_LINE_RE = re.compile(r"^\s*(\d+)\|(.*)$")
_READ_FILE_HEAD_LINES = 25
_READ_FILE_TAIL_LINES = 15
# Langfuse-issued keys always carry these prefixes (cloud or self-hosted —
# the prefix is baked into the server-side issuance flow, not a UI hint).
# Anything else (`placeholder`, `test-key`, `your-langfuse-key`, etc.) is a
# leftover template value and would cause the SDK to silently accept the
# credentials at construction time but drop every trace at flush time.
# See #23823 — the silent-failure bug this guard fixes.
_LANGFUSE_KEY_PREFIXES: Dict[str, str] = {
"HERMES_LANGFUSE_PUBLIC_KEY": "pk-lf-",
"HERMES_LANGFUSE_SECRET_KEY": "sk-lf-",
}
def _env(name: str, default: str = "") -> str:
return os.environ.get(name, default).strip()
@ -82,10 +94,49 @@ def _debug(message: str) -> None:
# Sentinel: "_get_langfuse() has tried and failed". Lets us short-circuit
# every subsequent hook call without re-checking env vars or re-attempting
# SDK init. Cleared by reset_cache_for_tests().
# SDK init. Tests clear this by reloading the module via
# ``sys.modules.pop(...) + importlib.import_module(...)`` rather than via a
# dedicated reset function. Runtime callers cannot reset the cache; if an
# operator fixes a misconfigured credential they must restart the process.
_INIT_FAILED = object()
def _redact_key_preview(value: str) -> str:
"""Return a brief, log-safe preview of a credential value.
Keeps enough characters to disambiguate common placeholders
(``placeholder``, ``test-key``, ``your-key``) without echoing a
real secret in full if an operator pasted one into the wrong env
var. Used only for the once-per-process placeholder-detection
warning in :func:`_get_langfuse`.
"""
if not value:
return "<empty>"
if len(value) <= 12:
return repr(value)
return repr(value[:6] + "...")
def _validate_langfuse_key(env_name: str, value: str) -> Optional[str]:
"""Return an error message if ``value`` is not a real Langfuse key.
Returns ``None`` when the value matches the documented Langfuse
prefix for ``env_name``, or when no prefix is registered for the
name (in which case we trust the operator). When validation
fails the returned string is suitable for direct inclusion in a
single log line it names the env var and shows a safe preview.
"""
expected = _LANGFUSE_KEY_PREFIXES.get(env_name, "")
if not expected:
return None
if value.startswith(expected):
return None
return (
f"{env_name}={_redact_key_preview(value)} "
f"(expected {expected!r} prefix)"
)
def _get_langfuse() -> Optional[Langfuse]:
"""Return a cached Langfuse client, or ``None`` if unavailable.
@ -111,6 +162,33 @@ def _get_langfuse() -> Optional[Langfuse]:
_LANGFUSE_CLIENT = _INIT_FAILED
return None
# Reject placeholder credentials with a one-shot warning so the
# operator sees the misconfiguration instead of silently shipping a
# broken observability stack (#23823). The SDK does not validate
# keys at construction time — it queues traces in memory and only
# discovers the auth failure when the background flush thread tries
# to post them, by which point the warning is buried under whatever
# else the process is logging. Catch it here, surface it once, and
# short-circuit via the same _INIT_FAILED path as the empty case.
placeholder_issues = [
msg
for msg in (
_validate_langfuse_key("HERMES_LANGFUSE_PUBLIC_KEY", public_key),
_validate_langfuse_key("HERMES_LANGFUSE_SECRET_KEY", secret_key),
)
if msg
]
if placeholder_issues:
logger.warning(
"Langfuse plugin: credentials look like placeholders, traces will "
"NOT be emitted (%s). Set real Langfuse keys (pk-lf-... / sk-lf-...) "
"or unset HERMES_LANGFUSE_PUBLIC_KEY / HERMES_LANGFUSE_SECRET_KEY to "
"silence this warning.",
"; ".join(placeholder_issues),
)
_LANGFUSE_CLIENT = _INIT_FAILED
return None
base_url = _env("HERMES_LANGFUSE_BASE_URL") or _env("LANGFUSE_BASE_URL") or "https://cloud.langfuse.com"
environment = _env("HERMES_LANGFUSE_ENV") or _env("LANGFUSE_ENV")
release = _env("HERMES_LANGFUSE_RELEASE") or _env("LANGFUSE_RELEASE")
@ -328,6 +406,21 @@ def _extract_last_user_message(messages: Any) -> Any:
return None
def _coerce_request_messages(
*,
request_messages: Any = None,
messages: Any = None,
conversation_history: Any = None,
user_message: Any = None,
) -> list[dict[str, Any]]:
for candidate in (request_messages, messages, conversation_history):
if isinstance(candidate, list):
return candidate
if user_message is None:
return []
return [{"role": "user", "content": user_message}]
def _serialize_messages(messages: Any) -> list[dict[str, Any]]:
if not isinstance(messages, list):
return []
@ -343,8 +436,11 @@ def _serialize_messages(messages: Any) -> list[dict[str, Any]]:
parse_json_strings=(role == "tool"),
),
}
if role == "tool" and message.get("tool_call_id"):
item["tool_call_id"] = message.get("tool_call_id")
if role == "tool":
if message.get("tool_call_id"):
item["tool_call_id"] = message.get("tool_call_id")
if message.get("name"):
item["name"] = _safe_value(message.get("name"))
if message.get("tool_calls"):
item["tool_calls"] = _safe_value(message.get("tool_calls"), parse_json_strings=True)
serialized.append(item)
@ -359,15 +455,16 @@ def _serialize_tool_calls(tool_calls: Any) -> list[dict[str, Any]]:
fn = getattr(tool_call, "function", None)
name = getattr(fn, "name", None) if fn else None
arguments = getattr(fn, "arguments", None) if fn else None
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except Exception:
pass
safe_arguments = _safe_value(arguments, parse_json_strings=False)
serialized.append({
"id": getattr(tool_call, "id", None),
"type": getattr(tool_call, "type", None) or "function",
"name": name,
"arguments": _safe_value(arguments, parse_json_strings=True),
"arguments": safe_arguments,
"function": {
"name": name,
"arguments": safe_arguments,
},
})
return serialized
@ -564,6 +661,9 @@ def _finish_trace(task_key: str, *, output: Any = None) -> None:
_end_observation(observation)
for observation in state.tools.values():
_end_observation(observation)
for queue in state.pending_tools_by_name.values():
for observation in queue:
_end_observation(observation)
final_output = _merge_trace_output(output, state)
if final_output is not None:
state.root_span.set_trace_io(output=final_output)
@ -636,6 +736,7 @@ def on_pre_llm_request(
base_url: str = "",
api_mode: str = "",
api_call_count: int = 0,
request_messages: Any = None,
messages: Any = None,
turn_type: str = "user",
message_count: int = 0,
@ -643,12 +744,21 @@ def on_pre_llm_request(
approx_input_tokens: int = 0,
request_char_count: int = 0,
max_tokens: Any = None,
conversation_history: Any = None,
user_message: Any = None,
**_: Any,
) -> None:
client = _get_langfuse()
if client is None:
return
input_messages = _coerce_request_messages(
request_messages=request_messages,
messages=messages,
conversation_history=conversation_history,
user_message=user_message,
)
task_key = _trace_key(task_id, session_id)
req_key = _request_key(api_call_count)
@ -663,7 +773,7 @@ def on_pre_llm_request(
provider=provider,
model=model,
api_mode=api_mode,
messages=messages,
messages=input_messages,
client=client,
)
_TRACE_STATE[task_key] = state
@ -676,7 +786,7 @@ def on_pre_llm_request(
client=client,
name=f"LLM call {api_call_count}",
as_type="generation",
input_value=_serialize_messages(messages),
input_value=_serialize_messages(input_messages),
metadata={
"provider": provider,
"platform": platform,
@ -815,13 +925,12 @@ def on_pre_tool_call(*, tool_name: str = "", args: Any = None, task_id: str = ""
return
task_key = _trace_key(task_id, session_id)
tool_key = tool_call_id or f"{tool_name}:{time.time_ns()}"
with _STATE_LOCK:
state = _TRACE_STATE.get(task_key)
if state is None:
return
state.tools[tool_key] = _start_child_observation(
observation = _start_child_observation(
state,
client=client,
name=f"Tool: {tool_name}",
@ -829,22 +938,29 @@ def on_pre_tool_call(*, tool_name: str = "", args: Any = None, task_id: str = ""
input_value=_safe_value(args),
metadata={"tool_name": tool_name, "tool_call_id": tool_call_id},
)
if tool_call_id:
state.tools[tool_call_id] = observation
else:
state.pending_tools_by_name.setdefault(tool_name, []).append(observation)
def on_post_tool_call(*, tool_name: str = "", args: Any = None, result: Any = None,
task_id: str = "", session_id: str = "", tool_call_id: str = "", **_: Any) -> None:
task_key = _trace_key(task_id, session_id)
tool_key = tool_call_id or ""
observation = None
with _STATE_LOCK:
state = _TRACE_STATE.get(task_key)
if state is None:
return
if tool_key:
observation = state.tools.pop(tool_key, None)
elif state.tools:
_, observation = state.tools.popitem()
if tool_call_id:
observation = state.tools.pop(tool_call_id, None)
if observation is None:
queue = state.pending_tools_by_name.get(tool_name)
if queue:
observation = queue.pop(0)
if not queue:
state.pending_tools_by_name.pop(tool_name, None)
if observation is None:
return
@ -854,10 +970,24 @@ def on_post_tool_call(*, tool_name: str = "", args: Any = None, result: Any = No
else:
result_value = result
result_value = _normalize_payload(result_value, tool_name=tool_name, args=args)
safe_result_value = _safe_value(result_value, parse_json_strings=True)
# Backfill so the generation's tool_call record carries the result alongside arguments.
if tool_call_id:
with _STATE_LOCK:
state = _TRACE_STATE.get(task_key)
if state is not None:
for tool_call in reversed(state.turn_tool_calls):
if tool_call.get("id") == tool_call_id:
tool_call["output"] = safe_result_value
function_payload = tool_call.get("function")
if isinstance(function_payload, dict):
function_payload["output"] = safe_result_value
break
_end_observation(
observation,
output=_safe_value(result_value, parse_json_strings=True),
output=safe_result_value,
metadata={"tool_name": tool_name, "args": _safe_value(args, parse_json_strings=True)},
)