fix(environments): use incremental UTF-8 decoder in select-based drain

The first draft of the fix called `chunk.decode("utf-8")` directly on
each 4096-byte `os.read()` result, which corrupts output whenever a
multi-byte UTF-8 character straddles a read boundary:

  * `UnicodeDecodeError` fires on the valid-but-truncated byte sequence.
  * The except handler clears ALL previously-decoded output and replaces
    the whole buffer with `[binary output detected ...]`.

Empirically: 10000 '日' chars (30,001 bytes: 10000 × 3-byte UTF-8 chars
plus the trailing newline) through the wrapper loses all 10000
characters on the first draft; the baseline TextIOWrapper
drain (which uses `encoding='utf-8', errors='replace'` on Popen)
preserves them all. This regression affects any command emitting
non-ASCII output larger than one chunk — CJK/Arabic/emoji in
`npm install`, `pip install`, `docker logs`, `kubectl logs`, etc.

Fix: swap to `codecs.getincrementaldecoder('utf-8')(errors='replace')`,
which buffers partial multi-byte sequences across chunks and substitutes
U+FFFD for genuinely invalid bytes. Flush on drain exit via
`decoder.decode(b'', final=True)` to emit any trailing replacement
character for a dangling partial sequence.

Adds two regression tests:
  * test_utf8_multibyte_across_read_boundary — 10000 U+65E5 chars,
    verifies count round-trips and no fallback fires.
  * test_invalid_utf8_uses_replacement_not_fallback — deliberate
    \xff\xfe between valid ASCII, verifies surrounding text survives.
This commit is contained in:
Teknium 2026-04-19 11:26:02 -07:00 committed by Teknium
parent 0a02fbd842
commit f336ae3d7d
2 changed files with 84 additions and 26 deletions

View file

@ -6,6 +6,7 @@ re-sourced before each command. CWD persists via in-band stdout markers (remote)
or a temp file (local).
"""
import codecs
import json
import logging
import os
@ -453,37 +454,51 @@ class BaseEnvironment(ABC):
# shortly after ``bash`` exits even if the pipe hasn't EOF'd yet.
# Any output the grandchild writes after that point goes to an
# orphaned pipe (harmless — the kernel reaps it when our end closes).
#
# Decoding: we ``os.read()`` raw bytes in fixed-size chunks (4096)
# so a single multibyte UTF-8 character can split across reads. An
# incremental decoder buffers partial sequences across chunks, and
# ``errors="replace"`` mirrors the baseline ``TextIOWrapper`` (which
# was constructed with ``encoding="utf-8", errors="replace"`` on
# ``Popen``) so binary or mis-encoded output is preserved with
# U+FFFD substitution rather than clobbering the whole buffer.
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

def _drain():
    """Drain ``proc.stdout`` into ``output_chunks`` until EOF, or until
    the process has exited and the pipe has stayed idle for ~300ms.

    Polls with ``select`` on a 100ms timeout so process exit is noticed
    even when a grandchild still holds the write end of the pipe open.
    """
    fd = proc.stdout.fileno()
    idle_after_exit = 0
    try:
        while True:
            try:
                ready, _, _ = select.select([fd], [], [], 0.1)
            except (ValueError, OSError):
                break  # fd already closed
            if ready:
                try:
                    chunk = os.read(fd, 4096)
                except (ValueError, OSError):
                    break
                if not chunk:
                    break  # true EOF — all writers closed
                # Incremental decode: partial multi-byte sequences at the
                # chunk boundary are buffered inside ``decoder`` and
                # completed by the next read; invalid bytes become U+FFFD.
                output_chunks.append(decoder.decode(chunk))
                idle_after_exit = 0
            elif proc.poll() is not None:
                # bash is gone and the pipe was idle for ~100ms. Give
                # it two more cycles to catch any buffered tail, then
                # stop — otherwise we wait forever on a grandchild pipe.
                idle_after_exit += 1
                if idle_after_exit >= 3:
                    break
    finally:
        # Flush any bytes buffered mid-sequence. With ``errors="replace"``
        # this emits U+FFFD for any final incomplete sequence rather than
        # raising.
        try:
            tail = decoder.decode(b"", final=True)
            if tail:
                output_chunks.append(tail)
        except Exception:
            pass

drain_thread = threading.Thread(target=_drain, daemon=True)
drain_thread.start()