From 9d61076f88d9972cfed2fe6cf0ddd2c192fdf562 Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Sun, 7 Jun 2026 14:29:30 -0700 Subject: [PATCH 1/3] fix: flush plugin-config OpenInference when the final session closes Clear NeMo Relay plugin-config observability only after the last active Hermes session finalizes. Use the plugin's async-safe awaitable helper for both initialize and clear so session rotation remains safe under active event loops. Disable the direct ATIF fallback when plugins.toml already owns the ATIF exporter lifecycle to avoid duplicate trajectory export on finalization. --- plugins/observability/nemo_relay/README.md | 5 +- plugins/observability/nemo_relay/__init__.py | 45 +++++-- tests/plugins/test_nemo_relay_plugin.py | 126 ++++++++++++++++++- 3 files changed, 167 insertions(+), 9 deletions(-) diff --git a/plugins/observability/nemo_relay/README.md b/plugins/observability/nemo_relay/README.md index b5376696213..fa7a78a8568 100644 --- a/plugins/observability/nemo_relay/README.md +++ b/plugins/observability/nemo_relay/README.md @@ -163,7 +163,10 @@ agent_version = "local" When `HERMES_NEMO_RELAY_PLUGINS_TOML` is set and initializes successfully, NeMo Relay owns exporter lifecycle through that config. The direct -`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. +`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. If the same +`plugins.toml` observability config enables `atif`, the direct +`HERMES_NEMO_RELAY_ATIF_*` fallback setup is also skipped so Hermes does not +double-export trajectories on teardown. To enable NeMo Relay managed execution intercepts for provider and tool calls, include an adaptive component in the same `plugins.toml`: diff --git a/plugins/observability/nemo_relay/__init__.py b/plugins/observability/nemo_relay/__init__.py index cd1587fdab0..bc498e951ce 100644 --- a/plugins/observability/nemo_relay/__init__.py +++ b/plugins/observability/nemo_relay/__init__.py @@ -78,17 +78,22 @@ class _Runtime: return False try: self._ensure_plugin_config_output_dirs(self.settings.plugins_config) - result = initialize(self.settings.plugins_config) - if inspect.isawaitable(result): - asyncio.run(result) + _resolve_awaitable(initialize(self.settings.plugins_config)) return True - except RuntimeError: - logger.debug("NeMo Relay plugins.toml init skipped inside a running event loop") - return False except Exception as exc: logger.debug("NeMo Relay plugins.toml init failed: %s", exc, exc_info=True) return False + def _clear_plugins_toml(self) -> None: + if not self._plugin_config_initialized: + return + plugin_mod = getattr(self.nemo_relay, "plugin", None) + clear = getattr(plugin_mod, "clear", None) + if not callable(clear): + return + _resolve_awaitable(clear()) + self._plugin_config_initialized = False + def _ensure_plugin_config_output_dirs(self, config: dict[str, Any]) -> None: for component in config.get("components", []): if not isinstance(component, dict): @@ -124,6 +129,8 @@ class _Runtime: self.atof_exporter.register("hermes.nemo_relay.atof") def ensure_session(self, kwargs: dict[str, Any]) -> _SessionState: + if self.settings.plugins_config and not self._plugin_config_initialized: + self._plugin_config_initialized = self._configure_plugins_toml() session_id = _session_id(kwargs) state = self.sessions.get(session_id) if state is not None: @@ -189,6 +196,11 @@ class _Runtime: state.atif_exporter.deregister(state.atif_subscriber_name) except Exception: logger.debug("NeMo Relay ATIF deregister failed", exc_info=True) + if self._plugin_config_initialized and not self.sessions: + try: + self._clear_plugins_toml() + except Exception: + logger.debug("NeMo Relay plugins.toml clear failed", exc_info=True) def mark(self, name: str, kwargs: dict[str, Any]) -> None: state = self.ensure_session(kwargs) @@ -561,6 +573,12 @@ def _load_settings() -> _Settings: plugins_toml_path = _env("HERMES_NEMO_RELAY_PLUGINS_TOML") plugins_config = _load_plugins_config(plugins_toml_path) adaptive_config = _enabled_component_config(plugins_config, "adaptive") + atif_enabled = _env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED") + if atif_enabled and _observability_exporter_enabled(plugins_config, "atif"): + logger.debug( + "NeMo Relay direct ATIF fallback disabled because plugins.toml observability.atif owns exporter lifecycle" + ) + atif_enabled = False return _Settings( plugins_toml_path=plugins_toml_path, plugins_config=plugins_config, @@ -570,7 +588,7 @@ def _load_settings() -> _Settings: atof_output_directory=_env("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY"), atof_filename=_env("HERMES_NEMO_RELAY_ATOF_FILENAME") or "hermes-atof.jsonl", atof_mode=_env("HERMES_NEMO_RELAY_ATOF_MODE") or "append", - atif_enabled=_env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED"), + atif_enabled=atif_enabled, atif_output_directory=_env("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY"), atif_filename_template=_env("HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE") or "hermes-atif-{session_id}.json", atif_subagent_export_mode=_atif_subagent_export_mode(), @@ -618,6 +636,19 @@ def _adaptive_mode(config: dict[str, Any] | None) -> str: return "observe" +def _observability_exporter_enabled( + plugins_config: dict[str, Any] | None, + exporter_name: str, +) -> bool: + observability_config = _enabled_component_config(plugins_config, "observability") + if not isinstance(observability_config, dict): + return False + exporter_config = observability_config.get(exporter_name) + if not isinstance(exporter_config, dict): + return False + return exporter_config.get("enabled", True) is not False + + def _env(name: str) -> str: return os.environ.get(name, "").strip() diff --git a/tests/plugins/test_nemo_relay_plugin.py b/tests/plugins/test_nemo_relay_plugin.py index c4970bf2415..12a4d89e980 100644 --- a/tests/plugins/test_nemo_relay_plugin.py +++ b/tests/plugins/test_nemo_relay_plugin.py @@ -2,10 +2,13 @@ from __future__ import annotations +import asyncio import builtins +import gc import importlib import json import sys +import warnings from pathlib import Path from types import SimpleNamespace @@ -37,7 +40,7 @@ class _FakeNemoRelay: call_end=self._tool_call_end, execute=self._tool_execute, ) - self.plugin = SimpleNamespace(initialize=self._plugin_initialize) + self.plugin = SimpleNamespace(initialize=self._plugin_initialize, clear=self._plugin_clear) self.LLMRequest = _FakeLLMRequest self.AtofExporterConfig = _FakeAtofExporterConfig self.AtofExporterMode = SimpleNamespace(Append="append", Overwrite="overwrite") @@ -93,6 +96,9 @@ class _FakeNemoRelay: self.events.append(("plugin.initialize", config)) return {"diagnostics": []} + async def _plugin_clear(self): + self.events.append(("plugin.clear",)) + class _FakeLLMRequest: def __init__(self, headers, content): @@ -445,6 +451,124 @@ output_directory = "{atif_dir}" assert atif_dir.is_dir() +def test_nemo_relay_plugin_clears_plugins_toml_on_final_session_finalize_and_reinitializes(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ +version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize") == 2 + assert event_names.count("plugin.clear") == 1 + + +def test_nemo_relay_plugin_keeps_plugins_toml_active_while_other_sessions_remain(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ +version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + plugin.on_session_start(session_id="parent") + plugin.on_session_start(session_id="child") + plugin.on_session_finalize(session_id="child", reason="shutdown") + plugin.on_session_finalize(session_id="parent", reason="shutdown") + + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize") == 1 + assert event_names.count("plugin.clear") == 1 + + +def test_nemo_relay_plugin_reinitializes_plugins_toml_inside_active_event_loop(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ +version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + async def _drive() -> None: + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + await asyncio.sleep(0) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + asyncio.run(_drive()) + gc.collect() + + assert not any("was never awaited" in str(w.message) for w in caught) + runtime = plugin._get_runtime() + assert runtime is not None + assert runtime._plugin_config_initialized is True + scope_push_names = [event[1] for event in fake.events if event[0] == "scope.push"] + assert "hermes-session-s2" in scope_push_names + + +def test_nemo_relay_plugin_disables_direct_atif_when_plugins_toml_owns_atif(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + f""" +version = 1 + +[[components]] +kind = "observability" +enabled = true + +[components.config.atif] +enabled = true +output_directory = "{(tmp_path / "managed-atif").as_posix()}" +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atif")) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + + event_names = [event[0] for event in fake.events] + assert "plugin.initialize" in event_names + assert "plugin.clear" in event_names + assert "atif.register" not in event_names + assert not (tmp_path / "direct-atif" / "hermes-atif-s1.json").exists() + + def test_nemo_relay_adaptive_llm_execution_middleware_preserves_raw_response(tmp_path, monkeypatch): fake = _FakeNemoRelay() plugin = _fresh_plugin(monkeypatch, fake) From ecd4679d8cd23f3565f68a3e7af1e3f030018ced Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Sun, 7 Jun 2026 17:27:31 -0700 Subject: [PATCH 2/3] fix(observability): preserve direct fallback until plugin-config init succeeds Signed-off-by: mnajafian-nv --- plugins/observability/nemo_relay/README.md | 3 +- plugins/observability/nemo_relay/__init__.py | 55 +++++++++--- tests/plugins/test_nemo_relay_plugin.py | 91 ++++++++++++++++++++ 3 files changed, 135 insertions(+), 14 deletions(-) diff --git a/plugins/observability/nemo_relay/README.md b/plugins/observability/nemo_relay/README.md index fa7a78a8568..7e0604205d0 100644 --- a/plugins/observability/nemo_relay/README.md +++ b/plugins/observability/nemo_relay/README.md @@ -166,7 +166,8 @@ Relay owns exporter lifecycle through that config. The direct `HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. If the same `plugins.toml` observability config enables `atif`, the direct `HERMES_NEMO_RELAY_ATIF_*` fallback setup is also skipped so Hermes does not -double-export trajectories on teardown. +double-export trajectories on teardown. If `plugins.toml` initialization fails, +Hermes keeps the direct env-var fallbacks active for that run. To enable NeMo Relay managed execution intercepts for provider and tool calls, include an adaptive component in the same `plugins.toml`: diff --git a/plugins/observability/nemo_relay/__init__.py b/plugins/observability/nemo_relay/__init__.py index bc498e951ce..0f403515112 100644 --- a/plugins/observability/nemo_relay/__init__.py +++ b/plugins/observability/nemo_relay/__init__.py @@ -65,9 +65,11 @@ class _Runtime: self.sessions: dict[str, _SessionState] = {} self.subagent_parents: dict[str, _SubagentParent] = {} self.atof_exporter: Any = None + self._atof_subscriber_name = "hermes.nemo_relay.atof" self._plugin_config_initialized = self._configure_plugins_toml() + self._plugin_config_needs_reinit = False if not self._plugin_config_initialized: - self._configure_atof() + self._activate_direct_fallbacks() def _configure_plugins_toml(self) -> bool: if not self.settings.plugins_config: @@ -93,6 +95,27 @@ class _Runtime: return _resolve_awaitable(clear()) self._plugin_config_initialized = False + self._plugin_config_needs_reinit = bool(self.settings.plugins_config) + + def _activate_direct_fallbacks(self) -> None: + self._plugin_config_needs_reinit = False + self._configure_atof() + + def _maybe_reinitialize_plugins_toml(self) -> None: + if not self._plugin_config_needs_reinit or self._plugin_config_initialized: + return + self._plugin_config_initialized = self._configure_plugins_toml() + if not self._plugin_config_initialized: + self._activate_direct_fallbacks() + return + self._clear_atof() + self._plugin_config_needs_reinit = False + + def _plugins_toml_owns_exporter(self, exporter_name: str) -> bool: + return self._plugin_config_initialized and _observability_exporter_enabled( + self.settings.plugins_config, + exporter_name, + ) def _ensure_plugin_config_output_dirs(self, config: dict[str, Any]) -> None: for component in config.get("components", []): @@ -114,7 +137,7 @@ class _Runtime: Path(output_directory).mkdir(parents=True, exist_ok=True) def _configure_atof(self) -> None: - if not self.settings.atof_enabled: + if not self.settings.atof_enabled or self.atof_exporter is not None: return config = self.nemo_relay.AtofExporterConfig() if self.settings.atof_output_directory: @@ -126,18 +149,28 @@ class _Runtime: else: config.mode = self.nemo_relay.AtofExporterMode.Append self.atof_exporter = self.nemo_relay.AtofExporter(config) - self.atof_exporter.register("hermes.nemo_relay.atof") + self.atof_exporter.register(self._atof_subscriber_name) + + def _clear_atof(self) -> None: + if self.atof_exporter is None: + return + deregister = getattr(self.atof_exporter, "deregister", None) + if callable(deregister): + try: + deregister(self._atof_subscriber_name) + except Exception: + logger.debug("NeMo Relay ATOF deregister failed", exc_info=True) + self.atof_exporter = None def ensure_session(self, kwargs: dict[str, Any]) -> _SessionState: - if self.settings.plugins_config and not self._plugin_config_initialized: - self._plugin_config_initialized = self._configure_plugins_toml() + self._maybe_reinitialize_plugins_toml() session_id = _session_id(kwargs) state = self.sessions.get(session_id) if state is not None: return state state = _SessionState(session_id=session_id) - if self.settings.atif_enabled: + if self.settings.atif_enabled and not self._plugins_toml_owns_exporter("atif"): state.atif_exporter = self.nemo_relay.AtifExporter( session_id, self.settings.atif_agent_name, @@ -201,6 +234,8 @@ class _Runtime: self._clear_plugins_toml() except Exception: logger.debug("NeMo Relay plugins.toml clear failed", exc_info=True) + elif self.settings.plugins_config and not self.sessions: + self._plugin_config_needs_reinit = True def mark(self, name: str, kwargs: dict[str, Any]) -> None: state = self.ensure_session(kwargs) @@ -573,12 +608,6 @@ def _load_settings() -> _Settings: plugins_toml_path = _env("HERMES_NEMO_RELAY_PLUGINS_TOML") plugins_config = _load_plugins_config(plugins_toml_path) adaptive_config = _enabled_component_config(plugins_config, "adaptive") - atif_enabled = _env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED") - if atif_enabled and _observability_exporter_enabled(plugins_config, "atif"): - logger.debug( - "NeMo Relay direct ATIF fallback disabled because plugins.toml observability.atif owns exporter lifecycle" - ) - atif_enabled = False return _Settings( plugins_toml_path=plugins_toml_path, plugins_config=plugins_config, @@ -588,7 +617,7 @@ def _load_settings() -> _Settings: atof_output_directory=_env("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY"), atof_filename=_env("HERMES_NEMO_RELAY_ATOF_FILENAME") or "hermes-atof.jsonl", atof_mode=_env("HERMES_NEMO_RELAY_ATOF_MODE") or "append", - atif_enabled=atif_enabled, + atif_enabled=_env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED"), atif_output_directory=_env("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY"), atif_filename_template=_env("HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE") or "hermes-atif-{session_id}.json", atif_subagent_export_mode=_atif_subagent_export_mode(), diff --git a/tests/plugins/test_nemo_relay_plugin.py b/tests/plugins/test_nemo_relay_plugin.py index 12a4d89e980..229695a11ef 100644 --- a/tests/plugins/test_nemo_relay_plugin.py +++ b/tests/plugins/test_nemo_relay_plugin.py @@ -121,6 +121,10 @@ class _FakeAtofExporter: def register(self, name): self.events.append(("atof.register", name, self.config.output_directory, self.config.filename)) + def deregister(self, name): + self.events.append(("atof.deregister", name, self.config.output_directory, self.config.filename)) + return True + class _FakeAtifExporter: def __init__(self, events, session_id, agent_name, agent_version, kwargs): @@ -569,6 +573,93 @@ output_directory = "{(tmp_path / "managed-atif").as_posix()}" assert not (tmp_path / "direct-atif" / "hermes-atif-s1.json").exists() +def test_nemo_relay_plugin_keeps_direct_atif_when_plugins_toml_init_fails(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + + async def _failing_initialize(config): + fake.events.append(("plugin.initialize.failed", config)) + raise RuntimeError("boom") + + fake.plugin.initialize = _failing_initialize + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + f""" +version = 1 + +[[components]] +kind = "observability" +enabled = true + +[components.config.atif] +enabled = true +output_directory = "{(tmp_path / "managed-atif").as_posix()}" +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atif")) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + + event_names = [event[0] for event in fake.events] + assert "plugin.initialize.failed" in event_names + assert "plugin.clear" not in event_names + assert "atif.register" in event_names + assert (tmp_path / "direct-atif" / "hermes-atif-s1.json").exists() + + +def test_nemo_relay_plugin_retries_plugins_toml_after_fallback_only_session_and_clears_direct_atof( + tmp_path, + monkeypatch, +): + fake = _FakeNemoRelay() + initialize_calls = 0 + + async def _flaky_initialize(config): + nonlocal initialize_calls + initialize_calls += 1 + fake.events.append(("plugin.initialize.attempt", initialize_calls, config)) + if initialize_calls == 1: + raise RuntimeError("boom") + return {"diagnostics": []} + + fake.plugin.initialize = _flaky_initialize + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + f""" +version = 1 + +[[components]] +kind = "observability" +enabled = true + +[components.config.atof] +enabled = true +output_directory = "{(tmp_path / "managed-atof").as_posix()}" +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atof")) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + + runtime = plugin._get_runtime() + assert runtime is not None + assert runtime._plugin_config_initialized is True + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize.attempt") == 2 + assert event_names.count("atof.register") == 1 + assert event_names.count("atof.deregister") == 1 + + def test_nemo_relay_adaptive_llm_execution_middleware_preserves_raw_response(tmp_path, monkeypatch): fake = _FakeNemoRelay() plugin = _fresh_plugin(monkeypatch, fake) From 728612c29c9d96b375363cc689f987b6434833be Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Mon, 8 Jun 2026 07:50:10 -0700 Subject: [PATCH 3/3] fix(observability): recover after plugin-config clear failure Ensure failed plugin-config clear operations still re-arm managed reinitialization on the next Hermes session. Add focused regression coverage for successful init, failed final-session clear, and next-session recovery. Signed-off-by: mnajafian-nv --- plugins/observability/nemo_relay/__init__.py | 8 ++-- tests/plugins/test_nemo_relay_plugin.py | 41 ++++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/plugins/observability/nemo_relay/__init__.py b/plugins/observability/nemo_relay/__init__.py index 0f403515112..e86573d56d3 100644 --- a/plugins/observability/nemo_relay/__init__.py +++ b/plugins/observability/nemo_relay/__init__.py @@ -93,9 +93,11 @@ class _Runtime: clear = getattr(plugin_mod, "clear", None) if not callable(clear): return - _resolve_awaitable(clear()) - self._plugin_config_initialized = False - self._plugin_config_needs_reinit = bool(self.settings.plugins_config) + try: + _resolve_awaitable(clear()) + finally: + self._plugin_config_initialized = False + self._plugin_config_needs_reinit = bool(self.settings.plugins_config) def _activate_direct_fallbacks(self) -> None: self._plugin_config_needs_reinit = False diff --git a/tests/plugins/test_nemo_relay_plugin.py b/tests/plugins/test_nemo_relay_plugin.py index 229695a11ef..953b6043b3e 100644 --- a/tests/plugins/test_nemo_relay_plugin.py +++ b/tests/plugins/test_nemo_relay_plugin.py @@ -541,6 +541,47 @@ enabled = true assert "hermes-session-s2" in scope_push_names +def test_nemo_relay_plugin_retries_plugins_toml_after_clear_failure(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + initialize_calls = 0 + + async def _counting_initialize(config): + nonlocal initialize_calls + initialize_calls += 1 + fake.events.append(("plugin.initialize.attempt", initialize_calls, config)) + return {"diagnostics": []} + + async def _failing_clear(): + fake.events.append(("plugin.clear.failed",)) + raise RuntimeError("boom") + + fake.plugin.initialize = _counting_initialize + fake.plugin.clear = _failing_clear + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + plugins_toml.write_text( + """ +version = 1 + +[[components]] +kind = "observability" +enabled = true +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + plugin.on_session_start(session_id="s1") + plugin.on_session_finalize(session_id="s1", reason="shutdown") + plugin.on_session_start(session_id="s2") + + event_names = [event[0] for event in fake.events] + assert event_names.count("plugin.initialize.attempt") == 2 + assert event_names.count("plugin.clear.failed") == 1 + scope_push_names = [event[1] for event in fake.events if event[0] == "scope.push"] + assert "hermes-session-s2" in scope_push_names + + def test_nemo_relay_plugin_disables_direct_atif_when_plugins_toml_owns_atif(tmp_path, monkeypatch): fake = _FakeNemoRelay() plugin = _fresh_plugin(monkeypatch, fake)