fix(gateway): transcribe voice messages during active agent runs

Salvaged from #6600 (@kristianvast) — re-scoped to the voice half only and
rebased onto current main. The cascading-interrupt hang half of the original
PR landed independently in dd0d1222a, so this carries ONLY Problem 1 (voice
messages lost while an agent run is active).

When a voice/audio message arrived while the agent was busy on the same
session, it hit the interrupt path with empty text because STT only ran after
the running-agent guard — the voice was effectively lost. Now we transcribe
audio BEFORE signaling the agent (and on the fresh-message path), echo the raw
transcript back to the user (🎙️), and _enrich_message_with_transcription
returns (text, transcripts) so callers can echo. A new
_dequeue_pending_with_transcription drives the post-agent drain the same way.

Reapplied onto _prepare_inbound_message_text (inbound enrichment was extracted
from the inline dispatch block since the original PR).

Co-authored-by: Kristian Vastveit <kristian@agrointel.no>
This commit is contained in:
Kristian Vastveit 2026-06-08 15:16:10 +05:30 committed by kshitijk4poor
parent 00c46b8ff9
commit d55304c39f
2 changed files with 200 additions and 16 deletions

View file

@ -7748,10 +7748,28 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
)
if audio_paths:
message_text = await self._enrich_message_with_transcription(
message_text, _successful_transcripts = await self._enrich_message_with_transcription(
message_text,
audio_paths,
)
# Echo each successful transcript back to the user immediately,
# before the agent loop runs. Lets the user verify STT quality
# in real-time and see the raw whisper output verbatim.
if _successful_transcripts:
_echo_adapter = self.adapters.get(source.platform)
_echo_meta = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event))
if _echo_adapter:
for _tx in _successful_transcripts:
try:
await _echo_adapter.send(
source.chat_id,
f'🎙️ "{_tx}"',
metadata=_echo_meta,
)
except Exception as _echo_exc:
logger.debug(
"Transcript echo failed (non-fatal): %s", _echo_exc,
)
_stt_fail_markers = (
"No STT provider",
"STT is disabled",
@ -11655,7 +11673,7 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
self,
user_text: str,
audio_paths: List[str],
) -> str:
) -> tuple[str, List[str]]:
"""
Auto-transcribe user voice/audio messages using the configured STT provider
and prepend the transcript to the message text.
@ -11665,7 +11683,13 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
audio_paths: List of local file paths to cached audio files.
Returns:
The enriched message string with transcriptions prepended.
A tuple of ``(enriched_text, successful_transcripts)``:
- ``enriched_text``: the message string with transcription wrappers
prepended (same as before).
- ``successful_transcripts``: the raw transcript strings for audio
clips that were successfully transcribed, in input order. Empty
list if every clip failed or STT is disabled. Callers can use
this to echo transcripts back to the user before the agent loop.
"""
if not getattr(self.config, "stt_enabled", True):
notes = []
@ -11679,24 +11703,26 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
else:
notes.append(f"[The user sent a voice message: {abs_path}]")
if not notes:
return user_text
return user_text, []
prefix = "\n\n".join(notes)
_placeholder = "(The user sent a message with no text content)"
if user_text and user_text.strip() == _placeholder:
return prefix
return prefix, []
if user_text:
return f"{prefix}\n\n{user_text}"
return prefix
return f"{prefix}\n\n{user_text}", []
return prefix, []
from tools.transcription_tools import transcribe_audio
enriched_parts = []
successful_transcripts: List[str] = []
for path in audio_paths:
try:
logger.debug("Transcribing user voice: %s", path)
result = await asyncio.to_thread(transcribe_audio, path)
if result["success"]:
transcript = result["transcript"]
successful_transcripts.append(transcript)
enriched_parts.append(
f'[The user sent a voice message~ '
f'Here\'s what they said: "{transcript}"]'
@ -11741,9 +11767,75 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
if user_text and user_text.strip() == _placeholder:
return prefix
if user_text:
return f"{prefix}\n\n{user_text}"
return prefix
return user_text
return f"{prefix}\n\n{user_text}", successful_transcripts
return prefix, successful_transcripts
return user_text, successful_transcripts
async def _dequeue_pending_with_transcription(
    self,
    adapter,
    session_key: str,
    source,
) -> str | None:
    """Dequeue a pending queued message, auto-transcribing audio media.

    When a voice/audio message arrives during an active agent run, the
    adapter stores the event in its pending queue and signals an interrupt
    (see base.BaseAdapter.handle_message). The adapter path bypasses
    _handle_message entirely, so the normal STT pipeline at message-receive
    time never runs.

    This helper fills that gap: when the dequeued event has audio media,
    we transcribe inline, echo the raw transcript back to the user (same
    "🎙️" format as the fresh-message path), and return enriched text.
    Non-audio events fall back to _build_media_placeholder, matching the
    original _dequeue_pending_text behavior.

    Args:
        adapter: Adapter whose pending queue is read via
            ``get_pending_message(session_key)``.
        session_key: Key identifying the session whose pending event is
            dequeued.
        source: Message source; ``platform``, ``chat_id`` and ``thread_id``
            are read to route the transcript echo.

    Returns:
        The text for the next user turn, or ``None`` when nothing is
        pending or the resulting text is empty.
    """
    event = adapter.get_pending_message(session_key)
    if not event:
        return None

    text = event.text or ""
    media_urls = getattr(event, "media_urls", None) or []
    media_types = getattr(event, "media_types", None) or []

    # The event-level type does not change per attachment, so evaluate it once.
    event_is_audio = getattr(event, "message_type", None) in (
        MessageType.VOICE,
        MessageType.AUDIO,
    )
    audio_paths: List[str] = []
    for i, path in enumerate(media_urls):
        # media_types may be shorter than media_urls; a missing entry is "".
        mtype = media_types[i] if i < len(media_types) else ""
        if mtype.startswith("audio/") or event_is_audio:
            audio_paths.append(path)

    if audio_paths:
        try:
            enriched_text, successful_transcripts = (
                await self._enrich_message_with_transcription(text, audio_paths)
            )
        except Exception as trans_exc:
            # The event has already been removed from the pending queue, so
            # an exception escaping here would drop the user's message.
            # Fall back to the caption or a media placeholder, as the inline
            # interrupt/drain paths do when transcription fails.
            logger.warning(
                "Voice-dequeue transcription failed: %s", trans_exc,
            )
            return text or _build_media_placeholder(event) or None

        # Echo raw transcripts back to the user so voice interrupts
        # feel identical to fresh voice messages.
        if successful_transcripts:
            echo_adapter = self.adapters.get(source.platform)
            echo_meta = {"thread_id": source.thread_id} if source.thread_id else None
            if echo_adapter:
                for tx in successful_transcripts:
                    try:
                        await echo_adapter.send(
                            source.chat_id,
                            f'🎙️ "{tx}"',
                            metadata=echo_meta,
                        )
                    except Exception as echo_exc:
                        # The echo is a courtesy; a send failure must not
                        # block delivery of the transcript to the agent.
                        logger.debug(
                            "Transcript echo failed (non-fatal): %s", echo_exc,
                        )
        return enriched_text or None

    # Non-audio fallback: preserve original _dequeue_pending_text semantics.
    if not text and media_urls:
        text = _build_media_placeholder(event)
    return text or None
def _build_process_event_source(self, evt: dict):
"""Resolve the canonical source for a synthetic background-process event.
@ -14591,7 +14683,52 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
# is lost — neither the interrupt path nor the dequeue
# path finds it.
_peek_event = _adapter._pending_messages.get(session_key)
pending_text = _peek_event.text if _peek_event else None
pending_text = None
if _peek_event is not None:
pending_text = _peek_event.text or ""
# Transcribe audio media BEFORE signaling the
# agent, so voice messages interrupt with the
# real transcript instead of an empty string
# (or file-path placeholder). Matches the UX
# of fresh voice messages including the
# 🎙️ echo back to the user.
_media_urls = getattr(_peek_event, "media_urls", None) or []
_media_types = getattr(_peek_event, "media_types", None) or []
_audio_paths = []
for _i, _path in enumerate(_media_urls):
_mtype = _media_types[_i] if _i < len(_media_types) else ""
_is_audio = (
_mtype.startswith("audio/")
or getattr(_peek_event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO)
)
if _is_audio:
_audio_paths.append(_path)
if _audio_paths:
try:
_enriched, _transcripts = await self._enrich_message_with_transcription(
pending_text, _audio_paths,
)
pending_text = _enriched
if _transcripts:
_echo_meta = {"thread_id": source.thread_id} if source.thread_id else None
for _tx in _transcripts:
try:
await _adapter.send(
source.chat_id,
f'🎙️ "{_tx}"',
metadata=_echo_meta,
)
except Exception as _echo_exc:
logger.debug(
"Voice-interrupt echo failed (non-fatal): %s",
_echo_exc,
)
except Exception as _trans_exc:
logger.warning(
"Voice-interrupt transcription failed: %s", _trans_exc,
)
elif not pending_text and _media_urls:
pending_text = _build_media_placeholder(_peek_event)
logger.debug("Interrupt detected from adapter, signaling agent...")
agent.interrupt(pending_text)
_interrupt_detected.set()
@ -14916,8 +15053,52 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
else:
pending = interrupt_message
elif pending_event:
pending = pending_event.text or _build_media_placeholder(pending_event)
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
# Transcribe audio media on the dequeued event BEFORE it is
# handed back as the next user turn, so queued/interrupting
# voice messages drain with the real transcript instead of
# a file-path placeholder. Echo each transcript back to the
# user (same 🎙️ format as fresh voice messages) so voice
# interrupts feel identical to text interrupts.
_pending_text = pending_event.text or ""
_media_urls = getattr(pending_event, "media_urls", None) or []
_media_types = getattr(pending_event, "media_types", None) or []
_audio_paths = []
for _i, _path in enumerate(_media_urls):
_mtype = _media_types[_i] if _i < len(_media_types) else ""
_is_audio = (
_mtype.startswith("audio/")
or getattr(pending_event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO)
)
if _is_audio:
_audio_paths.append(_path)
if _audio_paths:
try:
_enriched, _transcripts = await self._enrich_message_with_transcription(
_pending_text, _audio_paths,
)
pending = _enriched or None
if _transcripts:
_echo_meta = {"thread_id": source.thread_id} if source.thread_id else None
for _tx in _transcripts:
try:
await adapter.send(
source.chat_id,
f'🎙️ "{_tx}"',
metadata=_echo_meta,
)
except Exception as _echo_exc:
logger.debug(
"Voice-drain echo failed (non-fatal): %s", _echo_exc,
)
except Exception as _trans_exc:
logger.warning(
"Voice-drain transcription failed: %s", _trans_exc,
)
pending = _pending_text or _build_media_placeholder(pending_event)
else:
pending = _pending_text or _build_media_placeholder(pending_event)
if pending:
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
# Leftover /steer: if a steer arrived after the last tool batch
# (e.g. during the final API call), the agent couldn't inject it

View file

@ -47,7 +47,7 @@ async def test_enrich_message_with_transcription_surfaces_path_when_stt_disabled
"gateway.run._probe_audio_duration",
new=AsyncMock(return_value="0:12"),
):
result = await runner._enrich_message_with_transcription(
result, transcripts = await runner._enrich_message_with_transcription(
"caption",
["/tmp/voice.ogg"],
)
@ -56,6 +56,7 @@ async def test_enrich_message_with_transcription_surfaces_path_when_stt_disabled
assert "voice message" in result.lower()
assert "(duration: 0:12)" in result
assert "caption" in result
assert transcripts == []
@pytest.mark.asyncio
@ -69,13 +70,14 @@ async def test_enrich_message_with_transcription_omits_duration_on_probe_failure
"gateway.run._probe_audio_duration",
new=AsyncMock(return_value=None),
):
result = await runner._enrich_message_with_transcription(
result, transcripts = await runner._enrich_message_with_transcription(
"",
["/tmp/voice.ogg"],
)
assert "/tmp/voice.ogg" in result
assert "duration" not in result.lower()
assert transcripts == []
@pytest.mark.asyncio
@ -89,7 +91,7 @@ async def test_enrich_message_with_transcription_avoids_bogus_no_provider_messag
"tools.transcription_tools.transcribe_audio",
return_value={"success": False, "error": "VOICE_TOOLS_OPENAI_KEY not set"},
):
result = await runner._enrich_message_with_transcription(
result, transcripts = await runner._enrich_message_with_transcription(
"caption",
["/tmp/voice.ogg"],
)
@ -97,6 +99,7 @@ async def test_enrich_message_with_transcription_avoids_bogus_no_provider_messag
assert "No STT provider is configured" not in result
assert "trouble transcribing" in result
assert "caption" in result
assert transcripts == []
@pytest.mark.asyncio