fix(tui): handle Windows PTY stdin and detached WS frames (#41953)

Two narrow Windows desktop fixes:

1. tools/process_registry.py — PTY stdin writes are now platform-aware.
   pywinpty (Windows) expects str; ptyprocess (POSIX) expects bytes.
   Previously bytes was unconditionally passed, producing a TypeError on
   Windows ("'bytes' object cannot be converted to 'PyString'").

2. tui_gateway/server.py + ws.py — Detached WebSocket sessions now park on
   a _DropTransport sink instead of _stdio_transport. In the desktop the
   gateway runs in-process and stdout is captured by Electron into
   desktop.log, so falling back to stdio leaked raw JSON-RPC frames into
   the desktop log after WS disconnects. Orphan-reap semantics are
   preserved via _ws_session_is_orphaned.

Verified on a Windows desktop install:
- pywinpty 2.0.15 rejects bytes / accepts str — reproduced exactly
- Focused suite green (write_stdin × 2, write_json_drops_detached_ws_frames,
  ws_orphan_reap × 2)
- All 6 CI test shards green, e2e green, nix (ubuntu/macos) green

Salvage commit (21be7ca) fixes the new test referencing an undefined
_ThreadUnsafeStdout — uses the existing _ChunkyStdout helper.
This commit is contained in:
qWait 2026-06-09 00:41:20 +08:00 committed by GitHub
parent 74744795af
commit cef00ae602
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 91 additions and 19 deletions

View file

@ -152,6 +152,21 @@ def test_write_json_returns_false_on_broken_pipe(monkeypatch):
assert server.write_json({"ok": True}) is False
def test_write_json_drops_detached_ws_frames(monkeypatch):
out = _ChunkyStdout()
monkeypatch.setattr(server, "_real_stdout", out)
server._sessions["detached-sid"] = {"transport": server._detached_ws_transport}
try:
assert server.write_json({
"jsonrpc": "2.0",
"method": "event",
"params": {"session_id": "detached-sid", "type": "message.delta"},
}) is False
assert out.parts == []
finally:
server._sessions.pop("detached-sid", None)
def test_tui_verbose_tool_details_fail_closed_when_redaction_fails(monkeypatch):
redact_module = types.ModuleType("agent.redact")
@ -933,7 +948,7 @@ def test_ws_orphan_reap_closes_worker_when_session_stays_detached(monkeypatch):
closed["worker"] = True
server._sessions["orphan-sid"] = _session(
transport=server._stdio_transport,
transport=server._detached_ws_transport,
slash_worker=_FakeWorker(),
running=False,
)
@ -992,11 +1007,15 @@ def test_ws_orphan_reap_spares_reattached_session(monkeypatch):
assert server._ws_session_is_orphaned(reattached) is False
# Mid-turn sessions are also spared even if detached.
mid_turn = _session(transport=server._stdio_transport, running=True)
mid_turn = _session(transport=server._detached_ws_transport, running=True)
assert server._ws_session_is_orphaned(mid_turn) is False
# Already finalized sessions are spared (idempotency).
done = _session(transport=server._stdio_transport, running=False, _finalized=True)
done = _session(
transport=server._detached_ws_transport,
running=False,
_finalized=True,
)
assert server._ws_session_is_orphaned(done) is False

View file

@ -63,6 +63,44 @@ def _wait_until(predicate, timeout: float = 5.0, interval: float = 0.05) -> bool
return False
def test_write_stdin_uses_str_for_windows_pty(monkeypatch, registry):
"""pywinpty expects str input; bytes raises a PyString conversion error."""
written = []
class _FakePty:
def write(self, value):
written.append(value)
session = _make_session(sid="pty-win")
session._pty = _FakePty()
registry._running[session.id] = session
monkeypatch.setattr("tools.process_registry._IS_WINDOWS", True)
result = registry.write_stdin(session.id, "hello\n")
assert result == {"status": "ok", "bytes_written": 6}
assert written == ["hello\n"]
assert isinstance(written[0], str)
def test_write_stdin_uses_bytes_for_posix_pty(monkeypatch, registry):
written = []
class _FakePty:
def write(self, value):
written.append(value)
session = _make_session(sid="pty-posix")
session._pty = _FakePty()
registry._running[session.id] = session
monkeypatch.setattr("tools.process_registry._IS_WINDOWS", False)
result = registry.write_stdin(session.id, "hello\n")
assert result == {"status": "ok", "bytes_written": 6}
assert written == [b"hello\n"]
# =========================================================================
# Get / Poll
# =========================================================================

View file

@ -1207,10 +1207,14 @@ class ProcessRegistry:
if session.exited:
return {"status": "already_exited", "error": "Process has already finished"}
# PTY mode -- write through pty handle (expects bytes)
# PTY mode -- write through pty handle.
if hasattr(session, '_pty') and session._pty:
try:
pty_data = data.encode("utf-8") if isinstance(data, str) else data
# pywinpty expects str on Windows; ptyprocess expects bytes on POSIX.
if _IS_WINDOWS:
pty_data = data.decode("utf-8") if isinstance(data, bytes) else str(data)
else:
pty_data = data.encode("utf-8") if isinstance(data, str) else data
session._pty.write(pty_data)
return {"status": "ok", "bytes_written": len(data)}
except Exception as e:

View file

@ -202,11 +202,27 @@ atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True))
_real_stdout = sys.stdout
sys.stdout = sys.stderr
class _DropTransport:
"""Detached WS sink: keep sessions resumable without writing stale frames."""
def write(self, obj: dict) -> bool:
return False
def close(self) -> None:
return None
# Module-level stdio transport — fallback sink when no transport is bound via
# contextvar or session. Stream resolved through a lambda so runtime monkey-
# patches of `_real_stdout` (used extensively in tests) still land correctly.
_stdio_transport = StdioTransport(lambda: _real_stdout, _stdout_lock)
# Detached websocket sessions use a drop sink instead of stdio. Desktop embeds
# the gateway in-process and captures stdout into logs, so stale JSON-RPC frames
# must not fall through there while the session waits for resume or reap.
_detached_ws_transport = _DropTransport()
class _SlashWorker:
"""Persistent HermesCLI subprocess for slash commands."""
@ -394,16 +410,15 @@ def _teardown_session(session: dict | None) -> None:
def _ws_session_is_orphaned(session: dict | None) -> bool:
"""True if a WS session has no live transport and no in-flight turn.
After ``handle_ws`` detaches a disconnected client it points the session
at ``_stdio_transport``. In the dashboard's in-process gateway there is no
real stdio peer reading those frames, so a session left on the stdio
transport (and not mid-turn) is genuinely orphaned and safe to reap.
After ``handle_ws`` detaches a disconnected client it points the session at
``_detached_ws_transport``. A session left on that transport (and not
mid-turn) is genuinely orphaned and safe to reap.
"""
if not session or session.get("_finalized"):
return False
if session.get("running"):
return False
return session.get("transport") is _stdio_transport
return session.get("transport") is _detached_ws_transport
def _schedule_ws_orphan_reap(sid: str) -> None:

View file

@ -289,17 +289,13 @@ async def handle_ws(ws: Any) -> None:
transport.close()
# Detach the transport from any sessions it owned so later emits
# fall back to stdio instead of crashing into a closed socket.
#
# In the dashboard's in-process gateway that stdio fallback has no
# real reader, so a detached session would otherwise sit forever
# holding its _SlashWorker subprocess open (one leaked python proc
# per browser refresh — #38591 fallout). Schedule a grace-delayed
# reap; a quick reconnect / session.resume re-binds a live
# transport and cancels it (see _ws_session_is_orphaned).
# do not crash into a closed socket or fall through to desktop
# stdout logs. Schedule a grace-delayed reap; a quick reconnect /
# session.resume re-binds a live transport and cancels it (see
# _ws_session_is_orphaned).
for _sid, sess in list(server._sessions.items()):
if sess.get("transport") is transport:
sess["transport"] = server._stdio_transport
sess["transport"] = server._detached_ws_transport
detached_sessions += 1
try:
server._schedule_ws_orphan_reap(_sid)