Re-anchor the active-session lease when a TUI session id changes.

Summary of the changes carried by this patch:
  * hermes_cli/active_sessions.py: wrap get_hermes_home() in Path() inside
    _state_dir(), and add transfer_active_session(), which rewrites the
    registry entry matching the lease id under the file lock so the lease
    moves to a new session id without being released.
  * tui_gateway/server.py: add _transfer_active_session_slot(), which tries
    the in-place transfer first and otherwise claims a new slot before
    releasing the old one; call it from _sync_session_key_after_compress();
    add _session_lookup_key(), which prefers the agent's session_id over the
    stored session_key, and use it for live-session lookup and payloads.
  * tests: cover the lease transfer, the resume-by-current-session-id lookup,
    and the lease re-anchor performed by _sync_session_key_after_compress().

diff --git a/hermes_cli/active_sessions.py b/hermes_cli/active_sessions.py
index 7fdb9c2d729..7eba80e5024 100644
--- a/hermes_cli/active_sessions.py
+++ b/hermes_cli/active_sessions.py
@@ -78,7 +78,7 @@ def active_session_limit_message(active_count: int, max_sessions: int) -> str:
 
 
 def _state_dir() -> Path:
-    return get_hermes_home() / "runtime"
+    return Path(get_hermes_home()) / "runtime"
 
 
 def _state_path() -> Path:
@@ -311,6 +311,43 @@ def release_active_session(lease: ActiveSessionLease) -> None:
     lease.released = True
 
 
+def transfer_active_session(
+    lease: ActiveSessionLease,
+    *,
+    session_id: str,
+    metadata: Optional[dict[str, Any]] = None,
+) -> bool:
+    """Move an existing lease to a new session id without dropping the slot."""
+    new_session_id = str(session_id or "")
+    if not new_session_id:
+        return False
+    if lease.released:
+        return False
+    if not lease.enabled:
+        lease.session_id = new_session_id
+        return True
+
+    state_path = _state_path()
+    with _FileLock(_lock_path()):
+        entries = _prune_dead(_read_entries(state_path))
+        updated = False
+        for entry in entries:
+            if str(entry.get("lease_id") or "") != lease.lease_id:
+                continue
+            entry["session_id"] = new_session_id
+            entry["updated_at"] = time.time()
+            if metadata:
+                entry["metadata"] = {
+                    str(k): v for k, v in metadata.items() if isinstance(k, str)
+                }
+            updated = True
+            break
+        if updated:
+            _write_entries(state_path, entries)
+            lease.session_id = new_session_id
+    return updated
+
+
 def active_session_registry_snapshot() -> list[dict[str, Any]]:
     """Return the pruned active-session registry for diagnostics/tests."""
     state_path = _state_path()
diff --git a/tests/hermes_cli/test_active_sessions.py b/tests/hermes_cli/test_active_sessions.py
index 7988f3a0b02..dda461d686b 100644
--- a/tests/hermes_cli/test_active_sessions.py
+++ b/tests/hermes_cli/test_active_sessions.py
@@ -113,6 +113,33 @@ def test_active_session_registry_prunes_dead_pids(tmp_path, monkeypatch):
     lease.release()
 
 
+def test_transfer_active_session_reanchors_existing_lease(tmp_path, monkeypatch):
+    home = tmp_path / ".hermes"
+    monkeypatch.setenv("HERMES_HOME", str(home))
+
+    lease, message = active_sessions.try_acquire_active_session(
+        session_id="session-old",
+        surface="tui",
+        config={"max_concurrent_sessions": 1},
+        metadata={"live_session_id": "ui-1"},
+    )
+
+    assert message is None
+    assert lease is not None
+    assert active_sessions.transfer_active_session(
+        lease,
+        session_id="session-new",
+        metadata={"live_session_id": "ui-1"},
+    )
+
+    snapshot = active_sessions.active_session_registry_snapshot()
+    assert lease.session_id == "session-new"
+    assert len(snapshot) == 1
+    assert snapshot[0]["session_id"] == "session-new"
+    assert snapshot[0]["metadata"] == {"live_session_id": "ui-1"}
+    lease.release()
+
+
 def test_pid_alive_uses_safe_pid_exists_without_signalling(monkeypatch):
     checked: list[int] = []
 
diff --git a/tests/tui_gateway/test_protocol.py b/tests/tui_gateway/test_protocol.py
index 775a07cb317..054fc4df09f 100644
--- a/tests/tui_gateway/test_protocol.py
+++ b/tests/tui_gateway/test_protocol.py
@@ -734,6 +734,100 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch):
     assert all(sid == winner for sid in server._sessions)
 
 
+def test_session_resume_reuses_live_agent_after_compression_rotation(server, monkeypatch):
+    """Resume must match the live agent's current session_id, not stale session_key."""
+
+    target = "20260409_020202_child"
+    stale_parent = "20260409_010101_parent"
+    sid = "live-rotated"
+    server._sessions[sid] = {
+        "agent": types.SimpleNamespace(model="test/model", session_id=target),
+        "created_at": 123.0,
+        "display_history_prefix": [],
+        "history": [{"role": "assistant", "content": "live child"}],
+        "history_lock": threading.RLock(),
+        "last_active": 123.0,
+        "running": False,
+        "session_key": stale_parent,
+        "transport": server._stdio_transport,
+    }
+
+    class _DB:
+        def get_session(self, _sid):
+            return {"id": target}
+
+        def get_session_by_title(self, _title):
+            return None
+
+        def resolve_resume_session_id(self, _target):
+            return target
+
+    monkeypatch.setattr(server, "_get_db", lambda: _DB())
+    monkeypatch.setattr(server, "_emit", lambda *_args, **_kwargs: None)
+    monkeypatch.setattr(
+        server,
+        "_session_info",
+        lambda _agent, _session=None: {"model": "test/model"},
+    )
+
+    result = server.handle_request(
+        {
+            "id": "r1",
+            "method": "session.resume",
+            "params": {"session_id": target, "cols": 100},
+        }
+    )
+
+    assert "error" not in result
+    assert result["result"]["session_id"] == sid
+    assert result["result"]["session_key"] == target
+    assert len(server._sessions) == 1
+
+
+def test_sync_session_key_after_compress_reanchors_active_session_lease(
+    server, monkeypatch, tmp_path
+):
+    home = tmp_path / ".hermes"
+    monkeypatch.setenv("HERMES_HOME", str(home))
+
+    from hermes_cli.active_sessions import (
+        active_session_registry_snapshot,
+        try_acquire_active_session,
+    )
+
+    lease, message = try_acquire_active_session(
+        session_id="session-old",
+        surface="tui",
+        config={"max_concurrent_sessions": 1},
+        metadata={"live_session_id": "ui-1"},
+    )
+    assert message is None
+    assert lease is not None
+
+    session = {
+        "active_session_lease": lease,
+        "agent": types.SimpleNamespace(session_id="session-new"),
+        "session_key": "session-old",
+    }
+    fake_approval = types.SimpleNamespace(
+        disable_session_yolo=lambda *_args, **_kwargs: None,
+        enable_session_yolo=lambda *_args, **_kwargs: None,
+        is_session_yolo_enabled=lambda *_args, **_kwargs: False,
+        register_gateway_notify=lambda *_args, **_kwargs: None,
+        unregister_gateway_notify=lambda *_args, **_kwargs: None,
+    )
+    monkeypatch.setattr(server, "_restart_slash_worker", lambda *_args, **_kwargs: None)
+
+    with patch.dict(sys.modules, {"tools.approval": fake_approval}):
+        server._sync_session_key_after_compress("ui-1", session)
+
+    snapshot = active_session_registry_snapshot()
+    assert session["session_key"] == "session-new"
+    assert lease.session_id == "session-new"
+    assert [entry["session_id"] for entry in snapshot] == ["session-new"]
+    lease.release()
+
+
 def test_session_resume_live_payload_uses_current_history_with_ancestors(server, monkeypatch):
     """Live resume should not reuse a stale ancestor-inclusive snapshot."""
 
diff --git a/tui_gateway/server.py b/tui_gateway/server.py
index e4bcf1b0bfc..a7e1ba18b75 100644
--- a/tui_gateway/server.py
+++ b/tui_gateway/server.py
@@ -381,6 +381,59 @@ def _release_active_session_slot(session: dict | None) -> None:
         logger.debug("Failed to release active session slot", exc_info=True)
 
 
+def _transfer_active_session_slot(
+    sid: str,
+    session: dict,
+    *,
+    new_session_id: str,
+) -> bool:
+    if not new_session_id:
+        return False
+    lease = session.get("active_session_lease")
+    if lease is None:
+        return True
+    try:
+        from hermes_cli.active_sessions import transfer_active_session
+
+        if transfer_active_session(
+            lease,
+            session_id=new_session_id,
+            metadata={"live_session_id": sid},
+        ):
+            return True
+    except Exception:
+        logger.debug("Failed to transfer active session slot", exc_info=True)
+
+    # Fallback: the in-place transfer could not move the lease (entry pruned /
+    # pid-check transiently failed). Reserve the new slot BEFORE releasing the
+    # old one, so a concurrent gateway at the session cap cannot grab the freed
+    # slot in a release-then-reacquire window and leave this session with no
+    # lease at all (#49041 review). If the reserve fails, KEEP the old lease.
+    new_lease, limit_message = _claim_active_session_slot(
+        new_session_id,
+        live_session_id=sid,
+    )
+    if new_lease is not None:
+        old_lease = session.pop("active_session_lease", None)
+        if old_lease is not None:
+            try:
+                old_lease.release()
+            except Exception:
+                logger.debug("Failed to release stale active session slot", exc_info=True)
+        session["active_session_lease"] = new_lease
+        return True
+    # Reserve failed — retain the existing lease rather than dropping it.
+    if limit_message:
+        logger.warning(
+            "Compression session lease re-anchor failed (kept old lease): "
+            "sid=%s new_session_id=%s reason=%s",
+            sid,
+            new_session_id,
+            limit_message,
+        )
+    return False
+
+
 def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> None:
     """Best-effort finalize hook + memory commit for a session.
 
@@ -2543,6 +2596,19 @@ def _sync_session_key_after_compress(
     if not new_session_id or new_session_id == old_key:
         return
 
+    lease_reanchored = _transfer_active_session_slot(
+        sid,
+        session,
+        new_session_id=new_session_id,
+    )
+    if not lease_reanchored:
+        logger.warning(
+            "Compression session lease did not re-anchor: sid=%s old_session_id=%s new_session_id=%s",
+            sid,
+            old_key,
+            new_session_id,
+        )
+
     try:
         from tools.approval import (
             disable_session_yolo,
@@ -4940,7 +5006,7 @@ def _session_live_title(session: dict, key: str) -> str:
 
 
 def _session_live_item(sid: str, session: dict, current_sid: str = "") -> dict:
-    key = str(session.get("session_key") or sid)
+    key = _session_lookup_key(session, fallback=sid)
     agent = session.get("agent")
     history = list(session.get("history") or [])
     status = _session_live_status(sid, session)
@@ -4964,11 +5030,21 @@ def _session_live_item(sid: str, session: dict, current_sid: str = "") -> dict:
     }
 
 
+def _session_lookup_key(session: dict, *, fallback: str = "") -> str:
+    agent = session.get("agent")
+    return str(
+        getattr(agent, "session_id", None)
+        or session.get("session_key")
+        or fallback
+        or ""
+    )
+
+
 def _find_live_session_by_key(session_key: str) -> tuple[str, dict] | None:
     for sid, session in list(_sessions.items()):
         if session.get("_finalized"):
             continue
-        if str(session.get("session_key") or "") == session_key:
+        if _session_lookup_key(session, fallback=sid) == session_key:
             return sid, session
     return None
 
@@ -5012,7 +5088,7 @@ def _live_session_payload(
         "messages": _history_to_messages(history),
         "running": running,
         "session_id": sid,
-        "session_key": session.get("session_key") or sid,
+        "session_key": _session_lookup_key(session, fallback=sid),
         "started_at": float(session.get("created_at") or time.time()),
         "status": _session_live_status(sid, session),
     }