mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Merge branch 'main' of github.com:NousResearch/hermes-agent into feat/ink-refactor
This commit is contained in:
commit
ec553fdb49
93 changed files with 3531 additions and 1330 deletions
458
gateway/run.py
458
gateway/run.py
|
|
@ -352,19 +352,14 @@ def _build_media_placeholder(event) -> str:
|
|||
return "\n".join(parts)
|
||||
|
||||
|
||||
def _dequeue_pending_text(adapter, session_key: str) -> str | None:
|
||||
"""Consume and return the text of a pending queued message.
|
||||
def _dequeue_pending_event(adapter, session_key: str) -> MessageEvent | None:
|
||||
"""Consume and return the full pending event for a session.
|
||||
|
||||
Preserves media context for captionless photo/document events by
|
||||
building a placeholder so the message isn't silently dropped.
|
||||
Queued follow-ups must preserve their media metadata so they can re-enter
|
||||
the normal image/STT/document preprocessing path instead of being reduced
|
||||
to a placeholder string.
|
||||
"""
|
||||
event = adapter.get_pending_message(session_key)
|
||||
if not event:
|
||||
return None
|
||||
text = event.text
|
||||
if not text and getattr(event, "media_urls", None):
|
||||
text = _build_media_placeholder(event)
|
||||
return text
|
||||
return adapter.get_pending_message(session_key)
|
||||
|
||||
|
||||
def _check_unavailable_skill(command_name: str) -> str | None:
|
||||
|
|
@ -1465,7 +1460,18 @@ class GatewayRunner:
|
|||
logger.info("Recovered %s background process(es) from previous run", recovered)
|
||||
except Exception as e:
|
||||
logger.warning("Process checkpoint recovery: %s", e)
|
||||
|
||||
|
||||
# Suspend sessions that were active when the gateway last exited.
|
||||
# This prevents stuck sessions from being blindly resumed on restart,
|
||||
# which can create an unrecoverable loop (#7536). Suspended sessions
|
||||
# auto-reset on the next incoming message, giving the user a clean start.
|
||||
try:
|
||||
suspended = self.session_store.suspend_recently_active()
|
||||
if suspended:
|
||||
logger.info("Suspended %d in-flight session(s) from previous run", suspended)
|
||||
except Exception as e:
|
||||
logger.warning("Session suspension on startup failed: %s", e)
|
||||
|
||||
connected_count = 0
|
||||
enabled_platform_count = 0
|
||||
startup_nonretryable_errors: list[str] = []
|
||||
|
|
@ -2221,6 +2227,13 @@ class GatewayRunner:
|
|||
# are system-generated and must skip user authorization.
|
||||
if getattr(event, "internal", False):
|
||||
pass
|
||||
elif source.user_id is None:
|
||||
# Messages with no user identity (Telegram service messages,
|
||||
# channel forwards, anonymous admin actions) cannot be
|
||||
# authorized — drop silently instead of triggering the pairing
|
||||
# flow with a None user_id.
|
||||
logger.debug("Ignoring message with no user_id from %s", source.platform.value)
|
||||
return None
|
||||
elif not self._is_user_authorized(source):
|
||||
logger.warning("Unauthorized user: %s (%s) on %s", source.user_id, source.user_name, source.platform.value)
|
||||
# In DMs: offer pairing code. In groups: silently ignore.
|
||||
|
|
@ -2370,8 +2383,11 @@ class GatewayRunner:
|
|||
self._pending_messages.pop(_quick_key, None)
|
||||
if _quick_key in self._running_agents:
|
||||
del self._running_agents[_quick_key]
|
||||
logger.info("HARD STOP for session %s — session lock released", _quick_key[:20])
|
||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
||||
# Mark session suspended so the next message starts fresh
|
||||
# instead of resuming the stuck context (#7536).
|
||||
self.session_store.suspend_session(_quick_key)
|
||||
logger.info("HARD STOP for session %s — suspended, session lock released", _quick_key[:20])
|
||||
return "⚡ Force-stopped. The session is suspended — your next message will start fresh."
|
||||
|
||||
# /reset and /new must bypass the running-agent guard so they
|
||||
# actually dispatch as commands instead of being queued as user
|
||||
|
|
@ -2761,6 +2777,162 @@ class GatewayRunner:
|
|||
del self._running_agents[_quick_key]
|
||||
self._running_agents_ts.pop(_quick_key, None)
|
||||
|
||||
async def _prepare_inbound_message_text(
|
||||
self,
|
||||
*,
|
||||
event: MessageEvent,
|
||||
source: SessionSource,
|
||||
history: List[Dict[str, Any]],
|
||||
) -> Optional[str]:
|
||||
"""Prepare inbound event text for the agent.
|
||||
|
||||
Keep the normal inbound path and the queued follow-up path on the same
|
||||
preprocessing pipeline so sender attribution, image enrichment, STT,
|
||||
document notes, reply context, and @ references all behave the same.
|
||||
"""
|
||||
history = history or []
|
||||
message_text = event.text or ""
|
||||
|
||||
_is_shared_thread = (
|
||||
source.chat_type != "dm"
|
||||
and source.thread_id
|
||||
and not getattr(self.config, "thread_sessions_per_user", False)
|
||||
)
|
||||
if _is_shared_thread and source.user_name:
|
||||
message_text = f"[{source.user_name}] {message_text}"
|
||||
|
||||
if event.media_urls:
|
||||
image_paths = []
|
||||
audio_paths = []
|
||||
for i, path in enumerate(event.media_urls):
|
||||
mtype = event.media_types[i] if i < len(event.media_types) else ""
|
||||
if mtype.startswith("image/") or event.message_type == MessageType.PHOTO:
|
||||
image_paths.append(path)
|
||||
if mtype.startswith("audio/") or event.message_type in (MessageType.VOICE, MessageType.AUDIO):
|
||||
audio_paths.append(path)
|
||||
|
||||
if image_paths:
|
||||
message_text = await self._enrich_message_with_vision(
|
||||
message_text,
|
||||
image_paths,
|
||||
)
|
||||
|
||||
if audio_paths:
|
||||
message_text = await self._enrich_message_with_transcription(
|
||||
message_text,
|
||||
audio_paths,
|
||||
)
|
||||
_stt_fail_markers = (
|
||||
"No STT provider",
|
||||
"STT is disabled",
|
||||
"can't listen",
|
||||
"VOICE_TOOLS_OPENAI_KEY",
|
||||
)
|
||||
if any(marker in message_text for marker in _stt_fail_markers):
|
||||
_stt_adapter = self.adapters.get(source.platform)
|
||||
_stt_meta = {"thread_id": source.thread_id} if source.thread_id else None
|
||||
if _stt_adapter:
|
||||
try:
|
||||
_stt_msg = (
|
||||
"🎤 I received your voice message but can't transcribe it — "
|
||||
"no speech-to-text provider is configured.\n\n"
|
||||
"To enable voice: install faster-whisper "
|
||||
"(`pip install faster-whisper` in the Hermes venv) "
|
||||
"and set `stt.enabled: true` in config.yaml, "
|
||||
"then /restart the gateway."
|
||||
)
|
||||
if self._has_setup_skill():
|
||||
_stt_msg += "\n\nFor full setup instructions, type: `/skill hermes-agent-setup`"
|
||||
await _stt_adapter.send(
|
||||
source.chat_id,
|
||||
_stt_msg,
|
||||
metadata=_stt_meta,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if event.media_urls and event.message_type == MessageType.DOCUMENT:
|
||||
import mimetypes as _mimetypes
|
||||
|
||||
_TEXT_EXTENSIONS = {".txt", ".md", ".csv", ".log", ".json", ".xml", ".yaml", ".yml", ".toml", ".ini", ".cfg"}
|
||||
for i, path in enumerate(event.media_urls):
|
||||
mtype = event.media_types[i] if i < len(event.media_types) else ""
|
||||
if mtype in ("", "application/octet-stream"):
|
||||
import os as _os2
|
||||
|
||||
_ext = _os2.path.splitext(path)[1].lower()
|
||||
if _ext in _TEXT_EXTENSIONS:
|
||||
mtype = "text/plain"
|
||||
else:
|
||||
guessed, _ = _mimetypes.guess_type(path)
|
||||
if guessed:
|
||||
mtype = guessed
|
||||
if not mtype.startswith(("application/", "text/")):
|
||||
continue
|
||||
|
||||
import os as _os
|
||||
import re as _re
|
||||
|
||||
basename = _os.path.basename(path)
|
||||
parts = basename.split("_", 2)
|
||||
display_name = parts[2] if len(parts) >= 3 else basename
|
||||
display_name = _re.sub(r'[^\w.\- ]', '_', display_name)
|
||||
|
||||
if mtype.startswith("text/"):
|
||||
context_note = (
|
||||
f"[The user sent a text document: '{display_name}'. "
|
||||
f"Its content has been included below. "
|
||||
f"The file is also saved at: {path}]"
|
||||
)
|
||||
else:
|
||||
context_note = (
|
||||
f"[The user sent a document: '{display_name}'. "
|
||||
f"The file is saved at: {path}. "
|
||||
f"Ask the user what they'd like you to do with it.]"
|
||||
)
|
||||
message_text = f"{context_note}\n\n{message_text}"
|
||||
|
||||
if getattr(event, "reply_to_text", None) and event.reply_to_message_id:
|
||||
reply_snippet = event.reply_to_text[:500]
|
||||
found_in_history = any(
|
||||
reply_snippet[:200] in (msg.get("content") or "")
|
||||
for msg in history
|
||||
if msg.get("role") in ("assistant", "user", "tool")
|
||||
)
|
||||
if not found_in_history:
|
||||
message_text = f'[Replying to: "{reply_snippet}"]\n\n{message_text}'
|
||||
|
||||
if "@" in message_text:
|
||||
try:
|
||||
from agent.context_references import preprocess_context_references_async
|
||||
from agent.model_metadata import get_model_context_length
|
||||
|
||||
_msg_cwd = os.environ.get("MESSAGING_CWD", os.path.expanduser("~"))
|
||||
_msg_ctx_len = get_model_context_length(
|
||||
self._model,
|
||||
base_url=self._base_url or "",
|
||||
)
|
||||
_ctx_result = await preprocess_context_references_async(
|
||||
message_text,
|
||||
cwd=_msg_cwd,
|
||||
context_length=_msg_ctx_len,
|
||||
allowed_root=_msg_cwd,
|
||||
)
|
||||
if _ctx_result.blocked:
|
||||
_adapter = self.adapters.get(source.platform)
|
||||
if _adapter:
|
||||
await _adapter.send(
|
||||
source.chat_id,
|
||||
"\n".join(_ctx_result.warnings) or "Context injection refused.",
|
||||
)
|
||||
return None
|
||||
if _ctx_result.expanded:
|
||||
message_text = _ctx_result.message
|
||||
except Exception as exc:
|
||||
logger.debug("@ context reference expansion failed: %s", exc)
|
||||
|
||||
return message_text
|
||||
|
||||
async def _handle_message_with_agent(self, event, source, _quick_key: str):
|
||||
"""Inner handler that runs under the _running_agents sentinel guard."""
|
||||
_msg_start_time = time.time()
|
||||
|
|
@ -2812,7 +2984,9 @@ class GatewayRunner:
|
|||
# so the agent knows this is a fresh conversation (not an intentional /reset).
|
||||
if getattr(session_entry, 'was_auto_reset', False):
|
||||
reset_reason = getattr(session_entry, 'auto_reset_reason', None) or 'idle'
|
||||
if reset_reason == "daily":
|
||||
if reset_reason == "suspended":
|
||||
context_note = "[System note: The user's previous session was stopped and suspended. This is a fresh conversation with no prior context.]"
|
||||
elif reset_reason == "daily":
|
||||
context_note = "[System note: The user's session was automatically reset by the daily schedule. This is a fresh conversation with no prior context.]"
|
||||
else:
|
||||
context_note = "[System note: The user's previous session expired due to inactivity. This is a fresh conversation with no prior context.]"
|
||||
|
|
@ -2829,7 +3003,9 @@ class GatewayRunner:
|
|||
)
|
||||
platform_name = source.platform.value if source.platform else ""
|
||||
had_activity = getattr(session_entry, 'reset_had_activity', False)
|
||||
should_notify = (
|
||||
# Suspended sessions always notify (they were explicitly stopped
|
||||
# or crashed mid-operation) — skip the policy check.
|
||||
should_notify = reset_reason == "suspended" or (
|
||||
policy.notify
|
||||
and had_activity
|
||||
and platform_name not in policy.notify_exclude_platforms
|
||||
|
|
@ -2837,7 +3013,9 @@ class GatewayRunner:
|
|||
if should_notify:
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter:
|
||||
if reset_reason == "daily":
|
||||
if reset_reason == "suspended":
|
||||
reason_text = "previous session was stopped or interrupted"
|
||||
elif reset_reason == "daily":
|
||||
reason_text = f"daily schedule at {policy.at_hour}:00"
|
||||
else:
|
||||
hours = policy.idle_minutes // 60
|
||||
|
|
@ -3195,149 +3373,13 @@ class GatewayRunner:
|
|||
# attachments (documents, audio, etc.) are not sent to the vision
|
||||
# tool even when they appear in the same message.
|
||||
# -----------------------------------------------------------------
|
||||
message_text = event.text or ""
|
||||
|
||||
# -----------------------------------------------------------------
|
||||
# Sender attribution for shared thread sessions.
|
||||
#
|
||||
# When multiple users share a single thread session (the default for
|
||||
# threads), prefix each message with [sender name] so the agent can
|
||||
# tell participants apart. Skip for DMs (single-user by nature) and
|
||||
# when per-user thread isolation is explicitly enabled.
|
||||
# -----------------------------------------------------------------
|
||||
_is_shared_thread = (
|
||||
source.chat_type != "dm"
|
||||
and source.thread_id
|
||||
and not getattr(self.config, "thread_sessions_per_user", False)
|
||||
message_text = await self._prepare_inbound_message_text(
|
||||
event=event,
|
||||
source=source,
|
||||
history=history,
|
||||
)
|
||||
if _is_shared_thread and source.user_name:
|
||||
message_text = f"[{source.user_name}] {message_text}"
|
||||
|
||||
if event.media_urls:
|
||||
image_paths = []
|
||||
for i, path in enumerate(event.media_urls):
|
||||
# Check media_types if available; otherwise infer from message type
|
||||
mtype = event.media_types[i] if i < len(event.media_types) else ""
|
||||
is_image = (
|
||||
mtype.startswith("image/")
|
||||
or event.message_type == MessageType.PHOTO
|
||||
)
|
||||
if is_image:
|
||||
image_paths.append(path)
|
||||
if image_paths:
|
||||
message_text = await self._enrich_message_with_vision(
|
||||
message_text, image_paths
|
||||
)
|
||||
|
||||
# -----------------------------------------------------------------
|
||||
# Auto-transcribe voice/audio messages sent by the user
|
||||
# -----------------------------------------------------------------
|
||||
if event.media_urls:
|
||||
audio_paths = []
|
||||
for i, path in enumerate(event.media_urls):
|
||||
mtype = event.media_types[i] if i < len(event.media_types) else ""
|
||||
is_audio = (
|
||||
mtype.startswith("audio/")
|
||||
or event.message_type in (MessageType.VOICE, MessageType.AUDIO)
|
||||
)
|
||||
if is_audio:
|
||||
audio_paths.append(path)
|
||||
if audio_paths:
|
||||
message_text = await self._enrich_message_with_transcription(
|
||||
message_text, audio_paths
|
||||
)
|
||||
# If STT failed, send a direct message to the user so they
|
||||
# know voice isn't configured — don't rely on the agent to
|
||||
# relay the error clearly.
|
||||
_stt_fail_markers = (
|
||||
"No STT provider",
|
||||
"STT is disabled",
|
||||
"can't listen",
|
||||
"VOICE_TOOLS_OPENAI_KEY",
|
||||
)
|
||||
if any(m in message_text for m in _stt_fail_markers):
|
||||
_stt_adapter = self.adapters.get(source.platform)
|
||||
_stt_meta = {"thread_id": source.thread_id} if source.thread_id else None
|
||||
if _stt_adapter:
|
||||
try:
|
||||
_stt_msg = (
|
||||
"🎤 I received your voice message but can't transcribe it — "
|
||||
"no speech-to-text provider is configured.\n\n"
|
||||
"To enable voice: install faster-whisper "
|
||||
"(`pip install faster-whisper` in the Hermes venv) "
|
||||
"and set `stt.enabled: true` in config.yaml, "
|
||||
"then /restart the gateway."
|
||||
)
|
||||
# Point to setup skill if it's installed
|
||||
if self._has_setup_skill():
|
||||
_stt_msg += "\n\nFor full setup instructions, type: `/skill hermes-agent-setup`"
|
||||
await _stt_adapter.send(
|
||||
source.chat_id, _stt_msg,
|
||||
metadata=_stt_meta,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# -----------------------------------------------------------------
|
||||
# Enrich document messages with context notes for the agent
|
||||
# -----------------------------------------------------------------
|
||||
if event.media_urls and event.message_type == MessageType.DOCUMENT:
|
||||
import mimetypes as _mimetypes
|
||||
_TEXT_EXTENSIONS = {".txt", ".md", ".csv", ".log", ".json", ".xml", ".yaml", ".yml", ".toml", ".ini", ".cfg"}
|
||||
for i, path in enumerate(event.media_urls):
|
||||
mtype = event.media_types[i] if i < len(event.media_types) else ""
|
||||
# Fall back to extension-based detection when MIME type is unreliable.
|
||||
if mtype in ("", "application/octet-stream"):
|
||||
import os as _os2
|
||||
_ext = _os2.path.splitext(path)[1].lower()
|
||||
if _ext in _TEXT_EXTENSIONS:
|
||||
mtype = "text/plain"
|
||||
else:
|
||||
guessed, _ = _mimetypes.guess_type(path)
|
||||
if guessed:
|
||||
mtype = guessed
|
||||
if not mtype.startswith(("application/", "text/")):
|
||||
continue
|
||||
# Extract display filename by stripping the doc_{uuid12}_ prefix
|
||||
import os as _os
|
||||
basename = _os.path.basename(path)
|
||||
# Format: doc_<12hex>_<original_filename>
|
||||
parts = basename.split("_", 2)
|
||||
display_name = parts[2] if len(parts) >= 3 else basename
|
||||
# Sanitize to prevent prompt injection via filenames
|
||||
import re as _re
|
||||
display_name = _re.sub(r'[^\w.\- ]', '_', display_name)
|
||||
|
||||
if mtype.startswith("text/"):
|
||||
context_note = (
|
||||
f"[The user sent a text document: '{display_name}'. "
|
||||
f"Its content has been included below. "
|
||||
f"The file is also saved at: {path}]"
|
||||
)
|
||||
else:
|
||||
context_note = (
|
||||
f"[The user sent a document: '{display_name}'. "
|
||||
f"The file is saved at: {path}. "
|
||||
f"Ask the user what they'd like you to do with it.]"
|
||||
)
|
||||
message_text = f"{context_note}\n\n{message_text}"
|
||||
|
||||
# -----------------------------------------------------------------
|
||||
# Inject reply context when user replies to a message not in history.
|
||||
# Telegram (and other platforms) let users reply to specific messages,
|
||||
# but if the quoted message is from a previous session, cron delivery,
|
||||
# or background task, the agent has no context about what's being
|
||||
# referenced. Prepend the quoted text so the agent understands. (#1594)
|
||||
# -----------------------------------------------------------------
|
||||
if getattr(event, 'reply_to_text', None) and event.reply_to_message_id:
|
||||
reply_snippet = event.reply_to_text[:500]
|
||||
found_in_history = any(
|
||||
reply_snippet[:200] in (msg.get("content") or "")
|
||||
for msg in history
|
||||
if msg.get("role") in ("assistant", "user", "tool")
|
||||
)
|
||||
if not found_in_history:
|
||||
message_text = f'[Replying to: "{reply_snippet}"]\n\n{message_text}'
|
||||
if message_text is None:
|
||||
return
|
||||
|
||||
try:
|
||||
# Emit agent:start hook
|
||||
|
|
@ -3349,30 +3391,6 @@ class GatewayRunner:
|
|||
}
|
||||
await self.hooks.emit("agent:start", hook_ctx)
|
||||
|
||||
# Expand @ context references (@file:, @folder:, @diff, etc.)
|
||||
if "@" in message_text:
|
||||
try:
|
||||
from agent.context_references import preprocess_context_references_async
|
||||
from agent.model_metadata import get_model_context_length
|
||||
_msg_cwd = os.environ.get("MESSAGING_CWD", os.path.expanduser("~"))
|
||||
_msg_ctx_len = get_model_context_length(
|
||||
self._model, base_url=self._base_url or "")
|
||||
_ctx_result = await preprocess_context_references_async(
|
||||
message_text, cwd=_msg_cwd,
|
||||
context_length=_msg_ctx_len, allowed_root=_msg_cwd)
|
||||
if _ctx_result.blocked:
|
||||
_adapter = self.adapters.get(source.platform)
|
||||
if _adapter:
|
||||
await _adapter.send(
|
||||
source.chat_id,
|
||||
"\n".join(_ctx_result.warnings) or "Context injection refused.",
|
||||
)
|
||||
return
|
||||
if _ctx_result.expanded:
|
||||
message_text = _ctx_result.message
|
||||
except Exception as exc:
|
||||
logger.debug("@ context reference expansion failed: %s", exc)
|
||||
|
||||
# Run the agent
|
||||
agent_result = await self._run_agent(
|
||||
message=message_text,
|
||||
|
|
@ -4010,25 +4028,31 @@ class GatewayRunner:
|
|||
handles /stop before this method is reached. This handler fires
|
||||
only through normal command dispatch (no running agent) or as a
|
||||
fallback. Force-clean the session lock in all cases for safety.
|
||||
|
||||
When there IS a running/pending agent, the session is also marked
|
||||
as *suspended* so the next message starts a fresh session instead
|
||||
of resuming the stuck context (#7536).
|
||||
"""
|
||||
source = event.source
|
||||
session_entry = self.session_store.get_or_create_session(source)
|
||||
session_key = session_entry.session_key
|
||||
|
||||
|
||||
agent = self._running_agents.get(session_key)
|
||||
if agent is _AGENT_PENDING_SENTINEL:
|
||||
# Force-clean the sentinel so the session is unlocked.
|
||||
if session_key in self._running_agents:
|
||||
del self._running_agents[session_key]
|
||||
logger.info("HARD STOP (pending) for session %s — sentinel cleared", session_key[:20])
|
||||
return "⚡ Force-stopped. The agent was still starting — session unlocked."
|
||||
self.session_store.suspend_session(session_key)
|
||||
logger.info("HARD STOP (pending) for session %s — suspended, sentinel cleared", session_key[:20])
|
||||
return "⚡ Force-stopped. The agent was still starting — your next message will start fresh."
|
||||
if agent:
|
||||
agent.interrupt("Stop requested")
|
||||
# Force-clean the session lock so a truly hung agent doesn't
|
||||
# keep it locked forever.
|
||||
if session_key in self._running_agents:
|
||||
del self._running_agents[session_key]
|
||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
||||
self.session_store.suspend_session(session_key)
|
||||
return "⚡ Force-stopped. Your next message will start a fresh session."
|
||||
else:
|
||||
return "No active task to stop."
|
||||
|
||||
|
|
@ -6694,6 +6718,8 @@ class GatewayRunner:
|
|||
chat_id=context.source.chat_id,
|
||||
chat_name=context.source.chat_name or "",
|
||||
thread_id=str(context.source.thread_id) if context.source.thread_id else "",
|
||||
user_id=str(context.source.user_id) if context.source.user_id else "",
|
||||
user_name=str(context.source.user_name) if context.source.user_name else "",
|
||||
)
|
||||
|
||||
def _clear_session_env(self, tokens: list) -> None:
|
||||
|
|
@ -6906,6 +6932,8 @@ class GatewayRunner:
|
|||
platform_name = watcher.get("platform", "")
|
||||
chat_id = watcher.get("chat_id", "")
|
||||
thread_id = watcher.get("thread_id", "")
|
||||
user_id = watcher.get("user_id", "")
|
||||
user_name = watcher.get("user_name", "")
|
||||
agent_notify = watcher.get("notify_on_complete", False)
|
||||
notify_mode = self._load_background_notifications_mode()
|
||||
|
||||
|
|
@ -6961,6 +6989,8 @@ class GatewayRunner:
|
|||
platform=_platform_enum,
|
||||
chat_id=chat_id,
|
||||
thread_id=thread_id or None,
|
||||
user_id=user_id or None,
|
||||
user_name=user_name or None,
|
||||
)
|
||||
synth_event = MessageEvent(
|
||||
text=synth_text,
|
||||
|
|
@ -8115,17 +8145,16 @@ class GatewayRunner:
|
|||
|
||||
# Get pending message from adapter.
|
||||
# Use session_key (not source.chat_id) to match adapter's storage keys.
|
||||
pending_event = None
|
||||
pending = None
|
||||
if result and adapter and session_key:
|
||||
if result.get("interrupted"):
|
||||
pending = _dequeue_pending_text(adapter, session_key)
|
||||
if not pending and result.get("interrupt_message"):
|
||||
pending = result.get("interrupt_message")
|
||||
else:
|
||||
pending = _dequeue_pending_text(adapter, session_key)
|
||||
if pending:
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
|
||||
pending_event = _dequeue_pending_event(adapter, session_key)
|
||||
if result.get("interrupted") and not pending_event and result.get("interrupt_message"):
|
||||
pending = result.get("interrupt_message")
|
||||
elif pending_event:
|
||||
pending = pending_event.text or _build_media_placeholder(pending_event)
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
|
||||
# Safety net: if the pending text is a slash command (e.g. "/stop",
|
||||
# "/new"), discard it — commands should never be passed to the agent
|
||||
# as user input. The primary fix is in base.py (commands bypass the
|
||||
|
|
@ -8143,27 +8172,29 @@ class GatewayRunner:
|
|||
"commands must not be passed as agent input",
|
||||
_pending_cmd_word,
|
||||
)
|
||||
pending_event = None
|
||||
pending = None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if self._draining and pending:
|
||||
if self._draining and (pending_event or pending):
|
||||
logger.info(
|
||||
"Discarding pending follow-up for session %s during gateway %s",
|
||||
session_key[:20] if session_key else "?",
|
||||
self._status_action_label(),
|
||||
)
|
||||
pending_event = None
|
||||
pending = None
|
||||
|
||||
if pending:
|
||||
if pending_event or pending:
|
||||
logger.debug("Processing pending message: '%s...'", pending[:40])
|
||||
|
||||
|
||||
# Clear the adapter's interrupt event so the next _run_agent call
|
||||
# doesn't immediately re-trigger the interrupt before the new agent
|
||||
# even makes its first API call (this was causing an infinite loop).
|
||||
if adapter and hasattr(adapter, '_active_sessions') and session_key and session_key in adapter._active_sessions:
|
||||
adapter._active_sessions[session_key].clear()
|
||||
|
||||
|
||||
# Cap recursion depth to prevent resource exhaustion when the
|
||||
# user sends multiple messages while the agent keeps failing. (#816)
|
||||
if _interrupt_depth >= self._MAX_INTERRUPT_DEPTH:
|
||||
|
|
@ -8172,9 +8203,10 @@ class GatewayRunner:
|
|||
"queueing message instead of recursing.",
|
||||
_interrupt_depth, session_key,
|
||||
)
|
||||
# Queue the pending message for normal processing on next turn
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter and hasattr(adapter, 'queue_message'):
|
||||
if adapter and pending_event:
|
||||
merge_pending_message_event(adapter._pending_messages, session_key, pending_event)
|
||||
elif adapter and hasattr(adapter, 'queue_message'):
|
||||
adapter.queue_message(session_key, pending)
|
||||
return result_holder[0] or {"final_response": response, "messages": history}
|
||||
|
||||
|
|
@ -8189,23 +8221,37 @@ class GatewayRunner:
|
|||
if first_response and not _already_streamed:
|
||||
try:
|
||||
await adapter.send(source.chat_id, first_response,
|
||||
metadata=getattr(event, "metadata", None))
|
||||
metadata={"thread_id": source.thread_id} if source.thread_id else None)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to send first response before queued message: %s", e)
|
||||
# else: interrupted — discard the interrupted response ("Operation
|
||||
# interrupted." is just noise; the user already knows they sent a
|
||||
# new message).
|
||||
|
||||
# Process the pending message with updated history
|
||||
updated_history = result.get("messages", history)
|
||||
next_source = source
|
||||
next_message = pending
|
||||
next_message_id = None
|
||||
if pending_event is not None:
|
||||
next_source = getattr(pending_event, "source", None) or source
|
||||
next_message = await self._prepare_inbound_message_text(
|
||||
event=pending_event,
|
||||
source=next_source,
|
||||
history=updated_history,
|
||||
)
|
||||
if next_message is None:
|
||||
return result
|
||||
next_message_id = getattr(pending_event, "message_id", None)
|
||||
|
||||
return await self._run_agent(
|
||||
message=pending,
|
||||
message=next_message,
|
||||
context_prompt=context_prompt,
|
||||
history=updated_history,
|
||||
source=source,
|
||||
source=next_source,
|
||||
session_id=session_id,
|
||||
session_key=session_key,
|
||||
_interrupt_depth=_interrupt_depth + 1,
|
||||
event_message_id=next_message_id,
|
||||
)
|
||||
finally:
|
||||
# Stop progress sender, interrupt monitor, and notification task
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue