mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
Merge pull request #41984 from kshitijk4poor/salvage/6600-stale-streaming-worker
fix(gateway): transcribe voice messages during active agent runs (salvage #6600, voice half)
This commit is contained in:
commit
c3055d6185
3 changed files with 201 additions and 16 deletions
207
gateway/run.py
207
gateway/run.py
|
|
@ -7748,10 +7748,28 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
|
|||
)
|
||||
|
||||
if audio_paths:
|
||||
message_text = await self._enrich_message_with_transcription(
|
||||
message_text, _successful_transcripts = await self._enrich_message_with_transcription(
|
||||
message_text,
|
||||
audio_paths,
|
||||
)
|
||||
# Echo each successful transcript back to the user immediately,
|
||||
# before the agent loop runs. Lets the user verify STT quality
|
||||
# in real-time and see the raw whisper output verbatim.
|
||||
if _successful_transcripts:
|
||||
_echo_adapter = self.adapters.get(source.platform)
|
||||
_echo_meta = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event))
|
||||
if _echo_adapter:
|
||||
for _tx in _successful_transcripts:
|
||||
try:
|
||||
await _echo_adapter.send(
|
||||
source.chat_id,
|
||||
f'🎙️ "{_tx}"',
|
||||
metadata=_echo_meta,
|
||||
)
|
||||
except Exception as _echo_exc:
|
||||
logger.debug(
|
||||
"Transcript echo failed (non-fatal): %s", _echo_exc,
|
||||
)
|
||||
_stt_fail_markers = (
|
||||
"No STT provider",
|
||||
"STT is disabled",
|
||||
|
|
@ -11655,7 +11673,7 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
|
|||
self,
|
||||
user_text: str,
|
||||
audio_paths: List[str],
|
||||
) -> str:
|
||||
) -> tuple[str, List[str]]:
|
||||
"""
|
||||
Auto-transcribe user voice/audio messages using the configured STT provider
|
||||
and prepend the transcript to the message text.
|
||||
|
|
@ -11665,7 +11683,13 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
|
|||
audio_paths: List of local file paths to cached audio files.
|
||||
|
||||
Returns:
|
||||
The enriched message string with transcriptions prepended.
|
||||
A tuple of ``(enriched_text, successful_transcripts)``:
|
||||
- ``enriched_text``: the message string with transcription wrappers
|
||||
prepended (same as before).
|
||||
- ``successful_transcripts``: the raw transcript strings for audio
|
||||
clips that were successfully transcribed, in input order. Empty
|
||||
list if every clip failed or STT is disabled. Callers can use
|
||||
this to echo transcripts back to the user before the agent loop.
|
||||
"""
|
||||
if not getattr(self.config, "stt_enabled", True):
|
||||
notes = []
|
||||
|
|
@ -11679,24 +11703,26 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
|
|||
else:
|
||||
notes.append(f"[The user sent a voice message: {abs_path}]")
|
||||
if not notes:
|
||||
return user_text
|
||||
return user_text, []
|
||||
prefix = "\n\n".join(notes)
|
||||
_placeholder = "(The user sent a message with no text content)"
|
||||
if user_text and user_text.strip() == _placeholder:
|
||||
return prefix
|
||||
return prefix, []
|
||||
if user_text:
|
||||
return f"{prefix}\n\n{user_text}"
|
||||
return prefix
|
||||
return f"{prefix}\n\n{user_text}", []
|
||||
return prefix, []
|
||||
|
||||
from tools.transcription_tools import transcribe_audio
|
||||
|
||||
enriched_parts = []
|
||||
successful_transcripts: List[str] = []
|
||||
for path in audio_paths:
|
||||
try:
|
||||
logger.debug("Transcribing user voice: %s", path)
|
||||
result = await asyncio.to_thread(transcribe_audio, path)
|
||||
if result["success"]:
|
||||
transcript = result["transcript"]
|
||||
successful_transcripts.append(transcript)
|
||||
enriched_parts.append(
|
||||
f'[The user sent a voice message~ '
|
||||
f'Here\'s what they said: "{transcript}"]'
|
||||
|
|
@ -11741,9 +11767,75 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
|
|||
if user_text and user_text.strip() == _placeholder:
|
||||
return prefix
|
||||
if user_text:
|
||||
return f"{prefix}\n\n{user_text}"
|
||||
return prefix
|
||||
return user_text
|
||||
return f"{prefix}\n\n{user_text}", successful_transcripts
|
||||
return prefix, successful_transcripts
|
||||
return user_text, successful_transcripts
|
||||
|
||||
async def _dequeue_pending_with_transcription(
    self,
    adapter,
    session_key: str,
    source,
) -> str | None:
    """Dequeue a pending queued message, auto-transcribing audio media.

    When a voice/audio message arrives during an active agent run, the
    adapter stores the event in its pending queue and signals an interrupt
    (see base.BaseAdapter.handle_message). The adapter path bypasses
    _handle_message entirely, so the normal STT pipeline at message-receive
    time never runs.

    This helper fills that gap: when the dequeued event has audio media,
    we transcribe inline, echo the raw transcript back to the user (same
    "🎙️" format as the fresh-message path), and return enriched text.
    Non-audio events fall back to _build_media_placeholder, matching the
    original _dequeue_pending_text behavior.

    Args:
        adapter: Platform adapter whose ``get_pending_message`` yields the
            queued event for ``session_key``.
        session_key: Key identifying the session whose pending message is
            being drained.
        source: Message source; ``platform``, ``chat_id`` and ``thread_id``
            are read to route the transcript echo.

    Returns:
        The text to use as the next user turn (transcript-enriched for
        audio events), or ``None`` when nothing is pending or the
        resulting text is empty.
    """
    event = adapter.get_pending_message(session_key)
    if not event:
        return None

    text = event.text or ""
    media_urls = getattr(event, "media_urls", None) or []
    media_types = getattr(event, "media_types", None) or []

    audio_paths: List[str] = []
    for i, path in enumerate(media_urls):
        # media_types may be shorter than media_urls, and an entry may be
        # None; normalise to "" so the prefix check cannot raise.
        mtype = (media_types[i] if i < len(media_types) else "") or ""
        is_audio = (
            mtype.startswith("audio/")
            or getattr(event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO)
        )
        if is_audio:
            audio_paths.append(path)

    if audio_paths:
        try:
            enriched_text, successful_transcripts = await self._enrich_message_with_transcription(
                text, audio_paths,
            )
        except Exception as trans_exc:
            # The event has already been taken from the adapter above, so
            # letting this propagate would drop the user's message. Degrade
            # to the caption/placeholder instead, like the other
            # voice-interrupt and voice-drain paths in this file do.
            logger.warning(
                "Voice-dequeue transcription failed: %s", trans_exc,
            )
            return text or _build_media_placeholder(event) or None

        # Echo raw transcripts back to the user so voice interrupts
        # feel identical to fresh voice messages.
        if successful_transcripts:
            echo_adapter = self.adapters.get(source.platform)
            echo_meta = {"thread_id": source.thread_id} if source.thread_id else None
            if echo_adapter:
                for tx in successful_transcripts:
                    try:
                        await echo_adapter.send(
                            source.chat_id,
                            f'🎙️ "{tx}"',
                            metadata=echo_meta,
                        )
                    except Exception as echo_exc:
                        # Best-effort: a failed echo must not block the turn.
                        logger.debug(
                            "Transcript echo failed (non-fatal): %s", echo_exc,
                        )
        return enriched_text or None

    # Non-audio fallback: preserve original _dequeue_pending_text semantics.
    if not text and media_urls:
        text = _build_media_placeholder(event)
    return text or None
|
||||
|
||||
def _build_process_event_source(self, evt: dict):
|
||||
"""Resolve the canonical source for a synthetic background-process event.
|
||||
|
|
@ -14591,7 +14683,52 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
|
|||
# is lost — neither the interrupt path nor the dequeue
|
||||
# path finds it.
|
||||
_peek_event = _adapter._pending_messages.get(session_key)
|
||||
pending_text = _peek_event.text if _peek_event else None
|
||||
pending_text = None
|
||||
if _peek_event is not None:
|
||||
pending_text = _peek_event.text or ""
|
||||
# Transcribe audio media BEFORE signaling the
|
||||
# agent, so voice messages interrupt with the
|
||||
# real transcript instead of an empty string
|
||||
# (or file-path placeholder). Matches the UX
|
||||
# of fresh voice messages including the
|
||||
# 🎙️ echo back to the user.
|
||||
_media_urls = getattr(_peek_event, "media_urls", None) or []
|
||||
_media_types = getattr(_peek_event, "media_types", None) or []
|
||||
_audio_paths = []
|
||||
for _i, _path in enumerate(_media_urls):
|
||||
_mtype = _media_types[_i] if _i < len(_media_types) else ""
|
||||
_is_audio = (
|
||||
_mtype.startswith("audio/")
|
||||
or getattr(_peek_event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO)
|
||||
)
|
||||
if _is_audio:
|
||||
_audio_paths.append(_path)
|
||||
if _audio_paths:
|
||||
try:
|
||||
_enriched, _transcripts = await self._enrich_message_with_transcription(
|
||||
pending_text, _audio_paths,
|
||||
)
|
||||
pending_text = _enriched
|
||||
if _transcripts:
|
||||
_echo_meta = {"thread_id": source.thread_id} if source.thread_id else None
|
||||
for _tx in _transcripts:
|
||||
try:
|
||||
await _adapter.send(
|
||||
source.chat_id,
|
||||
f'🎙️ "{_tx}"',
|
||||
metadata=_echo_meta,
|
||||
)
|
||||
except Exception as _echo_exc:
|
||||
logger.debug(
|
||||
"Voice-interrupt echo failed (non-fatal): %s",
|
||||
_echo_exc,
|
||||
)
|
||||
except Exception as _trans_exc:
|
||||
logger.warning(
|
||||
"Voice-interrupt transcription failed: %s", _trans_exc,
|
||||
)
|
||||
elif not pending_text and _media_urls:
|
||||
pending_text = _build_media_placeholder(_peek_event)
|
||||
logger.debug("Interrupt detected from adapter, signaling agent...")
|
||||
agent.interrupt(pending_text)
|
||||
_interrupt_detected.set()
|
||||
|
|
@ -14916,8 +15053,52 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
|
|||
else:
|
||||
pending = interrupt_message
|
||||
elif pending_event:
|
||||
pending = pending_event.text or _build_media_placeholder(pending_event)
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
# Transcribe audio media on the dequeued event BEFORE it is
|
||||
# handed back as the next user turn, so queued/interrupting
|
||||
# voice messages drain with the real transcript instead of
|
||||
# a file-path placeholder. Echo each transcript back to the
|
||||
# user (same 🎙️ format as fresh voice messages) so voice
|
||||
# interrupts feel identical to text interrupts.
|
||||
_pending_text = pending_event.text or ""
|
||||
_media_urls = getattr(pending_event, "media_urls", None) or []
|
||||
_media_types = getattr(pending_event, "media_types", None) or []
|
||||
_audio_paths = []
|
||||
for _i, _path in enumerate(_media_urls):
|
||||
_mtype = _media_types[_i] if _i < len(_media_types) else ""
|
||||
_is_audio = (
|
||||
_mtype.startswith("audio/")
|
||||
or getattr(pending_event, "message_type", None) in (MessageType.VOICE, MessageType.AUDIO)
|
||||
)
|
||||
if _is_audio:
|
||||
_audio_paths.append(_path)
|
||||
if _audio_paths:
|
||||
try:
|
||||
_enriched, _transcripts = await self._enrich_message_with_transcription(
|
||||
_pending_text, _audio_paths,
|
||||
)
|
||||
pending = _enriched or None
|
||||
if _transcripts:
|
||||
_echo_meta = {"thread_id": source.thread_id} if source.thread_id else None
|
||||
for _tx in _transcripts:
|
||||
try:
|
||||
await adapter.send(
|
||||
source.chat_id,
|
||||
f'🎙️ "{_tx}"',
|
||||
metadata=_echo_meta,
|
||||
)
|
||||
except Exception as _echo_exc:
|
||||
logger.debug(
|
||||
"Voice-drain echo failed (non-fatal): %s", _echo_exc,
|
||||
)
|
||||
except Exception as _trans_exc:
|
||||
logger.warning(
|
||||
"Voice-drain transcription failed: %s", _trans_exc,
|
||||
)
|
||||
pending = _pending_text or _build_media_placeholder(pending_event)
|
||||
else:
|
||||
pending = _pending_text or _build_media_placeholder(pending_event)
|
||||
if pending:
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
|
||||
# Leftover /steer: if a steer arrived after the last tool batch
|
||||
# (e.g. during the final API call), the agent couldn't inject it
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ AUTHOR_MAP = {
|
|||
"804436395@qq.com": "LaPhilosophie",
|
||||
"maxmitcham@mac.home": "maxtrigify",
|
||||
"ccook@nvms.com": "ccook1963",
|
||||
"kristian@agrointel.no": "kristianvast",
|
||||
"thomas.paquette@gmail.com": "RyTsYdUp",
|
||||
"techxacm@gmail.com": "ProgramCaiCai",
|
||||
"266365592+bmoore210@users.noreply.github.com": "bmoore210",
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ async def test_enrich_message_with_transcription_surfaces_path_when_stt_disabled
|
|||
"gateway.run._probe_audio_duration",
|
||||
new=AsyncMock(return_value="0:12"),
|
||||
):
|
||||
result = await runner._enrich_message_with_transcription(
|
||||
result, transcripts = await runner._enrich_message_with_transcription(
|
||||
"caption",
|
||||
["/tmp/voice.ogg"],
|
||||
)
|
||||
|
|
@ -56,6 +56,7 @@ async def test_enrich_message_with_transcription_surfaces_path_when_stt_disabled
|
|||
assert "voice message" in result.lower()
|
||||
assert "(duration: 0:12)" in result
|
||||
assert "caption" in result
|
||||
assert transcripts == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -69,13 +70,14 @@ async def test_enrich_message_with_transcription_omits_duration_on_probe_failure
|
|||
"gateway.run._probe_audio_duration",
|
||||
new=AsyncMock(return_value=None),
|
||||
):
|
||||
result = await runner._enrich_message_with_transcription(
|
||||
result, transcripts = await runner._enrich_message_with_transcription(
|
||||
"",
|
||||
["/tmp/voice.ogg"],
|
||||
)
|
||||
|
||||
assert "/tmp/voice.ogg" in result
|
||||
assert "duration" not in result.lower()
|
||||
assert transcripts == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -89,7 +91,7 @@ async def test_enrich_message_with_transcription_avoids_bogus_no_provider_messag
|
|||
"tools.transcription_tools.transcribe_audio",
|
||||
return_value={"success": False, "error": "VOICE_TOOLS_OPENAI_KEY not set"},
|
||||
):
|
||||
result = await runner._enrich_message_with_transcription(
|
||||
result, transcripts = await runner._enrich_message_with_transcription(
|
||||
"caption",
|
||||
["/tmp/voice.ogg"],
|
||||
)
|
||||
|
|
@ -97,6 +99,7 @@ async def test_enrich_message_with_transcription_avoids_bogus_no_provider_messag
|
|||
assert "No STT provider is configured" not in result
|
||||
assert "trouble transcribing" in result
|
||||
assert "caption" in result
|
||||
assert transcripts == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue