fix(observability): preserve direct fallback until plugin-config init succeeds

Signed-off-by: mnajafian-nv <mnajafian@nvidia.com>
This commit is contained in:
mnajafian-nv 2026-06-07 17:27:31 -07:00
parent 9d61076f88
commit ecd4679d8c
No known key found for this signature in database
GPG key ID: C0C3EEEE9FB11E38
3 changed files with 135 additions and 14 deletions

View file

@ -166,7 +166,8 @@ Relay owns exporter lifecycle through that config. The direct
`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. If the same
`plugins.toml` observability config enables `atif`, the direct
`HERMES_NEMO_RELAY_ATIF_*` fallback setup is also skipped so Hermes does not
double-export trajectories on teardown.
double-export trajectories on teardown. If `plugins.toml` initialization fails,
Hermes keeps the direct env-var fallbacks active until a later initialization
attempt succeeds.
To enable NeMo Relay managed execution intercepts for provider and tool calls,
include an adaptive component in the same `plugins.toml`:

View file

@ -65,9 +65,11 @@ class _Runtime:
self.sessions: dict[str, _SessionState] = {}
self.subagent_parents: dict[str, _SubagentParent] = {}
self.atof_exporter: Any = None
self._atof_subscriber_name = "hermes.nemo_relay.atof"
self._plugin_config_initialized = self._configure_plugins_toml()
self._plugin_config_needs_reinit = False
if not self._plugin_config_initialized:
self._configure_atof()
self._activate_direct_fallbacks()
def _configure_plugins_toml(self) -> bool:
if not self.settings.plugins_config:
@ -93,6 +95,27 @@ class _Runtime:
return
_resolve_awaitable(clear())
self._plugin_config_initialized = False
self._plugin_config_needs_reinit = bool(self.settings.plugins_config)
def _activate_direct_fallbacks(self) -> None:
self._plugin_config_needs_reinit = False
self._configure_atof()
def _maybe_reinitialize_plugins_toml(self) -> None:
if not self._plugin_config_needs_reinit or self._plugin_config_initialized:
return
self._plugin_config_initialized = self._configure_plugins_toml()
if not self._plugin_config_initialized:
self._activate_direct_fallbacks()
return
self._clear_atof()
self._plugin_config_needs_reinit = False
def _plugins_toml_owns_exporter(self, exporter_name: str) -> bool:
return self._plugin_config_initialized and _observability_exporter_enabled(
self.settings.plugins_config,
exporter_name,
)
def _ensure_plugin_config_output_dirs(self, config: dict[str, Any]) -> None:
for component in config.get("components", []):
@ -114,7 +137,7 @@ class _Runtime:
Path(output_directory).mkdir(parents=True, exist_ok=True)
def _configure_atof(self) -> None:
if not self.settings.atof_enabled:
if not self.settings.atof_enabled or self.atof_exporter is not None:
return
config = self.nemo_relay.AtofExporterConfig()
if self.settings.atof_output_directory:
@ -126,18 +149,28 @@ class _Runtime:
else:
config.mode = self.nemo_relay.AtofExporterMode.Append
self.atof_exporter = self.nemo_relay.AtofExporter(config)
self.atof_exporter.register("hermes.nemo_relay.atof")
self.atof_exporter.register(self._atof_subscriber_name)
def _clear_atof(self) -> None:
if self.atof_exporter is None:
return
deregister = getattr(self.atof_exporter, "deregister", None)
if callable(deregister):
try:
deregister(self._atof_subscriber_name)
except Exception:
logger.debug("NeMo Relay ATOF deregister failed", exc_info=True)
self.atof_exporter = None
def ensure_session(self, kwargs: dict[str, Any]) -> _SessionState:
if self.settings.plugins_config and not self._plugin_config_initialized:
self._plugin_config_initialized = self._configure_plugins_toml()
self._maybe_reinitialize_plugins_toml()
session_id = _session_id(kwargs)
state = self.sessions.get(session_id)
if state is not None:
return state
state = _SessionState(session_id=session_id)
if self.settings.atif_enabled:
if self.settings.atif_enabled and not self._plugins_toml_owns_exporter("atif"):
state.atif_exporter = self.nemo_relay.AtifExporter(
session_id,
self.settings.atif_agent_name,
@ -201,6 +234,8 @@ class _Runtime:
self._clear_plugins_toml()
except Exception:
logger.debug("NeMo Relay plugins.toml clear failed", exc_info=True)
elif self.settings.plugins_config and not self.sessions:
self._plugin_config_needs_reinit = True
def mark(self, name: str, kwargs: dict[str, Any]) -> None:
state = self.ensure_session(kwargs)
@ -573,12 +608,6 @@ def _load_settings() -> _Settings:
plugins_toml_path = _env("HERMES_NEMO_RELAY_PLUGINS_TOML")
plugins_config = _load_plugins_config(plugins_toml_path)
adaptive_config = _enabled_component_config(plugins_config, "adaptive")
atif_enabled = _env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED")
if atif_enabled and _observability_exporter_enabled(plugins_config, "atif"):
logger.debug(
"NeMo Relay direct ATIF fallback disabled because plugins.toml observability.atif owns exporter lifecycle"
)
atif_enabled = False
return _Settings(
plugins_toml_path=plugins_toml_path,
plugins_config=plugins_config,
@ -588,7 +617,7 @@ def _load_settings() -> _Settings:
atof_output_directory=_env("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY"),
atof_filename=_env("HERMES_NEMO_RELAY_ATOF_FILENAME") or "hermes-atof.jsonl",
atof_mode=_env("HERMES_NEMO_RELAY_ATOF_MODE") or "append",
atif_enabled=atif_enabled,
atif_enabled=_env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED"),
atif_output_directory=_env("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY"),
atif_filename_template=_env("HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE") or "hermes-atif-{session_id}.json",
atif_subagent_export_mode=_atif_subagent_export_mode(),

View file

@ -121,6 +121,10 @@ class _FakeAtofExporter:
def register(self, name):
    """Record an ``atof.register`` event for subscriber *name*.

    The event tuple also carries the output directory and filename from this
    fake exporter's config.
    """
    cfg = self.config
    event = ("atof.register", name, cfg.output_directory, cfg.filename)
    self.events.append(event)
def deregister(self, name):
    """Record an ``atof.deregister`` event for subscriber *name*.

    The event tuple also carries the output directory and filename from this
    fake exporter's config. Always returns ``True``.
    """
    cfg = self.config
    event = ("atof.deregister", name, cfg.output_directory, cfg.filename)
    self.events.append(event)
    return True
class _FakeAtifExporter:
def __init__(self, events, session_id, agent_name, agent_version, kwargs):
@ -569,6 +573,93 @@ output_directory = "{(tmp_path / "managed-atif").as_posix()}"
assert not (tmp_path / "direct-atif" / "hermes-atif-s1.json").exists()
def test_nemo_relay_plugin_keeps_direct_atif_when_plugins_toml_init_fails(tmp_path, monkeypatch):
    """Direct ATIF export must still happen when plugins.toml init raises.

    The fake plugin's ``initialize`` always fails, while both the plugins.toml
    observability ``atif`` section and the direct ATIF env vars are enabled.
    After one session start/finalize cycle the direct ATIF file is expected
    to exist and no ``plugin.clear`` event may have been recorded.
    """
    fake = _FakeNemoRelay()

    async def _failing_initialize(config):
        fake.events.append(("plugin.initialize.failed", config))
        raise RuntimeError("boom")

    fake.plugin.initialize = _failing_initialize
    plugin = _fresh_plugin(monkeypatch, fake)

    managed_dir = (tmp_path / "managed-atif").as_posix()
    toml_text = f"""
version = 1
[[components]]
kind = "observability"
enabled = true
[components.config.atif]
enabled = true
output_directory = "{managed_dir}"
"""
    plugins_toml = tmp_path / "plugins.toml"
    plugins_toml.write_text(toml_text, encoding="utf-8")

    direct_dir = tmp_path / "direct-atif"
    monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml))
    monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1")
    monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(direct_dir))

    plugin.on_session_start(session_id="s1")
    plugin.on_session_finalize(session_id="s1", reason="shutdown")

    seen = [entry[0] for entry in fake.events]
    assert "plugin.initialize.failed" in seen
    assert "plugin.clear" not in seen
    assert "atif.register" in seen
    assert (direct_dir / "hermes-atif-s1.json").exists()
def test_nemo_relay_plugin_retries_plugins_toml_after_fallback_only_session_and_clears_direct_atof(
    tmp_path,
    monkeypatch,
):
    """plugins.toml init is retried after a fallback-only session ends.

    The fake ``initialize`` fails on its first call and succeeds afterwards.
    Session ``s1`` is started and finalized, then ``s2`` is started. The test
    expects exactly two initialize attempts, one direct ATOF registration and
    one matching deregistration, with the runtime ending up initialized.
    """
    fake = _FakeNemoRelay()
    attempts = 0

    async def _flaky_initialize(config):
        nonlocal attempts
        attempts += 1
        fake.events.append(("plugin.initialize.attempt", attempts, config))
        if attempts == 1:
            raise RuntimeError("boom")
        return {"diagnostics": []}

    fake.plugin.initialize = _flaky_initialize
    plugin = _fresh_plugin(monkeypatch, fake)

    managed_dir = (tmp_path / "managed-atof").as_posix()
    toml_text = f"""
version = 1
[[components]]
kind = "observability"
enabled = true
[components.config.atof]
enabled = true
output_directory = "{managed_dir}"
"""
    plugins_toml = tmp_path / "plugins.toml"
    plugins_toml.write_text(toml_text, encoding="utf-8")

    monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml))
    monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_ENABLED", "1")
    monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY", str(tmp_path / "direct-atof"))

    plugin.on_session_start(session_id="s1")
    plugin.on_session_finalize(session_id="s1", reason="shutdown")
    plugin.on_session_start(session_id="s2")

    runtime = plugin._get_runtime()
    assert runtime is not None
    assert runtime._plugin_config_initialized is True

    occurrences = [entry[0] for entry in fake.events].count
    assert occurrences("plugin.initialize.attempt") == 2
    assert occurrences("atof.register") == 1
    assert occurrences("atof.deregister") == 1
def test_nemo_relay_adaptive_llm_execution_middleware_preserves_raw_response(tmp_path, monkeypatch):
fake = _FakeNemoRelay()
plugin = _fresh_plugin(monkeypatch, fake)