From b4356135f2aa7c7cd3ed371d7ae0cb9efbe845e4 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Thu, 18 Jun 2026 12:38:44 +0530 Subject: [PATCH] test(langfuse): add end-to-end turn-isolation regression The PR added helper-level tests for _trace_key but nothing exercised the keys through the real hooks. This adds TestTurnTraceIsolation, which drives on_pre_llm_request / on_post_llm_call across two turns of one gateway session (task_id == session_id, unique turn_id, api_call_count reset per turn) and asserts each turn opens its own root trace when the first turn fails to finalize (tool-only final step). This test fails on the pre-fix code (only one trace opened, turn 2 absorbed into turn 1) and passes with the scoping fix. Also pins the turn_id-over-api_request_id key precedence: the turn-scoped post_llm_call carries no api_request_id, so it must still resolve to the same key as the request-scoped hooks or finalization breaks. --- tests/plugins/test_langfuse_plugin.py | 138 ++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/tests/plugins/test_langfuse_plugin.py b/tests/plugins/test_langfuse_plugin.py index 6c9ebedbf94..c3cb116d43c 100644 --- a/tests/plugins/test_langfuse_plugin.py +++ b/tests/plugins/test_langfuse_plugin.py @@ -236,6 +236,144 @@ class TestTraceScopeKey: assert plugin._trace_key("task-1", "session-1") == "task-1" +# --------------------------------------------------------------------------- +# End-to-end collision regression: two turns of ONE gateway session must not +# share trace state. The helper-level tests above prove _trace_key returns +# distinct keys; this drives the real pre/post hooks to prove the keys are +# actually threaded through so the second turn gets its own root trace. +# +# Gateway reality this reproduces: +# * task_id == session_id for every turn (gateway/run.py) +# * turn_id is unique per turn (turn_context.py) +# * api_call_count resets to 1 each turn (conversation_loop.py) +# +# Before the turn/request scoping, _trace_key collapsed to the constant +# session_id. That worked only because _finish_trace pops the key on a clean +# turn end. When turn 1 does NOT finalize (interrupted, tool-only final step, +# or empty final content), its state lingered under session_id and turn 2 +# silently merged into turn 1's trace instead of opening its own. +# --------------------------------------------------------------------------- + + +class TestTurnTraceIsolation: + def _fresh_plugin(self): + sys.modules.pop("plugins.observability.langfuse", None) + return importlib.import_module("plugins.observability.langfuse") + + @staticmethod + def _fake_client(started): + """A minimal Langfuse stand-in that records each root trace opened. + + ``_start_root_trace`` calls ``create_trace_id`` then opens a root via + ``start_as_current_observation(...)`` (a context manager whose + ``__enter__`` returns the root span). We record one entry per root + actually opened so the test can count distinct traces. + """ + + class _Span: + def update(self, **kw): + pass + + def end(self, **kw): + pass + + def set_trace_io(self, **kw): + pass + + def start_observation(self, **kw): + return _Span() + + class _RootCM: + def __enter__(self): + return _Span() + + def __exit__(self, *exc): + return False + + class _Client: + def create_trace_id(self, seed=None): + return f"trace::{seed}" + + def start_as_current_observation(self, **kw): + started.append(kw.get("trace_context", {}).get("trace_id")) + return _RootCM() + + def flush(self): + pass + + return _Client() + + def _run_turn(self, mod, *, session, turn_n, finalize): + """Drive one turn through the request-scoped hooks the gateway fires.""" + task_id = session # gateway sets task_id == session_id + turn_id = f"{session}:{task_id}:turn{turn_n}" + api_call_count = 1 # resets every turn + api_request_id = f"{turn_id}:api:{api_call_count}" + + mod.on_pre_llm_request( + task_id=task_id, + session_id=session, + model="m", + provider="p", + api_mode="chat", + api_call_count=api_call_count, + request_messages=[{"role": "user", "content": "hi"}], + turn_id=turn_id, + api_request_id=api_request_id, + ) + # finalize=False => leave a tool call on the final response so + # _finish_trace is skipped and the turn's state lingers. + mod.on_post_llm_call( + task_id=task_id, + session_id=session, + model="m", + provider="p", + api_mode="chat", + api_call_count=api_call_count, + assistant_content_chars=5 if finalize else 0, + assistant_tool_call_count=0 if finalize else 1, + usage={"input_tokens": 10, "output_tokens": 5}, + turn_id=turn_id, + api_request_id=api_request_id, + ) + + def test_unfinalized_turn_does_not_capture_next_turn(self, monkeypatch): + """A turn that never finalizes must not absorb the following turn.""" + mod = self._fresh_plugin() + started: list = [] + monkeypatch.setattr(mod, "_get_langfuse", lambda: self._fake_client(started)) + monkeypatch.setattr(mod, "_end_observation", lambda *a, **k: None) + mod._TRACE_STATE.clear() + + # Turn 1 ends without finalizing (its final step still has a tool call). + self._run_turn(mod, session="sess-iso", turn_n=1, finalize=False) + # Turn 2 is a normal, fully finalizing turn in the SAME session. + self._run_turn(mod, session="sess-iso", turn_n=2, finalize=True) + + # Each turn opened its OWN root trace. On the pre-fix code the second + # turn reused turn 1's lingering state and only one trace was opened. + assert len(started) == 2 + + # The two turns are tracked under distinct, turn-scoped keys. + keys = list(mod._TRACE_STATE.keys()) + assert all("turn1" in k or "turn2" in k for k in keys) + + def test_pre_and_post_hooks_share_one_key_within_a_turn(self, monkeypatch): + """turn_id is preferred over api_request_id so the turn-scoped + post_llm_call (which carries no api_request_id) still resolves to the + same key as the request-scoped pre/post_api_request hooks. If the + ordering were reversed, finalization would silently break.""" + mod = self._fresh_plugin() + turn_id = "S:T:turnX" + api_request_id = f"{turn_id}:api:1" + + k_pre_api = mod._trace_key("T", "S", turn_id=turn_id, api_request_id=api_request_id) + k_post_api = mod._trace_key("T", "S", turn_id=turn_id, api_request_id=api_request_id) + k_post_turn = mod._trace_key("T", "S", turn_id=turn_id, api_request_id="") + + assert k_pre_api == k_post_api == k_post_turn + + # --------------------------------------------------------------------------- # Placeholder-credential guard (#23823). #