diff --git a/plugins/observability/nemo_relay/README.md b/plugins/observability/nemo_relay/README.md index b5376696213..7e0604205d0 100644 --- a/plugins/observability/nemo_relay/README.md +++ b/plugins/observability/nemo_relay/README.md @@ -163,7 +163,11 @@ agent_version = "local" When `HERMES_NEMO_RELAY_PLUGINS_TOML` is set and initializes successfully, NeMo Relay owns exporter lifecycle through that config. The direct -`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. +`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. If the same +`plugins.toml` observability config enables `atif`, the direct +`HERMES_NEMO_RELAY_ATIF_*` fallback setup is also skipped so Hermes does not +double-export trajectories on teardown. If `plugins.toml` initialization fails, +Hermes keeps the direct env-var fallbacks active until a later retry succeeds. To enable NeMo Relay managed execution intercepts for provider and tool calls, include an adaptive component in the same `plugins.toml`: diff --git a/plugins/observability/nemo_relay/__init__.py b/plugins/observability/nemo_relay/__init__.py index cd1587fdab0..e86573d56d3 100644 --- a/plugins/observability/nemo_relay/__init__.py +++ b/plugins/observability/nemo_relay/__init__.py @@ -65,9 +65,11 @@ class _Runtime: self.sessions: dict[str, _SessionState] = {} self.subagent_parents: dict[str, _SubagentParent] = {} self.atof_exporter: Any = None + self._atof_subscriber_name = "hermes.nemo_relay.atof" self._plugin_config_initialized = self._configure_plugins_toml() + self._plugin_config_needs_reinit = False if not self._plugin_config_initialized: - self._configure_atof() + self._activate_direct_fallbacks() def _configure_plugins_toml(self) -> bool: if not self.settings.plugins_config: @@ -78,17 +80,45 @@ class _Runtime: return False try: self._ensure_plugin_config_output_dirs(self.settings.plugins_config) - result = initialize(self.settings.plugins_config) - if inspect.isawaitable(result): - asyncio.run(result) + 
_resolve_awaitable(initialize(self.settings.plugins_config)) return True - except RuntimeError: - logger.debug("NeMo Relay plugins.toml init skipped inside a running event loop") - return False except Exception as exc: logger.debug("NeMo Relay plugins.toml init failed: %s", exc, exc_info=True) return False + def _clear_plugins_toml(self) -> None: + if not self._plugin_config_initialized: + return + plugin_mod = getattr(self.nemo_relay, "plugin", None) + clear = getattr(plugin_mod, "clear", None) + if not callable(clear): + return + try: + _resolve_awaitable(clear()) + finally: + self._plugin_config_initialized = False + self._plugin_config_needs_reinit = bool(self.settings.plugins_config) + + def _activate_direct_fallbacks(self) -> None: + self._plugin_config_needs_reinit = False + self._configure_atof() + + def _maybe_reinitialize_plugins_toml(self) -> None: + if not self._plugin_config_needs_reinit or self._plugin_config_initialized: + return + self._plugin_config_initialized = self._configure_plugins_toml() + if not self._plugin_config_initialized: + self._activate_direct_fallbacks() + return + self._clear_atof() + self._plugin_config_needs_reinit = False + + def _plugins_toml_owns_exporter(self, exporter_name: str) -> bool: + return self._plugin_config_initialized and _observability_exporter_enabled( + self.settings.plugins_config, + exporter_name, + ) + def _ensure_plugin_config_output_dirs(self, config: dict[str, Any]) -> None: for component in config.get("components", []): if not isinstance(component, dict): @@ -109,7 +139,7 @@ class _Runtime: Path(output_directory).mkdir(parents=True, exist_ok=True) def _configure_atof(self) -> None: - if not self.settings.atof_enabled: + if not self.settings.atof_enabled or self.atof_exporter is not None: return config = self.nemo_relay.AtofExporterConfig() if self.settings.atof_output_directory: @@ -121,16 +151,28 @@ class _Runtime: else: config.mode = self.nemo_relay.AtofExporterMode.Append self.atof_exporter = 
self.nemo_relay.AtofExporter(config) - self.atof_exporter.register("hermes.nemo_relay.atof") + self.atof_exporter.register(self._atof_subscriber_name) + + def _clear_atof(self) -> None: + if self.atof_exporter is None: + return + deregister = getattr(self.atof_exporter, "deregister", None) + if callable(deregister): + try: + deregister(self._atof_subscriber_name) + except Exception: + logger.debug("NeMo Relay ATOF deregister failed", exc_info=True) + self.atof_exporter = None def ensure_session(self, kwargs: dict[str, Any]) -> _SessionState: + self._maybe_reinitialize_plugins_toml() session_id = _session_id(kwargs) state = self.sessions.get(session_id) if state is not None: return state state = _SessionState(session_id=session_id) - if self.settings.atif_enabled: + if self.settings.atif_enabled and not self._plugins_toml_owns_exporter("atif"): state.atif_exporter = self.nemo_relay.AtifExporter( session_id, self.settings.atif_agent_name, @@ -189,6 +231,13 @@ class _Runtime: state.atif_exporter.deregister(state.atif_subscriber_name) except Exception: logger.debug("NeMo Relay ATIF deregister failed", exc_info=True) + if self._plugin_config_initialized and not self.sessions: + try: + self._clear_plugins_toml() + except Exception: + logger.debug("NeMo Relay plugins.toml clear failed", exc_info=True) + elif self.settings.plugins_config and not self.sessions: + self._plugin_config_needs_reinit = True def mark(self, name: str, kwargs: dict[str, Any]) -> None: state = self.ensure_session(kwargs) @@ -618,6 +667,19 @@ def _adaptive_mode(config: dict[str, Any] | None) -> str: return "observe" +def _observability_exporter_enabled( + plugins_config: dict[str, Any] | None, + exporter_name: str, +) -> bool: + observability_config = _enabled_component_config(plugins_config, "observability") + if not isinstance(observability_config, dict): + return False + exporter_config = observability_config.get(exporter_name) + if not isinstance(exporter_config, dict): + return False + return 
exporter_config.get("enabled", True) is not False + + def _env(name: str) -> str: return os.environ.get(name, "").strip() diff --git a/tests/plugins/test_nemo_relay_plugin.py b/tests/plugins/test_nemo_relay_plugin.py index c4970bf2415..953b6043b3e 100644 --- a/tests/plugins/test_nemo_relay_plugin.py +++ b/tests/plugins/test_nemo_relay_plugin.py @@ -2,10 +2,13 @@ from __future__ import annotations +import asyncio import builtins +import gc import importlib import json import sys +import warnings from pathlib import Path from types import SimpleNamespace @@ -37,7 +40,7 @@ class _FakeNemoRelay: call_end=self._tool_call_end, execute=self._tool_execute, ) - self.plugin = SimpleNamespace(initialize=self._plugin_initialize) + self.plugin = SimpleNamespace(initialize=self._plugin_initialize, clear=self._plugin_clear) self.LLMRequest = _FakeLLMRequest self.AtofExporterConfig = _FakeAtofExporterConfig self.AtofExporterMode = SimpleNamespace(Append="append", Overwrite="overwrite") @@ -93,6 +96,9 @@ class _FakeNemoRelay: self.events.append(("plugin.initialize", config)) return {"diagnostics": []} + async def _plugin_clear(self): + self.events.append(("plugin.clear",)) + class _FakeLLMRequest: def __init__(self, headers, content): @@ -115,6 +121,10 @@ class _FakeAtofExporter: def register(self, name): self.events.append(("atof.register", name, self.config.output_directory, self.config.filename)) + def deregister(self, name): + self.events.append(("atof.deregister", name, self.config.output_directory, self.config.filename)) + return True + class _FakeAtifExporter: def __init__(self, events, session_id, agent_name, agent_version, kwargs): @@ -445,6 +455,252 @@ output_directory = "{atif_dir}" assert atif_dir.is_dir() +def test_nemo_relay_plugin_clears_plugins_toml_on_final_session_finalize_and_reinitializes(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ 
+version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize") == 2 + assert event_names.count("plugin.clear") == 1 + + +def test_nemo_relay_plugin_keeps_plugins_toml_active_while_other_sessions_remain(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ +version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + plugin.on_session_start(session_id="parent") + plugin.on_session_start(session_id="child") + plugin.on_session_finalize(session_id="child", reason="shutdown") + plugin.on_session_finalize(session_id="parent", reason="shutdown") + + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize") == 1 + assert event_names.count("plugin.clear") == 1 + + +def test_nemo_relay_plugin_reinitializes_plugins_toml_inside_active_event_loop(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ +version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + async def _drive() -> None: + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + await asyncio.sleep(0) + + with warnings.catch_warnings(record=True) as caught: + 
warnings.simplefilter("always") + asyncio.run(_drive()) + gc.collect() + + assert not any("was never awaited" in str(w.message) for w in caught) + runtime = plugin._get_runtime() + assert runtime is not None + assert runtime._plugin_config_initialized is True + scope_push_names = [event[1] for event in fake.events if event[0] == "scope.push"] + assert "hermes-session-s2" in scope_push_names + + +def test_nemo_relay_plugin_retries_plugins_toml_after_clear_failure(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + initialize_calls = 0 + + async def _counting_initialize(config): + nonlocal initialize_calls + initialize_calls += 1 + fake.events.append(("plugin.initialize.attempt", initialize_calls, config)) + return {"diagnostics": []} + + async def _failing_clear(): + fake.events.append(("plugin.clear.failed",)) + raise RuntimeError("boom") + + fake.plugin.initialize = _counting_initialize + fake.plugin.clear = _failing_clear + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ +version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize.attempt") == 2 + assert event_names.count("plugin.clear.failed") == 1 + scope_push_names = [event[1] for event in fake.events if event[0] == "scope.push"] + assert "hermes-session-s2" in scope_push_names + + +def test_nemo_relay_plugin_disables_direct_atif_when_plugins_toml_owns_atif(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + f""" +version = 1 + +[[components]] +kind = "observability" 
+enabled = true + +[components.config.atif] +enabled = true +output_directory = "{(tmp_path / "managed-atif").as_posix()}" +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atif")) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + + event_names = [event[0] for event in fake.events] + assert "plugin.initialize" in event_names + assert "plugin.clear" in event_names + assert "atif.register" not in event_names + assert not (tmp_path / "direct-atif" / "hermes-atif-s1.json").exists() + + +def test_nemo_relay_plugin_keeps_direct_atif_when_plugins_toml_init_fails(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + + async def _failing_initialize(config): + fake.events.append(("plugin.initialize.failed", config)) + raise RuntimeError("boom") + + fake.plugin.initialize = _failing_initialize + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + f""" +version = 1 + +[[components]] +kind = "observability" +enabled = true + +[components.config.atif] +enabled = true +output_directory = "{(tmp_path / "managed-atif").as_posix()}" +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atif")) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + + event_names = [event[0] for event in fake.events] + assert "plugin.initialize.failed" in event_names + assert "plugin.clear" not in event_names + assert "atif.register" in event_names + assert (tmp_path / "direct-atif" / "hermes-atif-s1.json").exists() + + +def 
test_nemo_relay_plugin_retries_plugins_toml_after_fallback_only_session_and_clears_direct_atof( + tmp_path, + monkeypatch, +): + fake = _FakeNemoRelay() + initialize_calls = 0 + + async def _flaky_initialize(config): + nonlocal initialize_calls + initialize_calls += 1 + fake.events.append(("plugin.initialize.attempt", initialize_calls, config)) + if initialize_calls == 1: + raise RuntimeError("boom") + return {"diagnostics": []} + + fake.plugin.initialize = _flaky_initialize + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + f""" +version = 1 + +[[components]] +kind = "observability" +enabled = true + +[components.config.atof] +enabled = true +output_directory = "{(tmp_path / "managed-atof").as_posix()}" +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atof")) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + + runtime = plugin._get_runtime() + assert runtime is not None + assert runtime._plugin_config_initialized is True + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize.attempt") == 2 + assert event_names.count("atof.register") == 1 + assert event_names.count("atof.deregister") == 1 + + def test_nemo_relay_adaptive_llm_execution_middleware_preserves_raw_response(tmp_path, monkeypatch): fake = _FakeNemoRelay() plugin = _fresh_plugin(monkeypatch, fake)