test(langfuse): add end-to-end turn-isolation regression

The PR added helper-level tests for _trace_key but nothing exercised the
keys through the real hooks. This adds TestTurnTraceIsolation, which drives
on_pre_llm_request / on_post_llm_call across two turns of one gateway
session (task_id == session_id, unique turn_id, api_call_count reset per
turn) and asserts each turn opens its own root trace when the first turn
fails to finalize (tool-only final step). This test fails on the pre-fix
code (only one trace opened, turn 2 absorbed into turn 1) and passes with
the scoping fix.

Also pins the turn_id-over-api_request_id key precedence: the turn-scoped
post_llm_call carries no api_request_id, so it must still resolve to the
same key as the request-scoped hooks or finalization breaks.
This commit is contained in:
kshitijk4poor 2026-06-18 12:38:44 +05:30
parent 40ed67ccfe
commit b4356135f2

View file

@ -236,6 +236,144 @@ class TestTraceScopeKey:
assert plugin._trace_key("task-1", "session-1") == "task-1"
# ---------------------------------------------------------------------------
# End-to-end collision regression: two turns of ONE gateway session must not
# share trace state. The helper-level tests above prove _trace_key returns
# distinct keys; this drives the real pre/post hooks to prove the keys are
# actually threaded through so the second turn gets its own root trace.
#
# Gateway reality this reproduces:
# * task_id == session_id for every turn (gateway/run.py)
# * turn_id is unique per turn (turn_context.py)
# * api_call_count resets to 1 each turn (conversation_loop.py)
#
# Before the turn/request scoping, _trace_key collapsed to the constant
# session_id. That worked only because _finish_trace pops the key on a clean
# turn end. When turn 1 does NOT finalize (interrupted, tool-only final step,
# or empty final content), its state lingered under session_id and turn 2
# silently merged into turn 1's trace instead of opening its own.
# ---------------------------------------------------------------------------
class TestTurnTraceIsolation:
def _fresh_plugin(self):
sys.modules.pop("plugins.observability.langfuse", None)
return importlib.import_module("plugins.observability.langfuse")
@staticmethod
def _fake_client(started):
"""A minimal Langfuse stand-in that records each root trace opened.
``_start_root_trace`` calls ``create_trace_id`` then opens a root via
``start_as_current_observation(...)`` (a context manager whose
``__enter__`` returns the root span). We record one entry per root
actually opened so the test can count distinct traces.
"""
class _Span:
def update(self, **kw):
pass
def end(self, **kw):
pass
def set_trace_io(self, **kw):
pass
def start_observation(self, **kw):
return _Span()
class _RootCM:
def __enter__(self):
return _Span()
def __exit__(self, *exc):
return False
class _Client:
def create_trace_id(self, seed=None):
return f"trace::{seed}"
def start_as_current_observation(self, **kw):
started.append(kw.get("trace_context", {}).get("trace_id"))
return _RootCM()
def flush(self):
pass
return _Client()
def _run_turn(self, mod, *, session, turn_n, finalize):
"""Drive one turn through the request-scoped hooks the gateway fires."""
task_id = session # gateway sets task_id == session_id
turn_id = f"{session}:{task_id}:turn{turn_n}"
api_call_count = 1 # resets every turn
api_request_id = f"{turn_id}:api:{api_call_count}"
mod.on_pre_llm_request(
task_id=task_id,
session_id=session,
model="m",
provider="p",
api_mode="chat",
api_call_count=api_call_count,
request_messages=[{"role": "user", "content": "hi"}],
turn_id=turn_id,
api_request_id=api_request_id,
)
# finalize=False => leave a tool call on the final response so
# _finish_trace is skipped and the turn's state lingers.
mod.on_post_llm_call(
task_id=task_id,
session_id=session,
model="m",
provider="p",
api_mode="chat",
api_call_count=api_call_count,
assistant_content_chars=5 if finalize else 0,
assistant_tool_call_count=0 if finalize else 1,
usage={"input_tokens": 10, "output_tokens": 5},
turn_id=turn_id,
api_request_id=api_request_id,
)
def test_unfinalized_turn_does_not_capture_next_turn(self, monkeypatch):
"""A turn that never finalizes must not absorb the following turn."""
mod = self._fresh_plugin()
started: list = []
monkeypatch.setattr(mod, "_get_langfuse", lambda: self._fake_client(started))
monkeypatch.setattr(mod, "_end_observation", lambda *a, **k: None)
mod._TRACE_STATE.clear()
# Turn 1 ends without finalizing (its final step still has a tool call).
self._run_turn(mod, session="sess-iso", turn_n=1, finalize=False)
# Turn 2 is a normal, fully finalizing turn in the SAME session.
self._run_turn(mod, session="sess-iso", turn_n=2, finalize=True)
# Each turn opened its OWN root trace. On the pre-fix code the second
# turn reused turn 1's lingering state and only one trace was opened.
assert len(started) == 2
# The two turns are tracked under distinct, turn-scoped keys.
keys = list(mod._TRACE_STATE.keys())
assert all("turn1" in k or "turn2" in k for k in keys)
def test_pre_and_post_hooks_share_one_key_within_a_turn(self, monkeypatch):
"""turn_id is preferred over api_request_id so the turn-scoped
post_llm_call (which carries no api_request_id) still resolves to the
same key as the request-scoped pre/post_api_request hooks. If the
ordering were reversed, finalization would silently break."""
mod = self._fresh_plugin()
turn_id = "S:T:turnX"
api_request_id = f"{turn_id}:api:1"
k_pre_api = mod._trace_key("T", "S", turn_id=turn_id, api_request_id=api_request_id)
k_post_api = mod._trace_key("T", "S", turn_id=turn_id, api_request_id=api_request_id)
k_post_turn = mod._trace_key("T", "S", turn_id=turn_id, api_request_id="")
assert k_pre_api == k_post_api == k_post_turn
# ---------------------------------------------------------------------------
# Placeholder-credential guard (#23823).
#