fix: flush plugin-config OpenInference when the final session closes

Clear NeMo Relay plugin-config observability only after the last active Hermes session finalizes.

Use the plugin's async-safe awaitable helper for both initialize and clear so session rotation remains safe under active event loops.

Disable the direct ATIF fallback when plugins.toml already owns the ATIF exporter lifecycle to avoid duplicate trajectory export on finalization.
This commit is contained in:
mnajafian-nv 2026-06-07 14:29:30 -07:00
parent c986377236
commit 9d61076f88
No known key found for this signature in database
GPG key ID: C0C3EEEE9FB11E38
3 changed files with 167 additions and 9 deletions

View file

@ -163,7 +163,10 @@ agent_version = "local"
When `HERMES_NEMO_RELAY_PLUGINS_TOML` is set and initializes successfully, NeMo
Relay owns exporter lifecycle through that config. The direct
`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped.
`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. If the same
`plugins.toml` observability config enables `atif`, the direct
`HERMES_NEMO_RELAY_ATIF_*` fallback setup is also skipped so Hermes does not
double-export trajectories on teardown.
To enable NeMo Relay managed execution intercepts for provider and tool calls,
include an adaptive component in the same `plugins.toml`:

View file

@ -78,17 +78,22 @@ class _Runtime:
return False
try:
self._ensure_plugin_config_output_dirs(self.settings.plugins_config)
result = initialize(self.settings.plugins_config)
if inspect.isawaitable(result):
asyncio.run(result)
_resolve_awaitable(initialize(self.settings.plugins_config))
return True
except RuntimeError:
logger.debug("NeMo Relay plugins.toml init skipped inside a running event loop")
return False
except Exception as exc:
logger.debug("NeMo Relay plugins.toml init failed: %s", exc, exc_info=True)
return False
def _clear_plugins_toml(self) -> None:
    """Clear plugin-config state through ``nemo_relay.plugin.clear``.

    Does nothing when plugin config is not marked as initialized, or when
    the ``nemo_relay`` object exposes no callable ``plugin.clear``. The
    initialized flag is reset only after the clear call has been resolved,
    so an exception raised while clearing leaves the flag set.
    """
    if self._plugin_config_initialized:
        clear_fn = getattr(getattr(self.nemo_relay, "plugin", None), "clear", None)
        if callable(clear_fn):
            # clear() may hand back an awaitable; the shared helper resolves it.
            _resolve_awaitable(clear_fn())
            self._plugin_config_initialized = False
def _ensure_plugin_config_output_dirs(self, config: dict[str, Any]) -> None:
for component in config.get("components", []):
if not isinstance(component, dict):
@ -124,6 +129,8 @@ class _Runtime:
self.atof_exporter.register("hermes.nemo_relay.atof")
def ensure_session(self, kwargs: dict[str, Any]) -> _SessionState:
if self.settings.plugins_config and not self._plugin_config_initialized:
self._plugin_config_initialized = self._configure_plugins_toml()
session_id = _session_id(kwargs)
state = self.sessions.get(session_id)
if state is not None:
@ -189,6 +196,11 @@ class _Runtime:
state.atif_exporter.deregister(state.atif_subscriber_name)
except Exception:
logger.debug("NeMo Relay ATIF deregister failed", exc_info=True)
if self._plugin_config_initialized and not self.sessions:
try:
self._clear_plugins_toml()
except Exception:
logger.debug("NeMo Relay plugins.toml clear failed", exc_info=True)
def mark(self, name: str, kwargs: dict[str, Any]) -> None:
state = self.ensure_session(kwargs)
@ -561,6 +573,12 @@ def _load_settings() -> _Settings:
plugins_toml_path = _env("HERMES_NEMO_RELAY_PLUGINS_TOML")
plugins_config = _load_plugins_config(plugins_toml_path)
adaptive_config = _enabled_component_config(plugins_config, "adaptive")
atif_enabled = _env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED")
if atif_enabled and _observability_exporter_enabled(plugins_config, "atif"):
logger.debug(
"NeMo Relay direct ATIF fallback disabled because plugins.toml observability.atif owns exporter lifecycle"
)
atif_enabled = False
return _Settings(
plugins_toml_path=plugins_toml_path,
plugins_config=plugins_config,
@ -570,7 +588,7 @@ def _load_settings() -> _Settings:
atof_output_directory=_env("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY"),
atof_filename=_env("HERMES_NEMO_RELAY_ATOF_FILENAME") or "hermes-atof.jsonl",
atof_mode=_env("HERMES_NEMO_RELAY_ATOF_MODE") or "append",
atif_enabled=_env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED"),
atif_enabled=atif_enabled,
atif_output_directory=_env("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY"),
atif_filename_template=_env("HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE") or "hermes-atif-{session_id}.json",
atif_subagent_export_mode=_atif_subagent_export_mode(),
@ -618,6 +636,19 @@ def _adaptive_mode(config: dict[str, Any] | None) -> str:
return "observe"
def _observability_exporter_enabled(
    plugins_config: dict[str, Any] | None,
    exporter_name: str,
) -> bool:
    """Report whether the observability component config enables an exporter.

    Args:
        plugins_config: Parsed plugins configuration, or ``None``.
        exporter_name: Key of the exporter section to look up (e.g. ``"atif"``).

    Returns:
        ``True`` when the observability config is a dict that contains a dict
        under ``exporter_name`` whose ``"enabled"`` value is not exactly
        ``False`` (a missing ``"enabled"`` key counts as enabled); ``False``
        otherwise.
    """
    # NOTE(review): presumably this returns the config mapping of the enabled
    # "observability" component, or a non-dict when absent - confirm in helper.
    component = _enabled_component_config(plugins_config, "observability")
    exporter = component.get(exporter_name) if isinstance(component, dict) else None
    if isinstance(exporter, dict):
        return exporter.get("enabled", True) is not False
    return False
def _env(name: str) -> str:
    """Return environment variable ``name`` stripped of surrounding whitespace.

    An unset variable yields the empty string.
    """
    raw = os.environ.get(name)
    if raw is None:
        return ""
    return raw.strip()

View file

@ -2,10 +2,13 @@
from __future__ import annotations
import asyncio
import builtins
import gc
import importlib
import json
import sys
import warnings
from pathlib import Path
from types import SimpleNamespace
@ -37,7 +40,7 @@ class _FakeNemoRelay:
call_end=self._tool_call_end,
execute=self._tool_execute,
)
self.plugin = SimpleNamespace(initialize=self._plugin_initialize)
self.plugin = SimpleNamespace(initialize=self._plugin_initialize, clear=self._plugin_clear)
self.LLMRequest = _FakeLLMRequest
self.AtofExporterConfig = _FakeAtofExporterConfig
self.AtofExporterMode = SimpleNamespace(Append="append", Overwrite="overwrite")
@ -93,6 +96,9 @@ class _FakeNemoRelay:
self.events.append(("plugin.initialize", config))
return {"diagnostics": []}
async def _plugin_clear(self):
    """Record a ``plugin.clear`` event so tests can count clear calls."""
    marker = ("plugin.clear",)
    self.events.append(marker)
class _FakeLLMRequest:
def __init__(self, headers, content):
@ -445,6 +451,124 @@ output_directory = "{atif_dir}"
assert atif_dir.is_dir()
def test_nemo_relay_plugin_clears_plugins_toml_on_final_session_finalize_and_reinitializes(tmp_path, monkeypatch):
    """Finalizing the only session clears plugin config; the next session initializes it again."""
    relay = _FakeNemoRelay()
    plugin = _fresh_plugin(monkeypatch, relay)
    config_path = tmp_path / "plugins.toml"
    config_path.write_text(
        """
version = 1
[[components]]
kind = "observability"
enabled = true
""",
        encoding="utf-8",
    )
    monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(config_path))

    # Rotate sessions: s1 starts and finalizes before s2 starts.
    plugin.on_session_start(session_id="s1")
    plugin.on_session_finalize(session_id="s1", reason="shutdown")
    plugin.on_session_start(session_id="s2")

    recorded = [entry[0] for entry in relay.events]
    initialize_calls = recorded.count("plugin.initialize")
    clear_calls = recorded.count("plugin.clear")
    assert initialize_calls == 2
    assert clear_calls == 1
def test_nemo_relay_plugin_keeps_plugins_toml_active_while_other_sessions_remain(tmp_path, monkeypatch):
    """Plugin config is cleared only when the last of several sessions finalizes.

    The final counts alone cannot catch a premature clear: if clear fired on
    the first finalize, the initialized flag would already be reset, the second
    finalize would skip clear, and both counts would still equal one. The
    intermediate assertion pins the "keeps active" half of the contract.
    """
    fake = _FakeNemoRelay()
    plugin = _fresh_plugin(monkeypatch, fake)
    plugins_toml = tmp_path / "plugins.toml"
    plugins_toml.write_text(
        """
version = 1
[[components]]
kind = "observability"
enabled = true
""",
        encoding="utf-8",
    )
    monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml))

    plugin.on_session_start(session_id="parent")
    plugin.on_session_start(session_id="child")
    plugin.on_session_finalize(session_id="child", reason="shutdown")

    # The parent session is still open, so nothing may have been cleared yet.
    assert "plugin.clear" not in [event[0] for event in fake.events]

    plugin.on_session_finalize(session_id="parent", reason="shutdown")

    event_names = [event[0] for event in fake.events]
    assert event_names.count("plugin.initialize") == 1
    assert event_names.count("plugin.clear") == 1
def test_nemo_relay_plugin_reinitializes_plugins_toml_inside_active_event_loop(tmp_path, monkeypatch):
    """Session rotation driven from inside a running event loop leaves plugin config initialized.

    Also checks that no "was never awaited" warning is recorded and that the
    second session's scope was pushed.
    """
    relay = _FakeNemoRelay()
    plugin = _fresh_plugin(monkeypatch, relay)
    config_path = tmp_path / "plugins.toml"
    config_path.write_text(
        """
version = 1
[[components]]
kind = "observability"
enabled = true
""",
        encoding="utf-8",
    )
    monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(config_path))

    async def _rotate_sessions() -> None:
        # The hooks are called synchronously while the loop is running.
        plugin.on_session_start(session_id="s1")
        plugin.on_session_finalize(session_id="s1", reason="shutdown")
        plugin.on_session_start(session_id="s2")
        await asyncio.sleep(0)

    with warnings.catch_warnings(record=True) as recorded_warnings:
        warnings.simplefilter("always")
        asyncio.run(_rotate_sessions())
        # Collect inside the capture block so any warning raised during
        # collection is recorded too.
        gc.collect()

    unawaited = [w for w in recorded_warnings if "was never awaited" in str(w.message)]
    assert not unawaited

    runtime = plugin._get_runtime()
    assert runtime is not None
    assert runtime._plugin_config_initialized is True

    pushed_scopes = [entry[1] for entry in relay.events if entry[0] == "scope.push"]
    assert "hermes-session-s2" in pushed_scopes
def test_nemo_relay_plugin_disables_direct_atif_when_plugins_toml_owns_atif(tmp_path, monkeypatch):
    """The env-driven ATIF fallback stays off when plugins.toml observability enables ``atif``."""
    relay = _FakeNemoRelay()
    plugin = _fresh_plugin(monkeypatch, relay)
    config_path = tmp_path / "plugins.toml"
    config_path.write_text(
        f"""
version = 1
[[components]]
kind = "observability"
enabled = true
[components.config.atif]
enabled = true
output_directory = "{(tmp_path / "managed-atif").as_posix()}"
""",
        encoding="utf-8",
    )
    # Request the direct fallback as well, so the test proves it is suppressed.
    environment = (
        ("HERMES_NEMO_RELAY_PLUGINS_TOML", str(config_path)),
        ("HERMES_NEMO_RELAY_ATIF_ENABLED", "1"),
        ("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atif")),
    )
    for variable, value in environment:
        monkeypatch.setenv(variable, value)

    plugin.on_session_start(session_id="s1")
    plugin.on_session_finalize(session_id="s1", reason="shutdown")

    recorded = [entry[0] for entry in relay.events]
    assert "plugin.initialize" in recorded
    assert "plugin.clear" in recorded
    assert "atif.register" not in recorded
    direct_export = tmp_path / "direct-atif" / "hermes-atif-s1.json"
    assert not direct_export.exists()
def test_nemo_relay_adaptive_llm_execution_middleware_preserves_raw_response(tmp_path, monkeypatch):
fake = _FakeNemoRelay()
plugin = _fresh_plugin(monkeypatch, fake)