fix(telegram): preserve DM topic routing via reply fallback

This commit is contained in:
Jhin Lee 2026-05-08 21:33:20 -04:00 committed by kshitij
parent 28b5bd7e93
commit b3239572f0
6 changed files with 1331 additions and 152 deletions

View file

@ -571,6 +571,7 @@ from gateway.platforms.base import (
EphemeralReply,
MessageEvent,
MessageType,
_reply_anchor_for_event,
merge_pending_message_event,
)
from gateway.restart import (
@ -2406,7 +2407,8 @@ class GatewayRunner:
if not adapter:
return True
thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
reply_anchor = self._reply_anchor_for_event(event)
thread_meta = self._thread_metadata_for_source(event.source, reply_anchor)
if self._queue_during_drain_enabled():
self._queue_or_replace_pending_event(session_key, event)
message = f"⏳ Gateway {self._status_action_gerund()} — queued for the next turn after it comes back."
@ -2416,7 +2418,13 @@ class GatewayRunner:
await adapter._send_with_retry(
chat_id=event.source.chat_id,
content=message,
reply_to=event.message_id,
reply_to=(
reply_anchor
if event.source.platform == Platform.TELEGRAM
and event.source.chat_type == "dm"
and event.source.thread_id
else (None if event.source.platform == Platform.TELEGRAM and event.source.thread_id else event.message_id)
),
metadata=thread_meta,
)
return True
@ -2553,12 +2561,19 @@ class GatewayRunner:
except Exception as _onb_err:
logger.debug("Failed to apply busy-input onboarding hint: %s", _onb_err)
thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
reply_anchor = self._reply_anchor_for_event(event)
thread_meta = self._thread_metadata_for_source(event.source, reply_anchor)
try:
await adapter._send_with_retry(
chat_id=event.source.chat_id,
content=message,
reply_to=event.message_id,
reply_to=(
reply_anchor
if event.source.platform == Platform.TELEGRAM
and event.source.chat_type == "dm"
and event.source.thread_id
else (None if event.source.platform == Platform.TELEGRAM and event.source.thread_id else event.message_id)
),
metadata=thread_meta,
)
except Exception as e:
@ -5063,7 +5078,7 @@ class GatewayRunner:
if config and hasattr(config, "get_notice_delivery"):
notice_delivery = config.get_notice_delivery(source.platform)
metadata = {"thread_id": source.thread_id} if getattr(source, "thread_id", None) else None
metadata = self._thread_metadata_for_source(source)
if notice_delivery == "private" and getattr(source, "user_id", None):
try:
result = await adapter.send_private_notice(
@ -6158,7 +6173,7 @@ class GatewayRunner:
)
if any(marker in message_text for marker in _stt_fail_markers):
_stt_adapter = self.adapters.get(source.platform)
_stt_meta = {"thread_id": source.thread_id} if source.thread_id else None
_stt_meta = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event))
if _stt_adapter:
try:
_stt_msg = (
@ -6679,7 +6694,7 @@ class GatewayRunner:
f"{_compress_token_threshold:,}",
)
_hyg_meta = {"thread_id": source.thread_id} if source.thread_id else None
_hyg_meta = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event))
try:
from run_agent import AIAgent
@ -6908,7 +6923,7 @@ class GatewayRunner:
session_id=session_entry.session_id,
session_key=session_key,
run_generation=run_generation,
event_message_id=event.message_id,
event_message_id=self._reply_anchor_for_event(event),
channel_prompt=event.channel_prompt,
)
@ -7249,7 +7264,11 @@ class GatewayRunner:
try:
_foot_adapter = self.adapters.get(source.platform)
if _foot_adapter:
await _foot_adapter.send(source.chat_id, _footer_line)
await _foot_adapter.send(
source.chat_id,
_footer_line,
metadata=self._thread_metadata_for_source(source, self._reply_anchor_for_event(event)),
)
except Exception as _e:
logger.debug("trailing footer send failed: %s", _e)
return None
@ -8264,7 +8283,7 @@ class GatewayRunner:
lines.append("_(session only — use `/model <name> --global` to persist)_")
return "\n".join(lines)
metadata = {"thread_id": source.thread_id} if source.thread_id else None
metadata = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event))
result = await adapter.send_model_picker(
chat_id=source.chat_id,
providers=providers,
@ -8685,7 +8704,7 @@ class GatewayRunner:
try:
metadata = self._thread_metadata_for_source(source)
except Exception:
metadata = {"thread_id": source.thread_id} if getattr(source, "thread_id", None) else None
metadata = None
result = await adapter.send(source.chat_id, message, metadata=metadata)
if result is not None and not getattr(result, "success", True):
@ -9250,13 +9269,15 @@ class GatewayRunner:
and adapter.is_in_voice_channel(guild_id)):
await adapter.play_in_voice_channel(guild_id, actual_path)
elif adapter and hasattr(adapter, "send_voice"):
reply_anchor = self._reply_anchor_for_event(event)
thread_meta = self._thread_metadata_for_source(event.source, reply_anchor)
send_kwargs: Dict[str, Any] = {
"chat_id": event.source.chat_id,
"audio_path": actual_path,
"reply_to": event.message_id,
"reply_to": reply_anchor,
}
if event.source.thread_id:
send_kwargs["metadata"] = {"thread_id": event.source.thread_id}
if thread_meta:
send_kwargs["metadata"] = thread_meta
await adapter.send_voice(**send_kwargs)
except Exception as e:
logger.warning("Auto voice reply failed: %s", e, exc_info=True)
@ -9293,7 +9314,7 @@ class GatewayRunner:
_, cleaned = adapter.extract_images(response)
local_files, _ = adapter.extract_local_files(cleaned)
_thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
_thread_meta = self._thread_metadata_for_source(event.source, self._reply_anchor_for_event(event))
from gateway.platforms.base import should_send_media_as_audio
@ -9457,9 +9478,16 @@ class GatewayRunner:
source = event.source
task_id = f"bg_{datetime.now().strftime('%H%M%S')}_{os.urandom(3).hex()}"
event_message_id = self._reply_anchor_for_event(event)
# Fire-and-forget the background task
_task = asyncio.create_task(
self._run_background_task(prompt, source, task_id)
self._run_background_task(
prompt,
source,
task_id,
event_message_id=event_message_id,
)
)
self._background_tasks.add(_task)
_task.add_done_callback(self._background_tasks.discard)
@ -9468,7 +9496,11 @@ class GatewayRunner:
return f'🔄 Background task started: "{preview}"\nTask ID: {task_id}\nYou can keep chatting — results will appear when done.'
async def _run_background_task(
self, prompt: str, source: "SessionSource", task_id: str
self,
prompt: str,
source: "SessionSource",
task_id: str,
event_message_id: Optional[str] = None,
) -> None:
"""Execute a background agent task and deliver the result to the chat."""
from run_agent import AIAgent
@ -9478,7 +9510,7 @@ class GatewayRunner:
logger.warning("No adapter for platform %s in background task %s", source.platform, task_id)
return
_thread_metadata = {"thread_id": source.thread_id} if source.thread_id else None
_thread_metadata = self._thread_metadata_for_source(source, event_message_id)
try:
user_config = _load_gateway_config()
@ -11293,7 +11325,7 @@ class GatewayRunner:
_slash_confirm_mod.register(session_key, confirm_id, command, handler)
adapter = self.adapters.get(source.platform)
metadata = self._thread_metadata_for_source(source)
metadata = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event))
used_buttons = False
if adapter is not None:
@ -11333,12 +11365,30 @@ class GatewayRunner:
except Exception:
return {}
def _thread_metadata_for_source(self, source) -> Optional[Dict[str, Any]]:
def _thread_metadata_for_source(
self,
source,
reply_to_message_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Build the metadata dict platforms need for thread-aware replies."""
thread_id = getattr(source, "thread_id", None)
if thread_id is None:
return None
return {"thread_id": thread_id}
metadata: Dict[str, Any] = {"thread_id": thread_id}
if (
getattr(source, "platform", None) == Platform.TELEGRAM
and getattr(source, "chat_type", None) == "dm"
):
metadata["telegram_dm_topic_reply_fallback"] = True
anchor = reply_to_message_id or getattr(source, "message_id", None)
if anchor is not None:
metadata["telegram_reply_to_message_id"] = str(anchor)
return metadata
@staticmethod
def _reply_anchor_for_event(event: MessageEvent) -> Optional[str]:
"""Return the platform-specific reply anchor for GatewayRunner sends."""
return _reply_anchor_for_event(event)
# ------------------------------------------------------------------
@ -13131,10 +13181,7 @@ class GatewayRunner:
else bool(_plat_streaming)
)
if source.thread_id:
_thread_metadata: Optional[Dict[str, Any]] = {"thread_id": source.thread_id}
else:
_thread_metadata = None
_thread_metadata: Optional[Dict[str, Any]] = self._thread_metadata_for_source(source, event_message_id)
if _streaming_enabled:
try:
@ -13564,8 +13611,8 @@ class GatewayRunner:
#
# Threading metadata is platform-specific:
# - Slack DM threading needs event_message_id fallback (reply thread)
# - Telegram uses message_thread_id only for forum topics; passing a
# normal DM/group message id as thread_id causes send failures
# - Telegram forum topics use message_thread_id; Hermes-created private
# DM topic lanes require both thread metadata and a reply anchor
# - Feishu only honors reply_in_thread when sending a reply, so topic
# progress uses the triggering event message as the reply target
# - Other platforms should use explicit source.thread_id only
@ -13573,7 +13620,11 @@ class GatewayRunner:
_progress_thread_id = source.thread_id or event_message_id
else:
_progress_thread_id = source.thread_id
_progress_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None
_progress_metadata = (
self._thread_metadata_for_source(source, event_message_id)
if _progress_thread_id == source.thread_id
else {"thread_id": _progress_thread_id}
) if _progress_thread_id else None
_progress_reply_to = (
event_message_id
if source.platform == Platform.FEISHU and source.thread_id and event_message_id
@ -13833,7 +13884,7 @@ class GatewayRunner:
"reply_to_message_id": event_message_id,
}
else:
_status_thread_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None
_status_thread_metadata = self._thread_metadata_for_source(source, event_message_id) if _progress_thread_id else None
def _status_callback_sync(event_type: str, message: str) -> None:
if not _status_adapter or not _run_still_current():
@ -15118,7 +15169,7 @@ class GatewayRunner:
)
if next_message is None:
return result
next_message_id = getattr(pending_event, "message_id", None)
next_message_id = self._reply_anchor_for_event(pending_event)
next_channel_prompt = getattr(pending_event, "channel_prompt", None)
# Restart typing indicator so the user sees activity while