diff --git a/gateway/run.py b/gateway/run.py index 70838f3492a..8d512bf3b3c 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7748,10 +7748,28 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin): ) if audio_paths: - message_text = await self._enrich_message_with_transcription( + message_text, _successful_transcripts = await self._enrich_message_with_transcription( message_text, audio_paths, ) + # Echo each successful transcript back to the user immediately, + # before the agent loop runs. Lets the user verify STT quality + # in real-time and see the raw whisper output verbatim. + if _successful_transcripts: + _echo_adapter = self.adapters.get(source.platform) + _echo_meta = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event)) + if _echo_adapter: + for _tx in _successful_transcripts: + try: + await _echo_adapter.send( + source.chat_id, + f'🎙️ "{_tx}"', + metadata=_echo_meta, + ) + except Exception as _echo_exc: + logger.debug( + "Transcript echo failed (non-fatal): %s", _echo_exc, + ) _stt_fail_markers = ( "No STT provider", "STT is disabled", @@ -11655,7 +11673,7 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin): self, user_text: str, audio_paths: List[str], - ) -> str: + ) -> tuple[str, List[str]]: """ Auto-transcribe user voice/audio messages using the configured STT provider and prepend the transcript to the message text. @@ -11665,7 +11683,13 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin): audio_paths: List of local file paths to cached audio files. Returns: - The enriched message string with transcriptions prepended. + A tuple of ``(enriched_text, successful_transcripts)``: + - ``enriched_text``: the message string with transcription wrappers + prepended (same as before). + - ``successful_transcripts``: the raw transcript strings for audio + clips that were successfully transcribed, in input order. 
Empty + list if every clip failed or STT is disabled. Callers can use + this to echo transcripts back to the user before the agent loop. """ if not getattr(self.config, "stt_enabled", True): notes = [] @@ -11679,24 +11703,26 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin): else: notes.append(f"[The user sent a voice message: {abs_path}]") if not notes: - return user_text + return user_text, [] prefix = "\n\n".join(notes) _placeholder = "(The user sent a message with no text content)" if user_text and user_text.strip() == _placeholder: - return prefix + return prefix, [] if user_text: - return f"{prefix}\n\n{user_text}" - return prefix + return f"{prefix}\n\n{user_text}", [] + return prefix, [] from tools.transcription_tools import transcribe_audio enriched_parts = [] + successful_transcripts: List[str] = [] for path in audio_paths: try: logger.debug("Transcribing user voice: %s", path) result = await asyncio.to_thread(transcribe_audio, path) if result["success"]: transcript = result["transcript"] + successful_transcripts.append(transcript) enriched_parts.append( f'[The user sent a voice message~ ' f'Here\'s what they said: "{transcript}"]' @@ -11741,9 +11767,75 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin): if user_text and user_text.strip() == _placeholder: - return prefix + return prefix, successful_transcripts if user_text: - return f"{prefix}\n\n{user_text}" - return prefix - return user_text + return f"{prefix}\n\n{user_text}", successful_transcripts + return prefix, successful_transcripts + return user_text, successful_transcripts + + async def _dequeue_pending_with_transcription( + self, + adapter, + session_key: str, + source, + ) -> str | None: + """Dequeue a pending queued message, auto-transcribing audio media. + + When a voice/audio message arrives during an active agent run, the + adapter stores the event in its pending queue and signals an interrupt + (see base.BaseAdapter.handle_message). 
The adapter path bypasses + _handle_message entirely, so the normal STT pipeline at message-receive + time never runs. + + This helper fills that gap: when the dequeued event has audio media, + we transcribe inline, echo the raw transcript back to the user (same + "🎙️" format as the fresh-message path), and return enriched text. + Non-audio events fall back to _build_media_placeholder, matching the + original _dequeue_pending_text behavior. + """ + event = adapter.get_pending_message(session_key) + if not event: + return None + + text = event.text or "" + + audio_paths: List[str] = [] + media_urls = getattr(event, "media_urls", None) or [] + media_types = getattr(event, "media_types", None) or [] + for i, path in enumerate(media_urls): + mtype = media_types[i] if i < len(media_types) else "" + is_audio = ( + mtype.startswith("audio/") + or getattr(event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO) + ) + if is_audio: + audio_paths.append(path) + + if audio_paths: + enriched_text, successful_transcripts = await self._enrich_message_with_transcription( + text, audio_paths, + ) + # Echo raw transcripts back to the user so voice interrupts + # feel identical to fresh voice messages. + if successful_transcripts: + echo_adapter = self.adapters.get(source.platform) + echo_meta = {"thread_id": source.thread_id} if source.thread_id else None + if echo_adapter: + for tx in successful_transcripts: + try: + await echo_adapter.send( + source.chat_id, + f'🎙️ "{tx}"', + metadata=echo_meta, + ) + except Exception as echo_exc: + logger.debug( + "Transcript echo failed (non-fatal): %s", echo_exc, + ) + return enriched_text or None + + # Non-audio fallback: preserve original _dequeue_pending_text semantics. + if not text and media_urls: + text = _build_media_placeholder(event) + return text or None def _build_process_event_source(self, evt: dict): """Resolve the canonical source for a synthetic background-process event. 
@@ -14591,7 +14683,52 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin): # is lost — neither the interrupt path nor the dequeue # path finds it. _peek_event = _adapter._pending_messages.get(session_key) - pending_text = _peek_event.text if _peek_event else None + pending_text = None + if _peek_event is not None: + pending_text = _peek_event.text or "" + # Transcribe audio media BEFORE signaling the + # agent, so voice messages interrupt with the + # real transcript instead of an empty string + # (or file-path placeholder). Matches the UX + # of fresh voice messages including the + # 🎙️ echo back to the user. + _media_urls = getattr(_peek_event, "media_urls", None) or [] + _media_types = getattr(_peek_event, "media_types", None) or [] + _audio_paths = [] + for _i, _path in enumerate(_media_urls): + _mtype = _media_types[_i] if _i < len(_media_types) else "" + _is_audio = ( + _mtype.startswith("audio/") + or getattr(_peek_event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO) + ) + if _is_audio: + _audio_paths.append(_path) + if _audio_paths: + try: + _enriched, _transcripts = await self._enrich_message_with_transcription( + pending_text, _audio_paths, + ) + pending_text = _enriched + if _transcripts: + _echo_meta = {"thread_id": source.thread_id} if source.thread_id else None + for _tx in _transcripts: + try: + await _adapter.send( + source.chat_id, + f'🎙️ "{_tx}"', + metadata=_echo_meta, + ) + except Exception as _echo_exc: + logger.debug( + "Voice-interrupt echo failed (non-fatal): %s", + _echo_exc, + ) + except Exception as _trans_exc: + logger.warning( + "Voice-interrupt transcription failed: %s", _trans_exc, + ) + elif not pending_text and _media_urls: + pending_text = _build_media_placeholder(_peek_event) logger.debug("Interrupt detected from adapter, signaling agent...") agent.interrupt(pending_text) _interrupt_detected.set() @@ -14916,8 +15053,52 @@ class GatewayRunner(GatewayKanbanWatchersMixin, 
GatewaySlashCommandsMixin): else: pending = interrupt_message elif pending_event: - pending = pending_event.text or _build_media_placeholder(pending_event) - logger.debug("Processing queued message after agent completion: '%s...'", pending[:40]) + # Transcribe audio media on the dequeued event BEFORE it is + # handed back as the next user turn, so queued/interrupting + # voice messages drain with the real transcript instead of + # a file-path placeholder. Echo each transcript back to the + # user (same 🎙️ format as fresh voice messages) so voice + # interrupts feel identical to text interrupts. + _pending_text = pending_event.text or "" + _media_urls = getattr(pending_event, "media_urls", None) or [] + _media_types = getattr(pending_event, "media_types", None) or [] + _audio_paths = [] + for _i, _path in enumerate(_media_urls): + _mtype = _media_types[_i] if _i < len(_media_types) else "" + _is_audio = ( + _mtype.startswith("audio/") + or getattr(pending_event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO) + ) + if _is_audio: + _audio_paths.append(_path) + if _audio_paths: + try: + _enriched, _transcripts = await self._enrich_message_with_transcription( + _pending_text, _audio_paths, + ) + pending = _enriched or None + if _transcripts: + _echo_meta = {"thread_id": source.thread_id} if source.thread_id else None + for _tx in _transcripts: + try: + await adapter.send( + source.chat_id, + f'🎙️ "{_tx}"', + metadata=_echo_meta, + ) + except Exception as _echo_exc: + logger.debug( + "Voice-drain echo failed (non-fatal): %s", _echo_exc, + ) + except Exception as _trans_exc: + logger.warning( + "Voice-drain transcription failed: %s", _trans_exc, + ) + pending = _pending_text or _build_media_placeholder(pending_event) + else: + pending = _pending_text or _build_media_placeholder(pending_event) + if pending: + logger.debug("Processing queued message after agent completion: '%s...'", pending[:40]) # Leftover /steer: if a steer arrived after the last tool 
batch # (e.g. during the final API call), the agent couldn't inject it diff --git a/tests/gateway/test_stt_config.py b/tests/gateway/test_stt_config.py index 44dd5950f3c..004dd907eb6 100644 --- a/tests/gateway/test_stt_config.py +++ b/tests/gateway/test_stt_config.py @@ -47,7 +47,7 @@ async def test_enrich_message_with_transcription_surfaces_path_when_stt_disabled "gateway.run._probe_audio_duration", new=AsyncMock(return_value="0:12"), ): - result = await runner._enrich_message_with_transcription( + result, transcripts = await runner._enrich_message_with_transcription( "caption", ["/tmp/voice.ogg"], ) @@ -56,6 +56,7 @@ async def test_enrich_message_with_transcription_surfaces_path_when_stt_disabled assert "voice message" in result.lower() assert "(duration: 0:12)" in result assert "caption" in result + assert transcripts == [] @pytest.mark.asyncio @@ -69,13 +70,14 @@ async def test_enrich_message_with_transcription_omits_duration_on_probe_failure "gateway.run._probe_audio_duration", new=AsyncMock(return_value=None), ): - result = await runner._enrich_message_with_transcription( + result, transcripts = await runner._enrich_message_with_transcription( "", ["/tmp/voice.ogg"], ) assert "/tmp/voice.ogg" in result assert "duration" not in result.lower() + assert transcripts == [] @pytest.mark.asyncio @@ -89,7 +91,7 @@ async def test_enrich_message_with_transcription_avoids_bogus_no_provider_messag "tools.transcription_tools.transcribe_audio", return_value={"success": False, "error": "VOICE_TOOLS_OPENAI_KEY not set"}, ): - result = await runner._enrich_message_with_transcription( + result, transcripts = await runner._enrich_message_with_transcription( "caption", ["/tmp/voice.ogg"], ) @@ -97,6 +99,7 @@ async def test_enrich_message_with_transcription_avoids_bogus_no_provider_messag assert "No STT provider is configured" not in result assert "trouble transcribing" in result assert "caption" in result + assert transcripts == [] @pytest.mark.asyncio