diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py index 477ca21d2cf..b5534046b7e 100644 --- a/tests/test_tui_gateway_server.py +++ b/tests/test_tui_gateway_server.py @@ -152,6 +152,21 @@ def test_write_json_returns_false_on_broken_pipe(monkeypatch): assert server.write_json({"ok": True}) is False +def test_write_json_drops_detached_ws_frames(monkeypatch): + out = _ChunkyStdout() + monkeypatch.setattr(server, "_real_stdout", out) + server._sessions["detached-sid"] = {"transport": server._detached_ws_transport} + try: + assert server.write_json({ + "jsonrpc": "2.0", + "method": "event", + "params": {"session_id": "detached-sid", "type": "message.delta"}, + }) is False + assert out.parts == [] + finally: + server._sessions.pop("detached-sid", None) + + def test_tui_verbose_tool_details_fail_closed_when_redaction_fails(monkeypatch): redact_module = types.ModuleType("agent.redact") @@ -933,7 +948,7 @@ def test_ws_orphan_reap_closes_worker_when_session_stays_detached(monkeypatch): closed["worker"] = True server._sessions["orphan-sid"] = _session( - transport=server._stdio_transport, + transport=server._detached_ws_transport, slash_worker=_FakeWorker(), running=False, ) @@ -992,11 +1007,15 @@ def test_ws_orphan_reap_spares_reattached_session(monkeypatch): assert server._ws_session_is_orphaned(reattached) is False # Mid-turn sessions are also spared even if detached. - mid_turn = _session(transport=server._stdio_transport, running=True) + mid_turn = _session(transport=server._detached_ws_transport, running=True) assert server._ws_session_is_orphaned(mid_turn) is False # Already finalized sessions are spared (idempotency). - done = _session(transport=server._stdio_transport, running=False, _finalized=True) + done = _session( + transport=server._detached_ws_transport, + running=False, + _finalized=True, + ) assert server._ws_session_is_orphaned(done) is False diff --git a/tests/tools/test_process_registry.py b/tests/tools/test_process_registry.py index 8e3426b2713..6a95e46b7e1 100644 --- a/tests/tools/test_process_registry.py +++ b/tests/tools/test_process_registry.py @@ -63,6 +63,44 @@ def _wait_until(predicate, timeout: float = 5.0, interval: float = 0.05) -> bool return False +def test_write_stdin_uses_str_for_windows_pty(monkeypatch, registry): + """pywinpty expects str input; bytes raises a PyString conversion error.""" + written = [] + + class _FakePty: + def write(self, value): + written.append(value) + + session = _make_session(sid="pty-win") + session._pty = _FakePty() + registry._running[session.id] = session + monkeypatch.setattr("tools.process_registry._IS_WINDOWS", True) + + result = registry.write_stdin(session.id, "hello\n") + + assert result == {"status": "ok", "bytes_written": 6} + assert written == ["hello\n"] + assert isinstance(written[0], str) + + +def test_write_stdin_uses_bytes_for_posix_pty(monkeypatch, registry): + written = [] + + class _FakePty: + def write(self, value): + written.append(value) + + session = _make_session(sid="pty-posix") + session._pty = _FakePty() + registry._running[session.id] = session + monkeypatch.setattr("tools.process_registry._IS_WINDOWS", False) + + result = registry.write_stdin(session.id, "hello\n") + + assert result == {"status": "ok", "bytes_written": 6} + assert written == [b"hello\n"] + + # ========================================================================= # Get / Poll # ========================================================================= diff --git a/tools/process_registry.py b/tools/process_registry.py index d9eb02a4ab8..86970c0fd29 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -1207,10 +1207,14 @@ class ProcessRegistry: if session.exited: return {"status": "already_exited", "error": "Process has already finished"} - # PTY mode -- write through pty handle (expects bytes) + # PTY mode -- write through pty handle. if hasattr(session, '_pty') and session._pty: try: - pty_data = data.encode("utf-8") if isinstance(data, str) else data + # pywinpty expects str on Windows; ptyprocess expects bytes on POSIX. + if _IS_WINDOWS: + pty_data = data.decode("utf-8") if isinstance(data, bytes) else str(data) + else: + pty_data = data.encode("utf-8") if isinstance(data, str) else data session._pty.write(pty_data) return {"status": "ok", "bytes_written": len(data)} except Exception as e: diff --git a/tui_gateway/server.py b/tui_gateway/server.py index c0885e661c2..962813c47c5 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -202,11 +202,27 @@ atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True)) _real_stdout = sys.stdout sys.stdout = sys.stderr + +class _DropTransport: + """Detached WS sink: keep sessions resumable without writing stale frames.""" + + def write(self, obj: dict) -> bool: + return False + + def close(self) -> None: + return None + + # Module-level stdio transport — fallback sink when no transport is bound via # contextvar or session. Stream resolved through a lambda so runtime monkey- # patches of `_real_stdout` (used extensively in tests) still land correctly. _stdio_transport = StdioTransport(lambda: _real_stdout, _stdout_lock) +# Detached websocket sessions use a drop sink instead of stdio. Desktop embeds +# the gateway in-process and captures stdout into logs, so stale JSON-RPC frames +# must not fall through there while the session waits for resume or reap. +_detached_ws_transport = _DropTransport() + class _SlashWorker: """Persistent HermesCLI subprocess for slash commands.""" @@ -394,16 +410,15 @@ def _teardown_session(session: dict | None) -> None: def _ws_session_is_orphaned(session: dict | None) -> bool: """True if a WS session has no live transport and no in-flight turn. - After ``handle_ws`` detaches a disconnected client it points the session - at ``_stdio_transport``. In the dashboard's in-process gateway there is no - real stdio peer reading those frames, so a session left on the stdio - transport (and not mid-turn) is genuinely orphaned and safe to reap. + After ``handle_ws`` detaches a disconnected client it points the session at + ``_detached_ws_transport``. A session left on that transport (and not + mid-turn) is genuinely orphaned and safe to reap. """ if not session or session.get("_finalized"): return False if session.get("running"): return False - return session.get("transport") is _stdio_transport + return session.get("transport") is _detached_ws_transport def _schedule_ws_orphan_reap(sid: str) -> None: diff --git a/tui_gateway/ws.py b/tui_gateway/ws.py index 1babfc1d3c2..2e88f457802 100644 --- a/tui_gateway/ws.py +++ b/tui_gateway/ws.py @@ -289,17 +289,13 @@ async def handle_ws(ws: Any) -> None: transport.close() # Detach the transport from any sessions it owned so later emits - # fall back to stdio instead of crashing into a closed socket. - # - # In the dashboard's in-process gateway that stdio fallback has no - # real reader, so a detached session would otherwise sit forever - # holding its _SlashWorker subprocess open (one leaked python proc - # per browser refresh — #38591 fallout). Schedule a grace-delayed - # reap; a quick reconnect / session.resume re-binds a live - # transport and cancels it (see _ws_session_is_orphaned). + # do not crash into a closed socket or fall through to desktop + # stdout logs. Schedule a grace-delayed reap; a quick reconnect / + # session.resume re-binds a live transport and cancels it (see + # _ws_session_is_orphaned). for _sid, sess in list(server._sessions.items()): if sess.get("transport") is transport: - sess["transport"] = server._stdio_transport + sess["transport"] = server._detached_ws_transport detached_sessions += 1 try: server._schedule_ws_orphan_reap(_sid)