fix(voice): chunk oversized CLI recordings

This commit is contained in:
helix4u 2026-05-21 14:15:47 -06:00 committed by Teknium
parent 552e9c7881
commit 3462b097e2
4 changed files with 199 additions and 4 deletions

11
cli.py
View file

@ -10221,6 +10221,7 @@ class HermesCLI:
self._voice_processing = True
submitted = False
transcription_failed = False
wav_path = None
try:
if self._voice_recorder is None:
@ -10269,18 +10270,24 @@ class HermesCLI:
else:
error = result.get("error", "Unknown error")
_cprint(f"\n{_DIM}Transcription failed: {error}{_RST}")
transcription_failed = True
except Exception as e:
_cprint(f"\n{_DIM}Voice processing error: {e}{_RST}")
transcription_failed = wav_path is not None
finally:
with self._voice_lock:
self._voice_processing = False
if hasattr(self, '_app') and self._app:
self._app.invalidate()
# Clean up temp file
# Clean up temp file unless transcription failed. On failure, keep
# the source recording so long dictation is not lost.
try:
if wav_path and os.path.isfile(wav_path):
os.unlink(wav_path)
if transcription_failed:
_cprint(f"{_DIM}Recording preserved at: {wav_path}{_RST}")
else:
os.unlink(wav_path)
except Exception:
pass

View file

@ -1214,6 +1214,11 @@ class TestVoiceStopAndTranscribeReal:
cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder)
cli._voice_stop_and_transcribe()
assert cli._pending_input.empty()
_unl.assert_not_called()
assert any(
"Recording preserved at: /tmp/test.wav" in str(call)
for call in _cp.call_args_list
)
@patch("cli._cprint")
@patch("cli.os.unlink")
@ -1227,6 +1232,11 @@ class TestVoiceStopAndTranscribeReal:
recorder.stop.return_value = "/tmp/test.wav"
cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder)
cli._voice_stop_and_transcribe() # Should not raise
_unl.assert_not_called()
assert any(
"Recording preserved at: /tmp/test.wav" in str(call)
for call in _cp.call_args_list
)
@patch("cli._cprint")
@patch("tools.voice_mode.play_beep")

View file

@ -586,6 +586,73 @@ class TestTranscribeRecording:
assert result["transcript"] == "Thank you for helping me with this code."
assert "filtered" not in result
def test_oversized_wav_is_chunked_and_stitched(self, tmp_path, monkeypatch):
wav_path = tmp_path / "long.wav"
n_frames = 50000
audio = struct.pack(f"<{n_frames}h", *([1000] * n_frames))
with wave.open(str(wav_path), "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes(audio)
temp_dir = tmp_path / "chunks"
temp_dir.mkdir()
monkeypatch.setattr("tools.voice_mode._TEMP_DIR", str(temp_dir))
monkeypatch.setattr("tools.transcription_tools.MAX_FILE_SIZE", 70 * 1024)
seen_paths = []
def fake_transcribe(path, model=None):
seen_paths.append(path)
assert model == "base"
assert path != str(wav_path)
assert os.path.getsize(path) <= 70 * 1024
return {
"success": True,
"transcript": f"part {len(seen_paths)}",
"provider": "local",
}
with patch("tools.transcription_tools.transcribe_audio", side_effect=fake_transcribe):
from tools.voice_mode import transcribe_recording
result = transcribe_recording(str(wav_path), model="base")
assert result["success"] is True
assert result["transcript"] == " ".join(
f"part {i}" for i in range(1, len(seen_paths) + 1)
)
assert result["chunks"] == len(seen_paths)
assert len(seen_paths) > 1
assert all(not os.path.exists(path) for path in seen_paths)
def test_oversized_wav_reports_failing_chunk(self, tmp_path, monkeypatch):
wav_path = tmp_path / "long.wav"
n_frames = 50000
audio = struct.pack(f"<{n_frames}h", *([1000] * n_frames))
with wave.open(str(wav_path), "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes(audio)
temp_dir = tmp_path / "chunks"
temp_dir.mkdir()
monkeypatch.setattr("tools.voice_mode._TEMP_DIR", str(temp_dir))
monkeypatch.setattr("tools.transcription_tools.MAX_FILE_SIZE", 70 * 1024)
def fake_transcribe(path, model=None):
return {"success": False, "transcript": "", "error": "provider rejected audio"}
with patch("tools.transcription_tools.transcribe_audio", side_effect=fake_transcribe):
from tools.voice_mode import transcribe_recording
result = transcribe_recording(str(wav_path), model="base")
assert result["success"] is False
assert result["error"].startswith("Chunk 1/")
assert "provider rejected audio" in result["error"]
assert list(temp_dir.iterdir()) == []
class TestWhisperHallucinationFilter:
def test_known_hallucinations(self):

View file

@ -800,9 +800,12 @@ def transcribe_recording(wav_path: str, model: Optional[str] = None) -> Dict[str
Returns:
Dict with ``success``, ``transcript``, and optionally ``error``.
"""
from tools.transcription_tools import transcribe_audio
from tools.transcription_tools import MAX_FILE_SIZE, transcribe_audio
result = transcribe_audio(wav_path, model=model)
if _should_chunk_for_transcription(wav_path, MAX_FILE_SIZE):
result = _transcribe_wav_in_chunks(wav_path, model=model, max_file_size=MAX_FILE_SIZE)
else:
result = transcribe_audio(wav_path, model=model)
# Filter out Whisper hallucinations (common on silent/near-silent audio)
if result.get("success") and is_whisper_hallucination(result.get("transcript", "")):
@ -812,6 +815,114 @@ def transcribe_recording(wav_path: str, model: Optional[str] = None) -> Dict[str
return result
def _should_chunk_for_transcription(file_path: str, max_file_size: int) -> bool:
"""Return whether a CLI WAV recording needs to be split before STT."""
if not file_path.lower().endswith(".wav"):
return False
try:
return os.path.getsize(file_path) > max_file_size
except OSError:
return False
def _transcribe_wav_in_chunks(
wav_path: str,
*,
model: Optional[str],
max_file_size: int,
) -> Dict[str, Any]:
"""Split an oversized WAV into provider-sized chunks and join transcripts."""
from tools.transcription_tools import transcribe_audio
chunk_paths: List[str] = []
transcripts: List[str] = []
try:
chunk_paths = _split_wav_for_transcription(wav_path, max_file_size=max_file_size)
if not chunk_paths:
return {"success": False, "transcript": "", "error": "No audio chunks were created"}
logger.info("Transcribing oversized WAV in %d chunks: %s", len(chunk_paths), wav_path)
for index, chunk_path in enumerate(chunk_paths, start=1):
result = transcribe_audio(chunk_path, model=model)
if not result.get("success"):
error = result.get("error", "Unknown transcription error")
return {
"success": False,
"transcript": "",
"error": f"Chunk {index}/{len(chunk_paths)} failed: {error}",
}
transcript = result.get("transcript", "").strip()
if transcript and not is_whisper_hallucination(transcript):
transcripts.append(transcript)
return {
"success": True,
"transcript": " ".join(transcripts).strip(),
"provider": result.get("provider"),
"chunks": len(chunk_paths),
}
except Exception as e:
logger.error("Chunked transcription failed for %s: %s", wav_path, e, exc_info=True)
return {"success": False, "transcript": "", "error": f"Chunked transcription failed: {e}"}
finally:
for chunk_path in chunk_paths:
try:
if os.path.isfile(chunk_path):
os.unlink(chunk_path)
except OSError:
pass
def _split_wav_for_transcription(wav_path: str, *, max_file_size: int) -> List[str]:
"""Write WAV chunks small enough to pass the shared STT file-size gate."""
os.makedirs(_TEMP_DIR, exist_ok=True)
chunk_paths: List[str] = []
header_reserve = 64 * 1024
with wave.open(wav_path, "rb") as source:
params = source.getparams()
block_align = max(1, params.nchannels * params.sampwidth)
max_data_bytes = max_file_size - header_reserve
if max_data_bytes < block_align:
raise ValueError("STT max_file_size is too small for WAV chunking")
frames_per_chunk = max(1, max_data_bytes // block_align)
index = 0
while True:
frames = source.readframes(frames_per_chunk)
if not frames:
break
index += 1
temp = tempfile.NamedTemporaryFile(
prefix=f"{os.path.splitext(os.path.basename(wav_path))[0]}_chunk{index:03d}_",
suffix=".wav",
dir=_TEMP_DIR,
delete=False,
)
chunk_path = temp.name
temp.close()
try:
with wave.open(chunk_path, "wb") as chunk:
chunk.setnchannels(params.nchannels)
chunk.setsampwidth(params.sampwidth)
chunk.setframerate(params.framerate)
chunk.setcomptype(params.comptype, params.compname)
chunk.writeframes(frames)
chunk_paths.append(chunk_path)
except Exception:
try:
os.unlink(chunk_path)
except OSError:
pass
raise
return chunk_paths
# ============================================================================
# Audio playback (interruptable)
# ============================================================================