fix(telegram): restore typing indicator and thread routing for forum General topic

In Telegram forum-enabled groups, the General topic does not include
message_thread_id in incoming messages (it is None). This caused:
1. Messages in General losing thread context — replies went to wrong place
2. Typing indicator failing because thread_id=1 was rejected by Telegram

Fix: synthesize thread_id="1" for forum groups when message_thread_id
is None, then handle it correctly per operation:
- send: omit message_thread_id (Telegram rejects thread_id=1 for sends)
- typing: pass thread_id=1, retry without it on "thread not found"

Also centralizes thread_id extraction into _metadata_thread_id() across
all send methods (send, send_voice, send_image, send_document, send_video,
send_animation, send_photo), replacing ~10 duplicate patterns.

Salvaged from PR #7892 by @corazzione.
Closes #7877, closes #7519.
This commit is contained in:
Markus Corazzione 2026-04-15 22:25:54 -07:00 committed by Teknium
parent 3ff18ffe14
commit 0cf7d570e2
2 changed files with 164 additions and 30 deletions

View file

@ -134,6 +134,7 @@ class TelegramAdapter(BasePlatformAdapter):
# When a chunk is near this limit, a continuation is almost certain. # When a chunk is near this limit, a continuation is almost certain.
_SPLIT_THRESHOLD = 4000 _SPLIT_THRESHOLD = 4000
MEDIA_GROUP_WAIT_SECONDS = 0.8 MEDIA_GROUP_WAIT_SECONDS = 0.8
_GENERAL_TOPIC_THREAD_ID = "1"
def __init__(self, config: PlatformConfig): def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.TELEGRAM) super().__init__(config, Platform.TELEGRAM)
@ -178,6 +179,29 @@ class TelegramAdapter(BasePlatformAdapter):
allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()} allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
return "*" in allowed_ids or user_id in allowed_ids return "*" in allowed_ids or user_id in allowed_ids
@classmethod
def _metadata_thread_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[str]:
if not metadata:
return None
thread_id = metadata.get("thread_id") or metadata.get("message_thread_id")
return str(thread_id) if thread_id is not None else None
@classmethod
def _message_thread_id_for_send(cls, thread_id: Optional[str]) -> Optional[int]:
if not thread_id or str(thread_id) == cls._GENERAL_TOPIC_THREAD_ID:
return None
return int(thread_id)
@classmethod
def _message_thread_id_for_typing(cls, thread_id: Optional[str]) -> Optional[int]:
if not thread_id:
return None
return int(thread_id)
@staticmethod
def _is_thread_not_found_error(error: Exception) -> bool:
return "thread not found" in str(error).lower()
def _fallback_ips(self) -> list[str]: def _fallback_ips(self) -> list[str]:
"""Return validated fallback IPs from config (populated by _apply_env_overrides).""" """Return validated fallback IPs from config (populated by _apply_env_overrides)."""
configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else [] configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else []
@ -849,7 +873,7 @@ class TelegramAdapter(BasePlatformAdapter):
] ]
message_ids = [] message_ids = []
thread_id = metadata.get("thread_id") if metadata else None thread_id = self._metadata_thread_id(metadata)
try: try:
from telegram.error import NetworkError as _NetErr from telegram.error import NetworkError as _NetErr
@ -869,7 +893,7 @@ class TelegramAdapter(BasePlatformAdapter):
for i, chunk in enumerate(chunks): for i, chunk in enumerate(chunks):
should_thread = self._should_thread_reply(reply_to, i) should_thread = self._should_thread_reply(reply_to, i)
reply_to_id = int(reply_to) if should_thread else None reply_to_id = int(reply_to) if should_thread else None
effective_thread_id = int(thread_id) if thread_id else None effective_thread_id = self._message_thread_id_for_send(thread_id)
msg = None msg = None
for _send_attempt in range(3): for _send_attempt in range(3):
@ -906,8 +930,7 @@ class TelegramAdapter(BasePlatformAdapter):
# (not transient network issues). Detect and handle # (not transient network issues). Detect and handle
# specific cases instead of blindly retrying. # specific cases instead of blindly retrying.
if _BadReq and isinstance(send_err, _BadReq): if _BadReq and isinstance(send_err, _BadReq):
err_lower = str(send_err).lower() if self._is_thread_not_found_error(send_err) and effective_thread_id is not None:
if "thread not found" in err_lower and effective_thread_id is not None:
# Thread doesn't exist — retry without # Thread doesn't exist — retry without
# message_thread_id so the message still # message_thread_id so the message still
# reaches the chat. # reaches the chat.
@ -917,6 +940,7 @@ class TelegramAdapter(BasePlatformAdapter):
) )
effective_thread_id = None effective_thread_id = None
continue continue
err_lower = str(send_err).lower()
if "message to be replied not found" in err_lower and reply_to_id is not None: if "message to be replied not found" in err_lower and reply_to_id is not None:
# Original message was deleted before we # Original message was deleted before we
# could reply — clear reply target and retry # could reply — clear reply target and retry
@ -1115,9 +1139,7 @@ class TelegramAdapter(BasePlatformAdapter):
) )
# Resolve thread context for thread replies # Resolve thread context for thread replies
thread_id = None thread_id = self._metadata_thread_id(metadata)
if metadata:
thread_id = metadata.get("thread_id") or metadata.get("message_thread_id")
# We'll use the message_id as part of callback_data to look up session_key # We'll use the message_id as part of callback_data to look up session_key
# Send a placeholder first, then update — or use a counter. # Send a placeholder first, then update — or use a counter.
@ -1145,8 +1167,9 @@ class TelegramAdapter(BasePlatformAdapter):
"reply_markup": keyboard, "reply_markup": keyboard,
**self._link_preview_kwargs(), **self._link_preview_kwargs(),
} }
if thread_id: message_thread_id = self._message_thread_id_for_send(thread_id)
kwargs["message_thread_id"] = int(thread_id) if message_thread_id is not None:
kwargs["message_thread_id"] = message_thread_id
msg = await self._bot.send_message(**kwargs) msg = await self._bot.send_message(**kwargs)
@ -1579,23 +1602,23 @@ class TelegramAdapter(BasePlatformAdapter):
with open(audio_path, "rb") as audio_file: with open(audio_path, "rb") as audio_file:
# .ogg files -> send as voice (round playable bubble) # .ogg files -> send as voice (round playable bubble)
if audio_path.endswith((".ogg", ".opus")): if audio_path.endswith((".ogg", ".opus")):
_voice_thread = metadata.get("thread_id") if metadata else None _voice_thread = self._metadata_thread_id(metadata)
msg = await self._bot.send_voice( msg = await self._bot.send_voice(
chat_id=int(chat_id), chat_id=int(chat_id),
voice=audio_file, voice=audio_file,
caption=caption[:1024] if caption else None, caption=caption[:1024] if caption else None,
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=int(_voice_thread) if _voice_thread else None, message_thread_id=self._message_thread_id_for_send(_voice_thread),
) )
else: else:
# .mp3 and others -> send as audio file # .mp3 and others -> send as audio file
_audio_thread = metadata.get("thread_id") if metadata else None _audio_thread = self._metadata_thread_id(metadata)
msg = await self._bot.send_audio( msg = await self._bot.send_audio(
chat_id=int(chat_id), chat_id=int(chat_id),
audio=audio_file, audio=audio_file,
caption=caption[:1024] if caption else None, caption=caption[:1024] if caption else None,
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=int(_audio_thread) if _audio_thread else None, message_thread_id=self._message_thread_id_for_send(_audio_thread),
) )
return SendResult(success=True, message_id=str(msg.message_id)) return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e: except Exception as e:
@ -1625,14 +1648,14 @@ class TelegramAdapter(BasePlatformAdapter):
if not os.path.exists(image_path): if not os.path.exists(image_path):
return SendResult(success=False, error=f"Image file not found: {image_path}") return SendResult(success=False, error=f"Image file not found: {image_path}")
_thread = metadata.get("thread_id") if metadata else None _thread = self._metadata_thread_id(metadata)
with open(image_path, "rb") as image_file: with open(image_path, "rb") as image_file:
msg = await self._bot.send_photo( msg = await self._bot.send_photo(
chat_id=int(chat_id), chat_id=int(chat_id),
photo=image_file, photo=image_file,
caption=caption[:1024] if caption else None, caption=caption[:1024] if caption else None,
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=int(_thread) if _thread else None, message_thread_id=self._message_thread_id_for_send(_thread),
) )
return SendResult(success=True, message_id=str(msg.message_id)) return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e: except Exception as e:
@ -1663,7 +1686,7 @@ class TelegramAdapter(BasePlatformAdapter):
return SendResult(success=False, error=f"File not found: {file_path}") return SendResult(success=False, error=f"File not found: {file_path}")
display_name = file_name or os.path.basename(file_path) display_name = file_name or os.path.basename(file_path)
_thread = metadata.get("thread_id") if metadata else None _thread = self._metadata_thread_id(metadata)
with open(file_path, "rb") as f: with open(file_path, "rb") as f:
msg = await self._bot.send_document( msg = await self._bot.send_document(
@ -1672,7 +1695,7 @@ class TelegramAdapter(BasePlatformAdapter):
filename=display_name, filename=display_name,
caption=caption[:1024] if caption else None, caption=caption[:1024] if caption else None,
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=int(_thread) if _thread else None, message_thread_id=self._message_thread_id_for_send(_thread),
) )
return SendResult(success=True, message_id=str(msg.message_id)) return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e: except Exception as e:
@ -1696,14 +1719,14 @@ class TelegramAdapter(BasePlatformAdapter):
if not os.path.exists(video_path): if not os.path.exists(video_path):
return SendResult(success=False, error=f"Video file not found: {video_path}") return SendResult(success=False, error=f"Video file not found: {video_path}")
_thread = metadata.get("thread_id") if metadata else None _thread = self._metadata_thread_id(metadata)
with open(video_path, "rb") as f: with open(video_path, "rb") as f:
msg = await self._bot.send_video( msg = await self._bot.send_video(
chat_id=int(chat_id), chat_id=int(chat_id),
video=f, video=f,
caption=caption[:1024] if caption else None, caption=caption[:1024] if caption else None,
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=int(_thread) if _thread else None, message_thread_id=self._message_thread_id_for_send(_thread),
) )
return SendResult(success=True, message_id=str(msg.message_id)) return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e: except Exception as e:
@ -1733,13 +1756,13 @@ class TelegramAdapter(BasePlatformAdapter):
try: try:
# Telegram can send photos directly from URLs (up to ~5MB) # Telegram can send photos directly from URLs (up to ~5MB)
_photo_thread = metadata.get("thread_id") if metadata else None _photo_thread = self._metadata_thread_id(metadata)
msg = await self._bot.send_photo( msg = await self._bot.send_photo(
chat_id=int(chat_id), chat_id=int(chat_id),
photo=image_url, photo=image_url,
caption=caption[:1024] if caption else None, # Telegram caption limit caption=caption[:1024] if caption else None, # Telegram caption limit
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=int(_photo_thread) if _photo_thread else None, message_thread_id=self._message_thread_id_for_send(_photo_thread),
) )
return SendResult(success=True, message_id=str(msg.message_id)) return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e: except Exception as e:
@ -1762,6 +1785,7 @@ class TelegramAdapter(BasePlatformAdapter):
photo=image_data, photo=image_data,
caption=caption[:1024] if caption else None, caption=caption[:1024] if caption else None,
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=self._message_thread_id_for_send(_photo_thread),
) )
return SendResult(success=True, message_id=str(msg.message_id)) return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e2: except Exception as e2:
@ -1787,13 +1811,13 @@ class TelegramAdapter(BasePlatformAdapter):
return SendResult(success=False, error="Not connected") return SendResult(success=False, error="Not connected")
try: try:
_anim_thread = metadata.get("thread_id") if metadata else None _anim_thread = self._metadata_thread_id(metadata)
msg = await self._bot.send_animation( msg = await self._bot.send_animation(
chat_id=int(chat_id), chat_id=int(chat_id),
animation=animation_url, animation=animation_url,
caption=caption[:1024] if caption else None, caption=caption[:1024] if caption else None,
reply_to_message_id=int(reply_to) if reply_to else None, reply_to_message_id=int(reply_to) if reply_to else None,
message_thread_id=int(_anim_thread) if _anim_thread else None, message_thread_id=self._message_thread_id_for_send(_anim_thread),
) )
return SendResult(success=True, message_id=str(msg.message_id)) return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e: except Exception as e:
@ -1810,12 +1834,23 @@ class TelegramAdapter(BasePlatformAdapter):
"""Send typing indicator.""" """Send typing indicator."""
if self._bot: if self._bot:
try: try:
_typing_thread = metadata.get("thread_id") if metadata else None _typing_thread = self._metadata_thread_id(metadata)
await self._bot.send_chat_action( message_thread_id = self._message_thread_id_for_typing(_typing_thread)
chat_id=int(chat_id), try:
action="typing", await self._bot.send_chat_action(
message_thread_id=int(_typing_thread) if _typing_thread else None, chat_id=int(chat_id),
) action="typing",
message_thread_id=message_thread_id,
)
except Exception as e:
if message_thread_id is not None and self._is_thread_not_found_error(e):
await self._bot.send_chat_action(
chat_id=int(chat_id),
action="typing",
message_thread_id=None,
)
else:
raise
except Exception as e: except Exception as e:
# Typing failures are non-fatal; log at debug level only. # Typing failures are non-fatal; log at debug level only.
logger.debug( logger.debug(
@ -2760,7 +2795,9 @@ class TelegramAdapter(BasePlatformAdapter):
# Resolve DM topic name and skill binding # Resolve DM topic name and skill binding
thread_id_raw = message.message_thread_id thread_id_raw = message.message_thread_id
thread_id_str = str(thread_id_raw) if thread_id_raw else None thread_id_str = str(thread_id_raw) if thread_id_raw is not None else None
if chat_type == "group" and thread_id_str is None and getattr(chat, "is_forum", False):
thread_id_str = self._GENERAL_TOPIC_THREAD_ID
chat_topic = None chat_topic = None
topic_skill = None topic_skill = None

View file

@ -45,6 +45,11 @@ class FakeRetryAfter(Exception):
# Build a fake telegram module tree so the adapter's internal imports work # Build a fake telegram module tree so the adapter's internal imports work
_fake_telegram = types.ModuleType("telegram") _fake_telegram = types.ModuleType("telegram")
_fake_telegram.Update = object
_fake_telegram.Bot = object
_fake_telegram.Message = object
_fake_telegram.InlineKeyboardButton = object
_fake_telegram.InlineKeyboardMarkup = object
_fake_telegram_error = types.ModuleType("telegram.error") _fake_telegram_error = types.ModuleType("telegram.error")
_fake_telegram_error.NetworkError = FakeNetworkError _fake_telegram_error.NetworkError = FakeNetworkError
_fake_telegram_error.BadRequest = FakeBadRequest _fake_telegram_error.BadRequest = FakeBadRequest
@ -52,7 +57,21 @@ _fake_telegram_error.TimedOut = FakeTimedOut
_fake_telegram.error = _fake_telegram_error _fake_telegram.error = _fake_telegram_error
_fake_telegram_constants = types.ModuleType("telegram.constants") _fake_telegram_constants = types.ModuleType("telegram.constants")
_fake_telegram_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2") _fake_telegram_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2")
_fake_telegram_constants.ChatType = SimpleNamespace(
GROUP="group",
SUPERGROUP="supergroup",
CHANNEL="channel",
)
_fake_telegram.constants = _fake_telegram_constants _fake_telegram.constants = _fake_telegram_constants
_fake_telegram_ext = types.ModuleType("telegram.ext")
_fake_telegram_ext.Application = object
_fake_telegram_ext.CommandHandler = object
_fake_telegram_ext.CallbackQueryHandler = object
_fake_telegram_ext.MessageHandler = object
_fake_telegram_ext.ContextTypes = SimpleNamespace(DEFAULT_TYPE=object)
_fake_telegram_ext.filters = object
_fake_telegram_request = types.ModuleType("telegram.request")
_fake_telegram_request.HTTPXRequest = object
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@ -61,6 +80,8 @@ def _inject_fake_telegram(monkeypatch):
monkeypatch.setitem(sys.modules, "telegram", _fake_telegram) monkeypatch.setitem(sys.modules, "telegram", _fake_telegram)
monkeypatch.setitem(sys.modules, "telegram.error", _fake_telegram_error) monkeypatch.setitem(sys.modules, "telegram.error", _fake_telegram_error)
monkeypatch.setitem(sys.modules, "telegram.constants", _fake_telegram_constants) monkeypatch.setitem(sys.modules, "telegram.constants", _fake_telegram_constants)
monkeypatch.setitem(sys.modules, "telegram.ext", _fake_telegram_ext)
monkeypatch.setitem(sys.modules, "telegram.request", _fake_telegram_request)
def _make_adapter(): def _make_adapter():
@ -68,6 +89,7 @@ def _make_adapter():
config = PlatformConfig(enabled=True, token="fake-token") config = PlatformConfig(enabled=True, token="fake-token")
adapter = object.__new__(TelegramAdapter) adapter = object.__new__(TelegramAdapter)
adapter.config = config
adapter._config = config adapter._config = config
adapter._platform = Platform.TELEGRAM adapter._platform = Platform.TELEGRAM
adapter._connected = True adapter._connected = True
@ -82,6 +104,81 @@ def _make_adapter():
return adapter return adapter
def test_forum_general_topic_without_message_thread_id_keeps_thread_context():
"""Forum General-topic messages should keep synthetic thread context."""
from gateway.platforms import telegram as telegram_mod
adapter = _make_adapter()
message = SimpleNamespace(
text="hello from General",
caption=None,
chat=SimpleNamespace(
id=-100123,
type=telegram_mod.ChatType.SUPERGROUP,
is_forum=True,
title="Forum group",
),
from_user=SimpleNamespace(id=456, full_name="Alice"),
message_thread_id=None,
reply_to_message=None,
message_id=10,
date=None,
)
event = adapter._build_message_event(message, msg_type=SimpleNamespace(value="text"))
assert event.source.chat_id == "-100123"
assert event.source.chat_type == "group"
assert event.source.thread_id == "1"
@pytest.mark.asyncio
async def test_send_omits_general_topic_thread_id():
"""Telegram sends to forum General should omit message_thread_id=1."""
adapter = _make_adapter()
call_log = []
async def mock_send_message(**kwargs):
call_log.append(dict(kwargs))
return SimpleNamespace(message_id=42)
adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send(
chat_id="-100123",
content="test message",
metadata={"thread_id": "1"},
)
assert result.success is True
assert len(call_log) == 1
assert call_log[0]["chat_id"] == -100123
assert call_log[0]["text"] == "test message"
assert call_log[0]["reply_to_message_id"] is None
assert call_log[0]["message_thread_id"] is None
@pytest.mark.asyncio
async def test_send_typing_retries_without_general_thread_when_not_found():
"""Typing for forum General should fall back if Telegram rejects thread 1."""
adapter = _make_adapter()
call_log = []
async def mock_send_chat_action(**kwargs):
call_log.append(dict(kwargs))
if kwargs.get("message_thread_id") == 1:
raise FakeBadRequest("Message thread not found")
adapter._bot = SimpleNamespace(send_chat_action=mock_send_chat_action)
await adapter.send_typing("-100123", metadata={"thread_id": "1"})
assert call_log == [
{"chat_id": -100123, "action": "typing", "message_thread_id": 1},
{"chat_id": -100123, "action": "typing", "message_thread_id": None},
]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_retries_without_thread_on_thread_not_found(): async def test_send_retries_without_thread_on_thread_not_found():
"""When message_thread_id causes 'thread not found', retry without it.""" """When message_thread_id causes 'thread not found', retry without it."""