mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): guard session slot ownership after stop/reset
Closes the runner-side half of the split-brain described in issue #11016 by wiring the existing _session_run_generation counter through the session-slot promotion and release paths. Without this, an older async run could still: - promote itself from sentinel to real agent after /stop or /new invalidated its run generation - clear _running_agents on the way out, deleting a newer run's slot Both races leave _running_agents desynced from what the user actually has in flight, which is half of what shows up as 'No active task to stop' followed by late 'Interrupting current task...' acks. Changes: - track_agent() in _run_agent now calls _is_session_run_current() before writing the real agent into _running_agents[session_key]; if /stop or /new bumped the generation while the agent was spinning up, the slot is left alone (the newer run owns it). - _release_running_agent_state() gained an optional run_generation keyword. When provided, it only clears the slot if the generation is still current. The final cleanup at the tail of _run_agent passes the run's generation so an old unwind can't blow away a newer run's state. - Returns bool so callers can tell when a release was blocked. All the existing call sites that do NOT pass run_generation behave exactly as before — this is a strict additive guard. Refs #11016
This commit is contained in:
parent
d72985b7ce
commit
b7bdf32d4e
1 changed files with 45 additions and 7 deletions
|
|
@ -8665,7 +8665,12 @@ class GatewayRunner:
|
|||
override = self._session_model_overrides.get(session_key)
|
||||
return override is not None and override.get("model") == agent_model
|
||||
|
||||
def _release_running_agent_state(self, session_key: str) -> None:
|
||||
def _release_running_agent_state(
|
||||
self,
|
||||
session_key: str,
|
||||
*,
|
||||
run_generation: Optional[int] = None,
|
||||
) -> bool:
|
||||
"""Pop ALL per-running-agent state entries for ``session_key``.
|
||||
|
||||
Replaces ad-hoc ``del self._running_agents[key]`` calls scattered
|
||||
|
|
@ -8681,13 +8686,25 @@ class GatewayRunner:
|
|||
across turns (``_session_model_overrides``, ``_voice_mode``,
|
||||
``_pending_approvals``, ``_update_prompt_pending``) is NOT
|
||||
touched here — those have their own lifecycles.
|
||||
|
||||
When ``run_generation`` is provided, only clear the slot if that
|
||||
generation is still current for the session. This prevents an
|
||||
older async run whose generation was bumped by /stop or /new from
|
||||
clobbering a newer run's state during its own unwind. Returns
|
||||
True when the slot was cleared, False when an ownership guard
|
||||
blocked it.
|
||||
"""
|
||||
if not session_key:
|
||||
return
|
||||
return False
|
||||
if run_generation is not None and not self._is_session_run_current(
|
||||
session_key, run_generation
|
||||
):
|
||||
return False
|
||||
self._running_agents.pop(session_key, None)
|
||||
self._running_agents_ts.pop(session_key, None)
|
||||
if hasattr(self, "_busy_ack_ts"):
|
||||
self._busy_ack_ts.pop(session_key, None)
|
||||
return True
|
||||
|
||||
def _clear_session_boundary_security_state(self, session_key: str) -> None:
|
||||
"""Clear approval state that must not survive a real conversation switch."""
|
||||
|
|
@ -10249,10 +10266,24 @@ class GatewayRunner:
|
|||
# Wait for agent to be created
|
||||
while agent_holder[0] is None:
|
||||
await asyncio.sleep(0.05)
|
||||
if session_key:
|
||||
self._running_agents[session_key] = agent_holder[0]
|
||||
if self._draining:
|
||||
self._update_runtime_status("draining")
|
||||
if not session_key:
|
||||
return
|
||||
# Only promote the sentinel to the real agent if this run is still
|
||||
# current. If /stop or /new bumped the generation while we were
|
||||
# spinning up, leave the newer run's slot alone — we'll be
|
||||
# discarded by the stale-result check in _handle_message_with_agent.
|
||||
if run_generation is not None and not self._is_session_run_current(
|
||||
session_key, run_generation
|
||||
):
|
||||
logger.info(
|
||||
"Skipping stale agent promotion for %s — generation %s is no longer current",
|
||||
(session_key or "")[:20],
|
||||
run_generation,
|
||||
)
|
||||
return
|
||||
self._running_agents[session_key] = agent_holder[0]
|
||||
if self._draining:
|
||||
self._update_runtime_status("draining")
|
||||
|
||||
tracking_task = asyncio.create_task(track_agent())
|
||||
|
||||
|
|
@ -10758,7 +10789,14 @@ class GatewayRunner:
|
|||
# Clean up tracking
|
||||
tracking_task.cancel()
|
||||
if session_key:
|
||||
self._release_running_agent_state(session_key)
|
||||
# Only release the slot if this run's generation still owns
|
||||
# it. A /stop or /new that bumped the generation while we
|
||||
# were unwinding has already installed its own state; this
|
||||
# guard prevents an old run from clobbering it on the way
|
||||
# out.
|
||||
self._release_running_agent_state(
|
||||
session_key, run_generation=run_generation
|
||||
)
|
||||
if self._draining:
|
||||
self._update_runtime_status("draining")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue