mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Prevent Telegram polling handoffs and flood-control send failures
Telegram polling can inherit a stale webhook registration when a deployment switches transport modes, which leaves getUpdates idle even though the gateway starts cleanly. Outbound send also treats Telegram retry_after responses as terminal errors, so brief flood control can drop tool progress and replies. Constraint: Keep the PR narrowly scoped to upstream/main Telegram adapter behavior Rejected: Port OpenClaw's broader polling supervisor and offset persistence | too broad for an isolated fix PR Confidence: high Scope-risk: narrow Reversibility: clean Directive: Polling mode should clear webhook state before starting getUpdates, and send-path retry logic must distinguish flood control from timeouts Tested: uv run --extra dev pytest tests/gateway/test_telegram_* -q Not-tested: Live Telegram webhook-to-polling migration and real Bot API 429 behavior
This commit is contained in:
parent
74ff62f5ac
commit
1d2e34c7eb
3 changed files with 96 additions and 3 deletions
|
|
@ -601,6 +601,12 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
)
|
||||
else:
|
||||
# ── Polling mode (default) ───────────────────────────
|
||||
# Clear any stale webhook first so polling doesn't inherit a
|
||||
# previous webhook registration and silently stop receiving updates.
|
||||
delete_webhook = getattr(self._bot, "delete_webhook", None)
|
||||
if callable(delete_webhook):
|
||||
await delete_webhook(drop_pending_updates=False)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def _polling_error_callback(error: Exception) -> None:
|
||||
|
|
@ -856,6 +862,21 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
await asyncio.sleep(wait)
|
||||
else:
|
||||
raise
|
||||
except Exception as send_err:
|
||||
retry_after = getattr(send_err, "retry_after", None)
|
||||
if retry_after is not None or "retry after" in str(send_err).lower():
|
||||
if _send_attempt < 2:
|
||||
wait = float(retry_after) if retry_after is not None else 1.0
|
||||
logger.warning(
|
||||
"[%s] Telegram flood control on send (attempt %d/3), retrying in %.1fs: %s",
|
||||
self.name,
|
||||
_send_attempt + 1,
|
||||
wait,
|
||||
send_err,
|
||||
)
|
||||
await asyncio.sleep(wait)
|
||||
continue
|
||||
raise
|
||||
message_ids.append(str(msg.message_id))
|
||||
|
||||
return SendResult(
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ async def test_polling_conflict_retries_before_fatal(monkeypatch):
|
|||
stop=AsyncMock(),
|
||||
running=True,
|
||||
)
|
||||
bot = SimpleNamespace(set_my_commands=AsyncMock())
|
||||
bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock())
|
||||
app = SimpleNamespace(
|
||||
bot=bot,
|
||||
updater=updater,
|
||||
|
|
@ -99,6 +99,7 @@ async def test_polling_conflict_retries_before_fatal(monkeypatch):
|
|||
ok = await adapter.connect()
|
||||
|
||||
assert ok is True
|
||||
bot.delete_webhook.assert_awaited_once_with(drop_pending_updates=False)
|
||||
assert callable(captured["error_callback"])
|
||||
|
||||
conflict = type("Conflict", (Exception,), {})
|
||||
|
|
@ -153,7 +154,7 @@ async def test_polling_conflict_becomes_fatal_after_retries(monkeypatch):
|
|||
stop=AsyncMock(),
|
||||
running=True,
|
||||
)
|
||||
bot = SimpleNamespace(set_my_commands=AsyncMock())
|
||||
bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock())
|
||||
app = SimpleNamespace(
|
||||
bot=bot,
|
||||
updater=updater,
|
||||
|
|
@ -208,7 +209,7 @@ async def test_connect_marks_retryable_fatal_error_for_startup_network_failure(m
|
|||
builder = MagicMock()
|
||||
builder.token.return_value = builder
|
||||
app = SimpleNamespace(
|
||||
bot=SimpleNamespace(),
|
||||
bot=SimpleNamespace(delete_webhook=AsyncMock(), set_my_commands=AsyncMock()),
|
||||
updater=SimpleNamespace(),
|
||||
add_handler=MagicMock(),
|
||||
initialize=AsyncMock(side_effect=RuntimeError("Temporary failure in name resolution")),
|
||||
|
|
@ -225,6 +226,49 @@ async def test_connect_marks_retryable_fatal_error_for_startup_network_failure(m
|
|||
assert "Temporary failure in name resolution" in adapter.fatal_error_message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_clears_webhook_before_polling(monkeypatch):
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
|
||||
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.acquire_scoped_lock",
|
||||
lambda scope, identity, metadata=None: (True, None),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.release_scoped_lock",
|
||||
lambda scope, identity: None,
|
||||
)
|
||||
|
||||
updater = SimpleNamespace(
|
||||
start_polling=AsyncMock(),
|
||||
stop=AsyncMock(),
|
||||
running=True,
|
||||
)
|
||||
bot = SimpleNamespace(
|
||||
delete_webhook=AsyncMock(),
|
||||
set_my_commands=AsyncMock(),
|
||||
)
|
||||
app = SimpleNamespace(
|
||||
bot=bot,
|
||||
updater=updater,
|
||||
add_handler=MagicMock(),
|
||||
initialize=AsyncMock(),
|
||||
start=AsyncMock(),
|
||||
)
|
||||
builder = MagicMock()
|
||||
builder.token.return_value = builder
|
||||
builder.build.return_value = app
|
||||
monkeypatch.setattr(
|
||||
"gateway.platforms.telegram.Application",
|
||||
SimpleNamespace(builder=MagicMock(return_value=builder)),
|
||||
)
|
||||
|
||||
ok = await adapter.connect()
|
||||
|
||||
assert ok is True
|
||||
bot.delete_webhook.assert_awaited_once_with(drop_pending_updates=False)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disconnect_skips_inactive_updater_and_app(monkeypatch):
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
|
||||
|
|
|
|||
|
|
@ -37,6 +37,12 @@ class FakeTimedOut(FakeNetworkError):
|
|||
pass
|
||||
|
||||
|
||||
class FakeRetryAfter(Exception):
|
||||
def __init__(self, seconds):
|
||||
super().__init__(f"Retry after {seconds}")
|
||||
self.retry_after = seconds
|
||||
|
||||
|
||||
# Build a fake telegram module tree so the adapter's internal imports work
|
||||
_fake_telegram = types.ModuleType("telegram")
|
||||
_fake_telegram_error = types.ModuleType("telegram.error")
|
||||
|
|
@ -230,3 +236,25 @@ async def test_thread_fallback_only_fires_once():
|
|||
# Second chunk: should use thread_id=None directly (effective_thread_id
|
||||
# was cleared per-chunk but the metadata doesn't change between chunks)
|
||||
# The key point: the message was delivered despite the invalid thread
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_retries_retry_after_errors():
|
||||
"""Telegram flood control should back off and retry instead of failing fast."""
|
||||
adapter = _make_adapter()
|
||||
|
||||
attempt = [0]
|
||||
|
||||
async def mock_send_message(**kwargs):
|
||||
attempt[0] += 1
|
||||
if attempt[0] == 1:
|
||||
raise FakeRetryAfter(2)
|
||||
return SimpleNamespace(message_id=300)
|
||||
|
||||
adapter._bot = SimpleNamespace(send_message=mock_send_message)
|
||||
|
||||
result = await adapter.send(chat_id="123", content="test message")
|
||||
|
||||
assert result.success is True
|
||||
assert result.message_id == "300"
|
||||
assert attempt[0] == 2
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue