mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
* fix(langfuse): reject placeholder credentials with one-shot warning When operators leave HERMES_LANGFUSE_PUBLIC_KEY / HERMES_LANGFUSE_SECRET_KEY at a template value like 'placeholder', 'test-key', or 'your-langfuse-key', the Langfuse SDK silently accepts the credentials at construction time and drops every trace at flush time. No warning, no error — just an empty Langfuse dashboard the operator only notices hours later. Add prefix-based validation in _get_langfuse() against the documented 'pk-lf-' / 'sk-lf-' prefixes that Langfuse always issues server-side. Anything else fires a single warning naming the offending env var(s) with a log-safe value preview (full string for short placeholders so the operator knows which template they left in place; truncated for long values so a real secret pasted into the wrong field never hits the log), then short-circuits via the existing _INIT_FAILED cache so the warning fires once per process, not once per hook invocation. The check sits after the 'Langfuse is None' SDK-installed guard so hosts without the optional langfuse SDK don't see misleading 'set real keys' hints when the actionable fix is 'pip install langfuse'. Missing credentials remains the documented opt-out path and stays silent — no log noise for unconfigured installs. Fixes #22763 Fixes #23823 * fix(langfuse): use actual API request messages for generation input on_pre_llm_request previously used the messages kwarg alone, which could be None when Hermes passes the payload via request_messages, conversation_history, or user_message instead. Add _coerce_request_messages to pick the first available list across all variants, falling back to a synthetic user message. Generations now show the real outbound payload rather than an empty input. * fix(langfuse): record tool call outputs in traces Tool observations showed input (arguments) but output was always undefined. Root cause: when tool_call_id is empty, pre_tool_call stored observations under a unique time-based key that post_tool_call could never reconstruct, so every tool span was closed without output by the _finish_trace sweep. Fix pre/post matching by routing empty-tool_call_id tools through a per-name FIFO queue (pending_tools_by_name) instead of the time-based key. Tools with a tool_call_id continue to use the id-keyed dict. Also: - Preserve OpenAI-style nested function shape in serialized tool calls so Langfuse renders name/arguments correctly - Keep name + tool_call_id on role:tool messages for proper pairing - Backfill tool results onto the matching turn_tool_calls entry so the generation's tool-call record carries the result alongside arguments - Coerce request messages from whichever field the runtime provides (request_messages, messages, conversation_history, user_message) * fix(langfuse): salvage-review polish — drop dead is_first_turn, shallow-copy request_messages, real threaded FIFO test Self-review of the combined #22345 + #23831 salvage surfaced three issues worth fixing in the same PR rather than as follow-ups: 1. Drop is_first_turn from the pre_api_request hook. The boolean expression `not bool(conversation_history)` was wrong: conversation_history is reassigned to None mid-run after compression (5 sites in run_agent.py), so the value flips False -> True mid-conversation on every post-compression API call. The langfuse plugin never consumed it, so the kwarg was both misleading AND dead. 2. Replace copy.deepcopy(request_messages) with shallow list() copy. The pre_api_request hook contract discards return values (invoke_hook never writes back to api_kwargs), and the langfuse plugin's _serialize_messages already builds its own snapshot dicts via _safe_value. A deepcopy on every API call would walk every tool result and base64 image — significant overhead for no real isolation benefit. Shallow copy of the outer list protects against later mutations of api_messages without paying for the inner-dict walk. 3. Rename test_empty_tool_call_id_concurrent_fifo_order -> test_empty_tool_call_id_observations_are_fifo_within_tool_name and add a real test_threaded_post_calls_preserve_fifo_under_lock that spawns 8 threads behind a barrier to actually exercise _STATE_LOCK on the pending_tools_by_name queue. The original test was sequential and only validated Python list semantics; this one validates the lock discipline. 4. Fix stale 'Cleared by reset_cache_for_tests()' comment on _INIT_FAILED — that function does not exist. Tests reload the module via sys.modules.pop + importlib.import_module instead. Tests: 37 langfuse plugin tests pass, 658 plugin tests overall pass. --------- Co-authored-by: xxxigm <tuancanhnguyen706@gmail.com> Co-authored-by: Brian Conklin <brian@dralth.com>
1004 lines
37 KiB
Python
1004 lines
37 KiB
Python
"""langfuse — Hermes plugin for Langfuse observability.
|
||
|
||
Traces Hermes conversations, LLM calls, and tool usage to Langfuse.
|
||
|
||
Activation is handled by the Hermes plugin system — standalone plugins only
|
||
load when listed in ``plugins.enabled`` (via ``hermes plugins enable
|
||
observability/langfuse`` or ``hermes tools → Langfuse Observability``). At
|
||
runtime the plugin also requires the ``langfuse`` SDK and credentials; if
|
||
either is missing the hooks are inert.
|
||
|
||
Required env vars (set via ``hermes tools`` or ~/.hermes/.env):
|
||
HERMES_LANGFUSE_PUBLIC_KEY - Langfuse project public key (pk-lf-...)
|
||
HERMES_LANGFUSE_SECRET_KEY - Langfuse project secret key (sk-lf-...)
|
||
HERMES_LANGFUSE_BASE_URL - Langfuse server URL (default: https://cloud.langfuse.com)
|
||
|
||
Optional env vars:
|
||
HERMES_LANGFUSE_ENV - environment tag (e.g. "production", "local")
|
||
HERMES_LANGFUSE_RELEASE - release/version tag
|
||
HERMES_LANGFUSE_SAMPLE_RATE - sampling rate 0.0–1.0 (default: 1.0)
|
||
HERMES_LANGFUSE_MAX_CHARS - max chars per field (default: 12000)
|
||
HERMES_LANGFUSE_DEBUG - set to "true" for verbose logging
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import threading
|
||
import time
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, Dict, Optional
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
try:
|
||
from langfuse import Langfuse, propagate_attributes
|
||
except Exception: # pragma: no cover - fail-open when optional dep is missing
|
||
Langfuse = None
|
||
propagate_attributes = None
|
||
|
||
|
||
@dataclass
|
||
class TraceState:
|
||
trace_id: str
|
||
root_ctx: Any
|
||
root_span: Any
|
||
generations: Dict[str, Any] = field(default_factory=dict)
|
||
tools: Dict[str, Any] = field(default_factory=dict)
|
||
pending_tools_by_name: Dict[str, list] = field(default_factory=dict)
|
||
turn_tool_calls: list[dict[str, Any]] = field(default_factory=list)
|
||
last_updated_at: float = field(default_factory=time.time)
|
||
|
||
|
||
_STATE_LOCK = threading.Lock()
|
||
_TRACE_STATE: Dict[str, TraceState] = {}
|
||
_LANGFUSE_CLIENT = None
|
||
_READ_FILE_LINE_RE = re.compile(r"^\s*(\d+)\|(.*)$")
|
||
_READ_FILE_HEAD_LINES = 25
|
||
_READ_FILE_TAIL_LINES = 15
|
||
|
||
# Langfuse-issued keys always carry these prefixes (cloud or self-hosted —
|
||
# the prefix is baked into the server-side issuance flow, not a UI hint).
|
||
# Anything else (`placeholder`, `test-key`, `your-langfuse-key`, etc.) is a
|
||
# leftover template value and would cause the SDK to silently accept the
|
||
# credentials at construction time but drop every trace at flush time.
|
||
# See #23823 — the silent-failure bug this guard fixes.
|
||
_LANGFUSE_KEY_PREFIXES: Dict[str, str] = {
|
||
"HERMES_LANGFUSE_PUBLIC_KEY": "pk-lf-",
|
||
"HERMES_LANGFUSE_SECRET_KEY": "sk-lf-",
|
||
}
|
||
|
||
|
||
def _env(name: str, default: str = "") -> str:
|
||
return os.environ.get(name, default).strip()
|
||
|
||
|
||
def _env_bool(*names: str) -> bool:
|
||
for name in names:
|
||
value = _env(name).lower()
|
||
if value:
|
||
return value in {"1", "true", "yes", "on"}
|
||
return False
|
||
|
||
|
||
def _debug_enabled() -> bool:
|
||
return _env_bool("HERMES_LANGFUSE_DEBUG")
|
||
|
||
|
||
def _debug(message: str) -> None:
|
||
if _debug_enabled():
|
||
logger.info("Langfuse tracing: %s", message)
|
||
|
||
|
||
# Sentinel: "_get_langfuse() has tried and failed". Lets us short-circuit
|
||
# every subsequent hook call without re-checking env vars or re-attempting
|
||
# SDK init. Tests clear this by reloading the module via
|
||
# ``sys.modules.pop(...) + importlib.import_module(...)`` rather than via a
|
||
# dedicated reset function. Runtime callers cannot reset the cache; if an
|
||
# operator fixes a misconfigured credential they must restart the process.
|
||
_INIT_FAILED = object()
|
||
|
||
|
||
def _redact_key_preview(value: str) -> str:
|
||
"""Return a brief, log-safe preview of a credential value.
|
||
|
||
Keeps enough characters to disambiguate common placeholders
|
||
(``placeholder``, ``test-key``, ``your-key``) without echoing a
|
||
real secret in full if an operator pasted one into the wrong env
|
||
var. Used only for the once-per-process placeholder-detection
|
||
warning in :func:`_get_langfuse`.
|
||
"""
|
||
if not value:
|
||
return "<empty>"
|
||
if len(value) <= 12:
|
||
return repr(value)
|
||
return repr(value[:6] + "...")
|
||
|
||
|
||
def _validate_langfuse_key(env_name: str, value: str) -> Optional[str]:
|
||
"""Return an error message if ``value`` is not a real Langfuse key.
|
||
|
||
Returns ``None`` when the value matches the documented Langfuse
|
||
prefix for ``env_name``, or when no prefix is registered for the
|
||
name (in which case we trust the operator). When validation
|
||
fails the returned string is suitable for direct inclusion in a
|
||
single log line — it names the env var and shows a safe preview.
|
||
"""
|
||
expected = _LANGFUSE_KEY_PREFIXES.get(env_name, "")
|
||
if not expected:
|
||
return None
|
||
if value.startswith(expected):
|
||
return None
|
||
return (
|
||
f"{env_name}={_redact_key_preview(value)} "
|
||
f"(expected {expected!r} prefix)"
|
||
)
|
||
|
||
|
||
def _get_langfuse() -> Optional[Langfuse]:
|
||
"""Return a cached Langfuse client, or ``None`` if unavailable.
|
||
|
||
Activation of this plugin is controlled by the Hermes plugin system —
|
||
this function only handles the runtime-availability gate (SDK installed
|
||
+ credentials present). The result is cached: on the first call we try
|
||
to construct a client, and every subsequent call returns that client
|
||
(or fast-returns ``None`` if init failed).
|
||
"""
|
||
global _LANGFUSE_CLIENT
|
||
if _LANGFUSE_CLIENT is _INIT_FAILED:
|
||
return None
|
||
if _LANGFUSE_CLIENT is not None:
|
||
return _LANGFUSE_CLIENT
|
||
|
||
if Langfuse is None:
|
||
_LANGFUSE_CLIENT = _INIT_FAILED
|
||
return None
|
||
|
||
public_key = _env("HERMES_LANGFUSE_PUBLIC_KEY") or _env("LANGFUSE_PUBLIC_KEY")
|
||
secret_key = _env("HERMES_LANGFUSE_SECRET_KEY") or _env("LANGFUSE_SECRET_KEY")
|
||
if not (public_key and secret_key):
|
||
_LANGFUSE_CLIENT = _INIT_FAILED
|
||
return None
|
||
|
||
# Reject placeholder credentials with a one-shot warning so the
|
||
# operator sees the misconfiguration instead of silently shipping a
|
||
# broken observability stack (#23823). The SDK does not validate
|
||
# keys at construction time — it queues traces in memory and only
|
||
# discovers the auth failure when the background flush thread tries
|
||
# to post them, by which point the warning is buried under whatever
|
||
# else the process is logging. Catch it here, surface it once, and
|
||
# short-circuit via the same _INIT_FAILED path as the empty case.
|
||
placeholder_issues = [
|
||
msg
|
||
for msg in (
|
||
_validate_langfuse_key("HERMES_LANGFUSE_PUBLIC_KEY", public_key),
|
||
_validate_langfuse_key("HERMES_LANGFUSE_SECRET_KEY", secret_key),
|
||
)
|
||
if msg
|
||
]
|
||
if placeholder_issues:
|
||
logger.warning(
|
||
"Langfuse plugin: credentials look like placeholders, traces will "
|
||
"NOT be emitted (%s). Set real Langfuse keys (pk-lf-... / sk-lf-...) "
|
||
"or unset HERMES_LANGFUSE_PUBLIC_KEY / HERMES_LANGFUSE_SECRET_KEY to "
|
||
"silence this warning.",
|
||
"; ".join(placeholder_issues),
|
||
)
|
||
_LANGFUSE_CLIENT = _INIT_FAILED
|
||
return None
|
||
|
||
base_url = _env("HERMES_LANGFUSE_BASE_URL") or _env("LANGFUSE_BASE_URL") or "https://cloud.langfuse.com"
|
||
environment = _env("HERMES_LANGFUSE_ENV") or _env("LANGFUSE_ENV")
|
||
release = _env("HERMES_LANGFUSE_RELEASE") or _env("LANGFUSE_RELEASE")
|
||
sample_rate = _env("HERMES_LANGFUSE_SAMPLE_RATE")
|
||
|
||
kwargs: Dict[str, Any] = {
|
||
"public_key": public_key,
|
||
"secret_key": secret_key,
|
||
"base_url": base_url,
|
||
}
|
||
if environment:
|
||
kwargs["environment"] = environment
|
||
if release:
|
||
kwargs["release"] = release
|
||
if sample_rate:
|
||
try:
|
||
kwargs["sample_rate"] = float(sample_rate)
|
||
except ValueError:
|
||
logger.warning("Invalid HERMES_LANGFUSE_SAMPLE_RATE=%r", sample_rate)
|
||
|
||
try:
|
||
_LANGFUSE_CLIENT = Langfuse(**kwargs)
|
||
except Exception as exc: # pragma: no cover - fail-open
|
||
logger.warning("Could not initialize Langfuse client: %s", exc)
|
||
_LANGFUSE_CLIENT = _INIT_FAILED
|
||
return None
|
||
|
||
return _LANGFUSE_CLIENT
|
||
|
||
|
||
def _trace_key(task_id: str, session_id: str) -> str:
|
||
if task_id:
|
||
return task_id
|
||
if session_id:
|
||
return f"session:{session_id}"
|
||
return f"thread:{threading.get_ident()}"
|
||
|
||
|
||
def _truncate_text(value: str, max_chars: int) -> str:
|
||
if len(value) <= max_chars:
|
||
return value
|
||
return value[:max_chars] + f"... [truncated {len(value) - max_chars} chars]"
|
||
|
||
|
||
def _maybe_parse_json_string(value: str) -> Any:
|
||
stripped = value.strip()
|
||
if len(stripped) < 2 or stripped[0] not in "{[" or stripped[-1] not in "}]":
|
||
if len(stripped) < 2 or stripped[0] not in "{[":
|
||
return value
|
||
try:
|
||
parsed, idx = json.JSONDecoder().raw_decode(stripped)
|
||
except Exception:
|
||
return value
|
||
if not isinstance(parsed, (dict, list)):
|
||
return value
|
||
|
||
trailing = stripped[idx:].strip()
|
||
if not trailing:
|
||
return parsed
|
||
|
||
hint_key = "_hint" if trailing.startswith("[Hint:") else "_trailing_text"
|
||
if isinstance(parsed, dict):
|
||
merged = dict(parsed)
|
||
key = hint_key if hint_key not in merged else "_trailing_text"
|
||
merged[key] = trailing
|
||
return merged
|
||
|
||
return {"data": parsed, hint_key: trailing}
|
||
|
||
|
||
def _looks_like_read_file_payload(value: Any) -> bool:
|
||
if not isinstance(value, dict):
|
||
return False
|
||
content = value.get("content")
|
||
return (
|
||
isinstance(content, str)
|
||
and "total_lines" in value
|
||
and "file_size" in value
|
||
and "is_binary" in value
|
||
and "is_image" in value
|
||
and not value.get("error")
|
||
)
|
||
|
||
|
||
def _parse_read_file_lines(content: str) -> list[dict[str, Any]]:
|
||
if not isinstance(content, str) or not content:
|
||
return []
|
||
|
||
lines = []
|
||
for raw_line in content.splitlines():
|
||
match = _READ_FILE_LINE_RE.match(raw_line)
|
||
if not match:
|
||
return []
|
||
lines.append({
|
||
"line": int(match.group(1)),
|
||
"text": match.group(2),
|
||
})
|
||
return lines
|
||
|
||
|
||
def _build_read_file_preview(lines: list[dict[str, Any]]) -> dict[str, Any]:
|
||
if len(lines) <= (_READ_FILE_HEAD_LINES + _READ_FILE_TAIL_LINES):
|
||
return {"lines": lines}
|
||
|
||
return {
|
||
"head": lines[:_READ_FILE_HEAD_LINES],
|
||
"tail": lines[-_READ_FILE_TAIL_LINES:],
|
||
"omitted_line_count": len(lines) - _READ_FILE_HEAD_LINES - _READ_FILE_TAIL_LINES,
|
||
}
|
||
|
||
|
||
def _normalize_read_file_payload(value: dict[str, Any], *, args: Any = None) -> dict[str, Any]:
|
||
normalized: dict[str, Any] = {}
|
||
if isinstance(args, dict):
|
||
path = args.get("path")
|
||
offset = args.get("offset")
|
||
limit = args.get("limit")
|
||
if isinstance(path, str) and path:
|
||
normalized["path"] = path
|
||
if isinstance(offset, int):
|
||
normalized["offset"] = offset
|
||
if isinstance(limit, int):
|
||
normalized["limit"] = limit
|
||
|
||
lines = _parse_read_file_lines(value.get("content", ""))
|
||
if lines:
|
||
normalized["returned_lines"] = {
|
||
"start": lines[0]["line"],
|
||
"end": lines[-1]["line"],
|
||
"count": len(lines),
|
||
}
|
||
normalized["content_preview"] = _build_read_file_preview(lines)
|
||
elif value.get("content"):
|
||
normalized["content_preview"] = {
|
||
"text": value.get("content", ""),
|
||
}
|
||
|
||
for key in (
|
||
"total_lines",
|
||
"file_size",
|
||
"truncated",
|
||
"is_binary",
|
||
"is_image",
|
||
"hint",
|
||
"_warning",
|
||
"mime_type",
|
||
"dimensions",
|
||
"similar_files",
|
||
"error",
|
||
):
|
||
if key in value:
|
||
normalized[key] = value[key]
|
||
|
||
base64_content = value.get("base64_content")
|
||
if isinstance(base64_content, str) and base64_content:
|
||
normalized["base64_content"] = {
|
||
"omitted": True,
|
||
"length": len(base64_content),
|
||
}
|
||
|
||
return normalized
|
||
|
||
|
||
def _normalize_payload(value: Any, *, tool_name: str = "", args: Any = None) -> Any:
|
||
if _looks_like_read_file_payload(value):
|
||
return _normalize_read_file_payload(
|
||
value,
|
||
args=args if tool_name == "read_file" else None,
|
||
)
|
||
return value
|
||
|
||
|
||
def _safe_value(value: Any, *, max_chars: Optional[int] = None, depth: int = 0,
|
||
parse_json_strings: bool = False) -> Any:
|
||
max_chars = max_chars if max_chars is not None else int(_env("HERMES_LANGFUSE_MAX_CHARS", "12000") or "12000")
|
||
if depth > 4:
|
||
return "<max-depth>"
|
||
if value is None or isinstance(value, (int, float, bool)):
|
||
return value
|
||
if isinstance(value, bytes):
|
||
return {"type": "bytes", "len": len(value)}
|
||
if isinstance(value, str):
|
||
if parse_json_strings:
|
||
parsed = _maybe_parse_json_string(value)
|
||
if parsed is not value:
|
||
return _safe_value(parsed, max_chars=max_chars, depth=depth, parse_json_strings=True)
|
||
return _truncate_text(value, max_chars)
|
||
if isinstance(value, dict):
|
||
normalized = _normalize_payload(value)
|
||
if normalized is not value:
|
||
return _safe_value(normalized, max_chars=max_chars, depth=depth, parse_json_strings=parse_json_strings)
|
||
return {
|
||
str(k): _safe_value(v, max_chars=max_chars, depth=depth + 1, parse_json_strings=parse_json_strings)
|
||
for k, v in list(value.items())[:50]
|
||
}
|
||
if isinstance(value, (list, tuple, set)):
|
||
return [
|
||
_safe_value(v, max_chars=max_chars, depth=depth + 1, parse_json_strings=parse_json_strings)
|
||
for v in list(value)[:50]
|
||
]
|
||
if hasattr(value, "__dict__"):
|
||
return _safe_value(vars(value), max_chars=max_chars, depth=depth + 1, parse_json_strings=parse_json_strings)
|
||
return _truncate_text(repr(value), max_chars)
|
||
|
||
|
||
def _extract_last_user_message(messages: Any) -> Any:
|
||
if not isinstance(messages, list):
|
||
return None
|
||
for message in reversed(messages):
|
||
if isinstance(message, dict) and message.get("role") == "user":
|
||
return {
|
||
"role": "user",
|
||
"content": _safe_value(message.get("content")),
|
||
}
|
||
return None
|
||
|
||
|
||
def _coerce_request_messages(
|
||
*,
|
||
request_messages: Any = None,
|
||
messages: Any = None,
|
||
conversation_history: Any = None,
|
||
user_message: Any = None,
|
||
) -> list[dict[str, Any]]:
|
||
for candidate in (request_messages, messages, conversation_history):
|
||
if isinstance(candidate, list):
|
||
return candidate
|
||
if user_message is None:
|
||
return []
|
||
return [{"role": "user", "content": user_message}]
|
||
|
||
|
||
def _serialize_messages(messages: Any) -> list[dict[str, Any]]:
|
||
if not isinstance(messages, list):
|
||
return []
|
||
serialized = []
|
||
for message in messages[-12:]:
|
||
if not isinstance(message, dict):
|
||
continue
|
||
role = message.get("role")
|
||
item = {
|
||
"role": role,
|
||
"content": _safe_value(
|
||
message.get("content"),
|
||
parse_json_strings=(role == "tool"),
|
||
),
|
||
}
|
||
if role == "tool":
|
||
if message.get("tool_call_id"):
|
||
item["tool_call_id"] = message.get("tool_call_id")
|
||
if message.get("name"):
|
||
item["name"] = _safe_value(message.get("name"))
|
||
if message.get("tool_calls"):
|
||
item["tool_calls"] = _safe_value(message.get("tool_calls"), parse_json_strings=True)
|
||
serialized.append(item)
|
||
return serialized
|
||
|
||
|
||
def _serialize_tool_calls(tool_calls: Any) -> list[dict[str, Any]]:
|
||
if not tool_calls:
|
||
return []
|
||
serialized = []
|
||
for tool_call in tool_calls:
|
||
fn = getattr(tool_call, "function", None)
|
||
name = getattr(fn, "name", None) if fn else None
|
||
arguments = getattr(fn, "arguments", None) if fn else None
|
||
safe_arguments = _safe_value(arguments, parse_json_strings=False)
|
||
serialized.append({
|
||
"id": getattr(tool_call, "id", None),
|
||
"type": getattr(tool_call, "type", None) or "function",
|
||
"name": name,
|
||
"arguments": safe_arguments,
|
||
"function": {
|
||
"name": name,
|
||
"arguments": safe_arguments,
|
||
},
|
||
})
|
||
return serialized
|
||
|
||
|
||
def _serialize_assistant_message(message: Any) -> dict[str, Any]:
|
||
return {
|
||
"content": _safe_value(getattr(message, "content", None)),
|
||
"reasoning": _safe_value(getattr(message, "reasoning", None)),
|
||
"tool_calls": _serialize_tool_calls(getattr(message, "tool_calls", None)),
|
||
}
|
||
|
||
|
||
def _usage_and_cost(response: Any, *, provider: str, api_mode: str, model: str, base_url: str) -> tuple[dict[str, int], dict[str, float]]:
|
||
usage_details: Dict[str, int] = {}
|
||
cost_details: Dict[str, float] = {}
|
||
raw_usage = getattr(response, "usage", None)
|
||
if not raw_usage:
|
||
return usage_details, cost_details
|
||
|
||
try:
|
||
from agent.usage_pricing import estimate_usage_cost, normalize_usage
|
||
|
||
canonical = normalize_usage(raw_usage, provider=provider, api_mode=api_mode)
|
||
# Langfuse usage_details keys follow a naming convention:
|
||
# - Dashboard sums all keys containing "input" as input total
|
||
# - Dashboard sums all keys containing "output" as output total
|
||
# - If no "total" key, Langfuse derives it from all usage types
|
||
# Use Anthropic-style key names so cache tokens roll into the
|
||
# dashboard input total automatically.
|
||
# Ref: https://langfuse.com/docs/model-usage-and-cost
|
||
usage_details = {
|
||
"input": canonical.input_tokens,
|
||
"output": canonical.output_tokens,
|
||
}
|
||
if canonical.cache_read_tokens:
|
||
usage_details["cache_read_input_tokens"] = canonical.cache_read_tokens
|
||
if canonical.cache_write_tokens:
|
||
usage_details["cache_creation_input_tokens"] = canonical.cache_write_tokens
|
||
if canonical.reasoning_tokens:
|
||
usage_details["reasoning_tokens"] = canonical.reasoning_tokens
|
||
cost = estimate_usage_cost(
|
||
model,
|
||
canonical,
|
||
provider=provider,
|
||
base_url=base_url,
|
||
api_key="",
|
||
)
|
||
if cost.amount_usd is not None:
|
||
# Langfuse cost_details keys must match usage_details keys.
|
||
# Provide per-type breakdown so dashboard can show cost by type.
|
||
try:
|
||
from agent.usage_pricing import get_pricing_entry
|
||
from decimal import Decimal
|
||
_ONE_M = Decimal("1000000")
|
||
entry = get_pricing_entry(model, provider=provider, base_url=base_url)
|
||
if entry:
|
||
if entry.input_cost_per_million is not None and canonical.input_tokens:
|
||
cost_details["input"] = float(Decimal(canonical.input_tokens) * entry.input_cost_per_million / _ONE_M)
|
||
if entry.output_cost_per_million is not None and canonical.output_tokens:
|
||
cost_details["output"] = float(Decimal(canonical.output_tokens) * entry.output_cost_per_million / _ONE_M)
|
||
if entry.cache_read_cost_per_million is not None and canonical.cache_read_tokens:
|
||
cost_details["cache_read_input_tokens"] = float(Decimal(canonical.cache_read_tokens) * entry.cache_read_cost_per_million / _ONE_M)
|
||
if entry.cache_write_cost_per_million is not None and canonical.cache_write_tokens:
|
||
cost_details["cache_creation_input_tokens"] = float(Decimal(canonical.cache_write_tokens) * entry.cache_write_cost_per_million / _ONE_M)
|
||
else:
|
||
cost_details["total"] = float(cost.amount_usd)
|
||
except Exception:
|
||
cost_details["total"] = float(cost.amount_usd)
|
||
except Exception as exc: # pragma: no cover - fail-open
|
||
_debug(f"usage normalization failed: {exc}")
|
||
|
||
return usage_details, cost_details
|
||
|
||
|
||
def _start_root_trace(task_key: str, *, task_id: str, session_id: str, platform: str, provider: str, model: str,
|
||
api_mode: str, messages: Any, client: Langfuse) -> TraceState:
|
||
trace_id = client.create_trace_id(seed=f"{session_id or 'sessionless'}::{task_id or task_key}")
|
||
trace_input = _extract_last_user_message(messages)
|
||
metadata = {
|
||
"source": "hermes",
|
||
"task_id": task_id,
|
||
"platform": platform,
|
||
"provider": provider,
|
||
"model": model,
|
||
"api_mode": api_mode,
|
||
}
|
||
|
||
# session_id must be passed in trace_context for Langfuse session grouping.
|
||
trace_ctx: Dict[str, Any] = {"trace_id": trace_id}
|
||
if session_id:
|
||
trace_ctx["session_id"] = session_id
|
||
|
||
if propagate_attributes is not None:
|
||
try:
|
||
with propagate_attributes(
|
||
session_id=session_id or task_key,
|
||
trace_name="Hermes turn",
|
||
tags=["hermes", "langfuse"],
|
||
):
|
||
root_ctx = client.start_as_current_observation(
|
||
trace_context=trace_ctx,
|
||
name="Hermes turn",
|
||
as_type="chain",
|
||
input=trace_input,
|
||
metadata=metadata,
|
||
end_on_exit=False,
|
||
)
|
||
root_span = root_ctx.__enter__()
|
||
except Exception:
|
||
root_ctx = client.start_as_current_observation(
|
||
trace_context=trace_ctx,
|
||
name="Hermes turn",
|
||
as_type="chain",
|
||
input=trace_input,
|
||
metadata=metadata,
|
||
end_on_exit=False,
|
||
)
|
||
root_span = root_ctx.__enter__()
|
||
else:
|
||
root_ctx = client.start_as_current_observation(
|
||
trace_context=trace_ctx,
|
||
name="Hermes turn",
|
||
as_type="chain",
|
||
input=trace_input,
|
||
metadata=metadata,
|
||
end_on_exit=False,
|
||
)
|
||
root_span = root_ctx.__enter__()
|
||
|
||
try:
|
||
root_span.set_trace_io(input=trace_input)
|
||
except Exception:
|
||
pass
|
||
|
||
_debug(f"started trace {trace_id} for {task_key}")
|
||
return TraceState(trace_id=trace_id, root_ctx=root_ctx, root_span=root_span)
|
||
|
||
|
||
def _start_child_observation(state: TraceState, *, client: Langfuse, name: str, as_type: str,
|
||
input_value: Any, metadata: Optional[dict] = None,
|
||
model: Optional[str] = None, model_parameters: Optional[dict] = None) -> Any:
|
||
return state.root_span.start_observation(
|
||
name=name,
|
||
as_type=as_type,
|
||
input=input_value,
|
||
metadata=metadata or {},
|
||
model=model,
|
||
model_parameters=model_parameters,
|
||
)
|
||
|
||
|
||
def _end_observation(observation: Any, *, output: Any = None, metadata: Optional[dict] = None,
|
||
usage_details: Optional[dict] = None, cost_details: Optional[dict] = None) -> None:
|
||
if observation is None:
|
||
return
|
||
try:
|
||
update_kwargs: Dict[str, Any] = {}
|
||
if output is not None:
|
||
update_kwargs["output"] = output
|
||
if metadata:
|
||
update_kwargs["metadata"] = metadata
|
||
if usage_details:
|
||
update_kwargs["usage_details"] = usage_details
|
||
if cost_details:
|
||
update_kwargs["cost_details"] = cost_details
|
||
if update_kwargs:
|
||
observation.update(**update_kwargs)
|
||
observation.end()
|
||
except Exception as exc: # pragma: no cover - fail-open
|
||
_debug(f"end observation failed: {exc}")
|
||
|
||
|
||
def _merge_trace_output(output: Any, state: TraceState) -> Any:
|
||
if not state.turn_tool_calls:
|
||
return output
|
||
|
||
merged = dict(output) if isinstance(output, dict) else {"content": output}
|
||
merged["tool_calls"] = list(state.turn_tool_calls)
|
||
return merged
|
||
|
||
|
||
def _finish_trace(task_key: str, *, output: Any = None) -> None:
|
||
client = _get_langfuse()
|
||
if client is None:
|
||
return
|
||
|
||
with _STATE_LOCK:
|
||
state = _TRACE_STATE.pop(task_key, None)
|
||
if state is None:
|
||
return
|
||
|
||
try:
|
||
for observation in state.generations.values():
|
||
_end_observation(observation)
|
||
for observation in state.tools.values():
|
||
_end_observation(observation)
|
||
for queue in state.pending_tools_by_name.values():
|
||
for observation in queue:
|
||
_end_observation(observation)
|
||
final_output = _merge_trace_output(output, state)
|
||
if final_output is not None:
|
||
state.root_span.set_trace_io(output=final_output)
|
||
state.root_span.update(output=final_output)
|
||
state.root_span.end()
|
||
except Exception as exc: # pragma: no cover - fail-open
|
||
_debug(f"finish trace failed: {exc}")
|
||
finally:
|
||
try:
|
||
client.flush()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def _assistant_has_tool_calls(message: Any) -> bool:
|
||
return bool(getattr(message, "tool_calls", None))
|
||
|
||
|
||
def _request_key(api_call_count: Any) -> str:
|
||
return str(api_call_count or 0)
|
||
|
||
|
||
def on_pre_llm_call(*, task_id: str = "", session_id: str = "", platform: str = "", model: str = "",
|
||
provider: str = "", base_url: str = "", api_mode: str = "",
|
||
api_call_count: int = 0, messages: Any = None, turn_type: str = "user",
|
||
conversation_history: Any = None, user_message: Any = None, **_: Any) -> None:
|
||
# Older Hermes branches used pre_llm_call for request-scoped tracing and
|
||
# passed the actual API messages. Current Hermes also has a turn-scoped
|
||
# pre_llm_call used for context injection; tracing that hook creates an
|
||
# extra orphan/root trace before the real request trace. Only trace the
|
||
# legacy request-shaped call here.
|
||
if not isinstance(messages, list):
|
||
return
|
||
|
||
client = _get_langfuse()
|
||
if client is None:
|
||
return
|
||
|
||
# messages is a list only for legacy Hermes branches that fired
|
||
# pre_llm_call with API messages directly. Current Hermes fires
|
||
# pre_llm_call for context injection (conversation_history/user_message,
|
||
# no messages list) — tracing that would create orphan traces.
|
||
task_key = _trace_key(task_id, session_id)
|
||
|
||
with _STATE_LOCK:
|
||
state = _TRACE_STATE.get(task_key)
|
||
if state is None:
|
||
state = _start_root_trace(
|
||
task_key,
|
||
task_id=task_id,
|
||
session_id=session_id,
|
||
platform=platform,
|
||
provider=provider,
|
||
model=model,
|
||
api_mode=api_mode,
|
||
messages=messages,
|
||
client=client,
|
||
)
|
||
_TRACE_STATE[task_key] = state
|
||
state.last_updated_at = time.time()
|
||
|
||
|
||
def on_pre_llm_request(
|
||
*,
|
||
task_id: str = "",
|
||
session_id: str = "",
|
||
platform: str = "",
|
||
model: str = "",
|
||
provider: str = "",
|
||
base_url: str = "",
|
||
api_mode: str = "",
|
||
api_call_count: int = 0,
|
||
request_messages: Any = None,
|
||
messages: Any = None,
|
||
turn_type: str = "user",
|
||
message_count: int = 0,
|
||
tool_count: int = 0,
|
||
approx_input_tokens: int = 0,
|
||
request_char_count: int = 0,
|
||
max_tokens: Any = None,
|
||
conversation_history: Any = None,
|
||
user_message: Any = None,
|
||
**_: Any,
|
||
) -> None:
|
||
client = _get_langfuse()
|
||
if client is None:
|
||
return
|
||
|
||
input_messages = _coerce_request_messages(
|
||
request_messages=request_messages,
|
||
messages=messages,
|
||
conversation_history=conversation_history,
|
||
user_message=user_message,
|
||
)
|
||
|
||
task_key = _trace_key(task_id, session_id)
|
||
req_key = _request_key(api_call_count)
|
||
|
||
with _STATE_LOCK:
|
||
state = _TRACE_STATE.get(task_key)
|
||
if state is None:
|
||
state = _start_root_trace(
|
||
task_key,
|
||
task_id=task_id,
|
||
session_id=session_id,
|
||
platform=platform,
|
||
provider=provider,
|
||
model=model,
|
||
api_mode=api_mode,
|
||
messages=input_messages,
|
||
client=client,
|
||
)
|
||
_TRACE_STATE[task_key] = state
|
||
state.last_updated_at = time.time()
|
||
previous = state.generations.pop(req_key, None)
|
||
if previous is not None:
|
||
_end_observation(previous)
|
||
state.generations[req_key] = _start_child_observation(
|
||
state,
|
||
client=client,
|
||
name=f"LLM call {api_call_count}",
|
||
as_type="generation",
|
||
input_value=_serialize_messages(input_messages),
|
||
metadata={
|
||
"provider": provider,
|
||
"platform": platform,
|
||
"api_mode": api_mode,
|
||
"base_url": base_url,
|
||
},
|
||
model=model,
|
||
model_parameters={"api_mode": api_mode, "provider": provider},
|
||
)
|
||
|
||
|
||
def on_post_llm_call(*, task_id: str = "", session_id: str = "", provider: str = "", base_url: str = "",
|
||
api_mode: str = "", model: str = "", api_call_count: int = 0,
|
||
assistant_message: Any = None, response: Any = None,
|
||
api_duration: float = 0.0, finish_reason: str = "",
|
||
usage: Any = None, assistant_content_chars: int = 0,
|
||
assistant_tool_call_count: int = 0, assistant_response: Any = None,
|
||
**_: Any) -> None:
|
||
client = _get_langfuse()
|
||
if client is None:
|
||
return
|
||
|
||
task_key = _trace_key(task_id, session_id)
|
||
req_key = _request_key(api_call_count)
|
||
|
||
with _STATE_LOCK:
|
||
state = _TRACE_STATE.get(task_key)
|
||
generation = state.generations.pop(req_key, None) if state else None
|
||
if state is None or generation is None:
|
||
return
|
||
|
||
# Handle both call patterns:
|
||
# 1. post_api_request: passes usage (dict), assistant_content_chars, assistant_tool_call_count
|
||
# 2. post_llm_call: passes assistant_message (object), response (object), assistant_response (str)
|
||
if assistant_message is not None:
|
||
output = _serialize_assistant_message(assistant_message)
|
||
elif assistant_response is not None:
|
||
# post_llm_call passes assistant_response as a plain string
|
||
output = {"content": _safe_value(assistant_response), "reasoning": None, "tool_calls": []}
|
||
else:
|
||
# post_api_request path — reconstruct from summary kwargs
|
||
output = {
|
||
"content": f"[{assistant_content_chars} chars]" if assistant_content_chars else None,
|
||
"reasoning": None,
|
||
"tool_calls": [{"id": f"tc_{i}"} for i in range(assistant_tool_call_count)] if assistant_tool_call_count else [],
|
||
}
|
||
|
||
if output.get("tool_calls"):
|
||
state.turn_tool_calls.extend(output["tool_calls"])
|
||
|
||
# Extract usage: prefer response object, fall back to usage dict from post_api_request
|
||
if response is not None:
|
||
usage_details, cost_details = _usage_and_cost(
|
||
response,
|
||
provider=provider,
|
||
api_mode=api_mode,
|
||
model=model,
|
||
base_url=base_url,
|
||
)
|
||
elif isinstance(usage, dict) and usage:
|
||
# post_api_request passes a pre-built CanonicalUsage summary dict.
|
||
# Use Langfuse-convention key names: "input", "output", and
|
||
# "cache_read_input_tokens" / "cache_creation_input_tokens" so the
|
||
# dashboard sums cache tokens into the input total automatically.
|
||
_input = usage.get("input_tokens", 0)
|
||
_output = usage.get("output_tokens", 0) or usage.get("completion_tokens", 0)
|
||
_cache_read = usage.get("cache_read_tokens", 0)
|
||
_cache_write = usage.get("cache_write_tokens", 0)
|
||
_reasoning = usage.get("reasoning_tokens", 0)
|
||
usage_details = {
|
||
"input": _input,
|
||
"output": _output,
|
||
}
|
||
if _cache_read:
|
||
usage_details["cache_read_input_tokens"] = _cache_read
|
||
if _cache_write:
|
||
usage_details["cache_creation_input_tokens"] = _cache_write
|
||
if _reasoning:
|
||
usage_details["reasoning_tokens"] = _reasoning
|
||
cost_details = {}
|
||
# Estimate per-type cost from the summary if possible
|
||
try:
|
||
from agent.usage_pricing import CanonicalUsage, estimate_usage_cost, get_pricing_entry
|
||
from decimal import Decimal
|
||
_ONE_M = Decimal("1000000")
|
||
_cu = CanonicalUsage(
|
||
input_tokens=_input,
|
||
output_tokens=_output,
|
||
cache_read_tokens=_cache_read,
|
||
cache_write_tokens=_cache_write,
|
||
reasoning_tokens=_reasoning,
|
||
)
|
||
entry = get_pricing_entry(model, provider=provider, base_url=base_url)
|
||
if entry:
|
||
if entry.input_cost_per_million is not None and _input:
|
||
cost_details["input"] = float(Decimal(_input) * entry.input_cost_per_million / _ONE_M)
|
||
if entry.output_cost_per_million is not None and _output:
|
||
cost_details["output"] = float(Decimal(_output) * entry.output_cost_per_million / _ONE_M)
|
||
if entry.cache_read_cost_per_million is not None and _cache_read:
|
||
cost_details["cache_read_input_tokens"] = float(Decimal(_cache_read) * entry.cache_read_cost_per_million / _ONE_M)
|
||
if entry.cache_write_cost_per_million is not None and _cache_write:
|
||
cost_details["cache_creation_input_tokens"] = float(Decimal(_cache_write) * entry.cache_write_cost_per_million / _ONE_M)
|
||
else:
|
||
_cost = estimate_usage_cost(model, _cu, provider=provider, base_url=base_url, api_key="")
|
||
if _cost.amount_usd is not None:
|
||
cost_details["total"] = float(_cost.amount_usd)
|
||
except Exception:
|
||
pass
|
||
else:
|
||
usage_details, cost_details = {}, {}
|
||
|
||
tool_count = len(output.get("tool_calls", [])) or assistant_tool_call_count
|
||
gen_metadata: Dict[str, Any] = {"tool_call_count": tool_count}
|
||
if api_duration and api_duration > 0:
|
||
gen_metadata["api_duration_s"] = round(api_duration, 3)
|
||
if finish_reason:
|
||
gen_metadata["finish_reason"] = finish_reason
|
||
_end_observation(
|
||
generation,
|
||
output=output,
|
||
usage_details=usage_details,
|
||
cost_details=cost_details,
|
||
metadata=gen_metadata,
|
||
)
|
||
|
||
has_tools = _assistant_has_tool_calls(assistant_message) if assistant_message else (assistant_tool_call_count > 0)
|
||
has_content = bool(output.get("content"))
|
||
if not has_tools and has_content:
|
||
_finish_trace(task_key, output=output)
|
||
|
||
|
||
def on_pre_tool_call(*, tool_name: str = "", args: Any = None, task_id: str = "",
|
||
session_id: str = "", tool_call_id: str = "", **_: Any) -> None:
|
||
client = _get_langfuse()
|
||
if client is None:
|
||
return
|
||
|
||
task_key = _trace_key(task_id, session_id)
|
||
|
||
with _STATE_LOCK:
|
||
state = _TRACE_STATE.get(task_key)
|
||
if state is None:
|
||
return
|
||
observation = _start_child_observation(
|
||
state,
|
||
client=client,
|
||
name=f"Tool: {tool_name}",
|
||
as_type="tool",
|
||
input_value=_safe_value(args),
|
||
metadata={"tool_name": tool_name, "tool_call_id": tool_call_id},
|
||
)
|
||
if tool_call_id:
|
||
state.tools[tool_call_id] = observation
|
||
else:
|
||
state.pending_tools_by_name.setdefault(tool_name, []).append(observation)
|
||
|
||
|
||
def on_post_tool_call(*, tool_name: str = "", args: Any = None, result: Any = None,
|
||
task_id: str = "", session_id: str = "", tool_call_id: str = "", **_: Any) -> None:
|
||
task_key = _trace_key(task_id, session_id)
|
||
observation = None
|
||
|
||
with _STATE_LOCK:
|
||
state = _TRACE_STATE.get(task_key)
|
||
if state is None:
|
||
return
|
||
if tool_call_id:
|
||
observation = state.tools.pop(tool_call_id, None)
|
||
if observation is None:
|
||
queue = state.pending_tools_by_name.get(tool_name)
|
||
if queue:
|
||
observation = queue.pop(0)
|
||
if not queue:
|
||
state.pending_tools_by_name.pop(tool_name, None)
|
||
|
||
if observation is None:
|
||
return
|
||
|
||
if isinstance(result, str):
|
||
result_value = _maybe_parse_json_string(result)
|
||
else:
|
||
result_value = result
|
||
result_value = _normalize_payload(result_value, tool_name=tool_name, args=args)
|
||
safe_result_value = _safe_value(result_value, parse_json_strings=True)
|
||
|
||
# Backfill so the generation's tool_call record carries the result alongside arguments.
|
||
if tool_call_id:
|
||
with _STATE_LOCK:
|
||
state = _TRACE_STATE.get(task_key)
|
||
if state is not None:
|
||
for tool_call in reversed(state.turn_tool_calls):
|
||
if tool_call.get("id") == tool_call_id:
|
||
tool_call["output"] = safe_result_value
|
||
function_payload = tool_call.get("function")
|
||
if isinstance(function_payload, dict):
|
||
function_payload["output"] = safe_result_value
|
||
break
|
||
|
||
_end_observation(
|
||
observation,
|
||
output=safe_result_value,
|
||
metadata={"tool_name": tool_name, "args": _safe_value(args, parse_json_strings=True)},
|
||
)
|
||
|
||
|
||
def register(ctx) -> None:
|
||
# Register for both hook name variants so the plugin works across
|
||
# Hermes versions. pre_api_request / post_api_request fire per API
|
||
# call (preferred); pre_llm_call / post_llm_call fire once per turn.
|
||
ctx.register_hook("pre_api_request", on_pre_llm_request)
|
||
ctx.register_hook("post_api_request", on_post_llm_call)
|
||
ctx.register_hook("pre_llm_call", on_pre_llm_call)
|
||
ctx.register_hook("post_llm_call", on_post_llm_call)
|
||
ctx.register_hook("pre_tool_call", on_pre_tool_call)
|
||
ctx.register_hook("post_tool_call", on_post_tool_call)
|