mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
Add CLI Telegram QR onboarding
Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
This commit is contained in:
parent
8a9ded5b21
commit
6bf55a473e
4 changed files with 833 additions and 16 deletions
|
|
@ -4382,6 +4382,35 @@ def _setup_standard_platform(platform: dict):
|
|||
if not prompt_yes_no(f" Reconfigure {label}?", False):
|
||||
return
|
||||
|
||||
auto_token_saved = False
|
||||
auto_owner_user_id = None
|
||||
if platform.get("key") == "telegram":
|
||||
print()
|
||||
print_info(" Telegram can be configured automatically with a managed bot:")
|
||||
print_info(" [1] Automatic (scan QR → confirm in Telegram → done)")
|
||||
print_info(" [2] Manual BotFather token")
|
||||
choice = prompt(" Choice [1/2]", default="1")
|
||||
if choice.strip() == "1":
|
||||
try:
|
||||
from hermes_cli.telegram_managed_bot import (
|
||||
auto_setup_telegram_bot_result,
|
||||
is_valid_telegram_bot_token,
|
||||
)
|
||||
except ImportError:
|
||||
print_warning(" Automatic setup is unavailable in this install.")
|
||||
else:
|
||||
result = auto_setup_telegram_bot_result()
|
||||
if result and is_valid_telegram_bot_token(result.token):
|
||||
save_env_value(token_var, result.token)
|
||||
print_success(" Saved TELEGRAM_BOT_TOKEN")
|
||||
auto_token_saved = True
|
||||
auto_owner_user_id = result.owner_user_id
|
||||
else:
|
||||
if result:
|
||||
print_warning(" Automatic setup returned an invalid Telegram token.")
|
||||
print()
|
||||
print_info(" Falling back to manual setup...")
|
||||
|
||||
allowed_val_set = None # Track if user set an allowlist (for home channel offer)
|
||||
|
||||
for var in platform["vars"]:
|
||||
|
|
@ -4391,8 +4420,30 @@ def _setup_standard_platform(platform: dict):
|
|||
if existing and var["name"] != token_var:
|
||||
print_info(f" Current: {existing}")
|
||||
|
||||
if auto_token_saved and var["name"] == token_var:
|
||||
print_info(" Token saved by automatic setup.")
|
||||
continue
|
||||
|
||||
# Allowlist fields get special handling for the deny-by-default security model
|
||||
if var.get("is_allowlist"):
|
||||
if "TELEGRAM" in var["name"] and auto_owner_user_id:
|
||||
detected_id = str(auto_owner_user_id)
|
||||
print_success(f" Detected your Telegram user ID: {detected_id}")
|
||||
if prompt_yes_no(" Allow this Telegram account to use the bot?", True):
|
||||
extra = prompt(
|
||||
" Additional allowed user IDs (comma-separated, optional)",
|
||||
password=False,
|
||||
)
|
||||
ids = [detected_id]
|
||||
for uid in extra.replace(" ", "").split(","):
|
||||
if uid and uid not in ids:
|
||||
ids.append(uid)
|
||||
cleaned = ",".join(ids)
|
||||
save_env_value(var["name"], cleaned)
|
||||
print_success(" Saved — only these users can interact with the bot.")
|
||||
allowed_val_set = cleaned
|
||||
continue
|
||||
|
||||
print_info(" The gateway DENIES all users by default for security.")
|
||||
print_info(" Enter user IDs to create an allowlist, or leave empty")
|
||||
print_info(" and you'll be asked about open access next.")
|
||||
|
|
|
|||
|
|
@ -1637,6 +1637,52 @@ def setup_agent_settings(config: dict):
|
|||
# =============================================================================
|
||||
|
||||
|
||||
_TELEGRAM_BOT_TOKEN_RE = re.compile(r"^\d+:[A-Za-z0-9_-]{30,}$")
|
||||
|
||||
|
||||
def _is_valid_telegram_bot_token(token: str) -> bool:
|
||||
return bool(_TELEGRAM_BOT_TOKEN_RE.match(token))
|
||||
|
||||
|
||||
def _setup_telegram_auto_result():
|
||||
"""Attempt automatic Telegram bot creation via managed QR onboarding."""
|
||||
try:
|
||||
from hermes_cli.telegram_managed_bot import auto_setup_telegram_bot_result
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
profile_name: str | None = None
|
||||
try:
|
||||
hermes_home = str(get_hermes_home())
|
||||
if "/profiles/" in hermes_home:
|
||||
profile_name = hermes_home.rstrip("/").rsplit("/", 1)[-1]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return auto_setup_telegram_bot_result(profile_name=profile_name)
|
||||
|
||||
|
||||
def _setup_telegram_auto() -> str | None:
|
||||
"""Attempt automatic Telegram bot creation and return only the token."""
|
||||
result = _setup_telegram_auto_result()
|
||||
return result.token if result else None
|
||||
|
||||
|
||||
def _prompt_telegram_bot_token() -> str | None:
|
||||
print_info("Create a bot via @BotFather on Telegram")
|
||||
while True:
|
||||
token = prompt("Telegram bot token", password=True)
|
||||
if not token:
|
||||
return None
|
||||
if not _is_valid_telegram_bot_token(token):
|
||||
print_error(
|
||||
"Invalid token format. Expected: <numeric_id>:<alphanumeric_hash> "
|
||||
"(e.g., 123456789:ABCdefGHI-jklMNOpqrSTUvwxYZ)"
|
||||
)
|
||||
continue
|
||||
return token
|
||||
|
||||
|
||||
def _setup_telegram():
|
||||
"""Configure Telegram bot credentials and allowlist."""
|
||||
print_header("Telegram")
|
||||
|
|
@ -1655,20 +1701,40 @@ def _setup_telegram():
|
|||
print_success("Telegram allowlist configured")
|
||||
return
|
||||
|
||||
print_info("Create a bot via @BotFather on Telegram")
|
||||
import re
|
||||
print_info("How would you like to create your Telegram bot?")
|
||||
print()
|
||||
print_info(" [1] Automatic (recommended)")
|
||||
print_info(" Scan a QR code → confirm in Telegram → done.")
|
||||
print_info(" No token copy-paste needed.")
|
||||
print()
|
||||
print_info(" [2] Manual")
|
||||
print_info(" Create a bot via @BotFather yourself and paste the token.")
|
||||
print()
|
||||
|
||||
while True:
|
||||
token = prompt("Telegram bot token", password=True)
|
||||
choice = prompt("Choice [1/2]", default="1")
|
||||
token = None
|
||||
setup_result = None
|
||||
|
||||
if choice.strip() == "1":
|
||||
setup_result = _setup_telegram_auto_result()
|
||||
if setup_result:
|
||||
token = setup_result.token
|
||||
if not _is_valid_telegram_bot_token(token):
|
||||
print_error("Automatic setup returned an invalid Telegram bot token.")
|
||||
token = None
|
||||
setup_result = None
|
||||
else:
|
||||
token = None
|
||||
if not token:
|
||||
return
|
||||
if not re.match(r"^\d+:[A-Za-z0-9_-]{30,}$", token):
|
||||
print_error(
|
||||
"Invalid token format. Expected: <numeric_id>:<alphanumeric_hash> "
|
||||
"(e.g., 123456789:ABCdefGHI-jklMNOpqrSTUvwxYZ)"
|
||||
)
|
||||
continue
|
||||
break
|
||||
print()
|
||||
print_info("Falling back to manual setup...")
|
||||
print()
|
||||
|
||||
if not token:
|
||||
token = _prompt_telegram_bot_token()
|
||||
if not token:
|
||||
return
|
||||
|
||||
save_env_value("TELEGRAM_BOT_TOKEN", token)
|
||||
print_success("Telegram token saved")
|
||||
|
||||
|
|
@ -1678,11 +1744,30 @@ def _setup_telegram():
|
|||
print_info(" 1. Message @userinfobot on Telegram")
|
||||
print_info(" 2. It will reply with your numeric ID (e.g., 123456789)")
|
||||
print()
|
||||
allowed_users = prompt(
|
||||
"Allowed user IDs (comma-separated, leave empty for open access)"
|
||||
)
|
||||
|
||||
detected_user_id = getattr(setup_result, "owner_user_id", None)
|
||||
if detected_user_id:
|
||||
detected_id = str(detected_user_id)
|
||||
print_success(f"Detected your Telegram user ID: {detected_id}")
|
||||
if prompt_yes_no("Allow this Telegram account to use the bot?", True):
|
||||
extra = prompt("Additional allowed user IDs (comma-separated, optional)")
|
||||
ids = [detected_id]
|
||||
for uid in extra.replace(" ", "").split(","):
|
||||
if uid and uid not in ids:
|
||||
ids.append(uid)
|
||||
allowed_users = ",".join(ids)
|
||||
else:
|
||||
allowed_users = prompt(
|
||||
"Allowed user IDs (comma-separated, leave empty for open access)"
|
||||
)
|
||||
else:
|
||||
allowed_users = prompt(
|
||||
"Allowed user IDs (comma-separated, leave empty for open access)"
|
||||
)
|
||||
|
||||
if allowed_users:
|
||||
save_env_value("TELEGRAM_ALLOWED_USERS", allowed_users.replace(" ", ""))
|
||||
allowed_users = allowed_users.replace(" ", "")
|
||||
save_env_value("TELEGRAM_ALLOWED_USERS", allowed_users)
|
||||
print_success("Telegram allowlist configured - only listed users can use the bot")
|
||||
else:
|
||||
print_info("⚠️ No allowlist set - anyone who finds your bot can use it!")
|
||||
|
|
|
|||
358
hermes_cli/telegram_managed_bot.py
Normal file
358
hermes_cli/telegram_managed_bot.py
Normal file
|
|
@ -0,0 +1,358 @@
|
|||
"""Telegram Managed Bot onboarding client.
|
||||
|
||||
Uses Telegram's Managed Bots feature to create a user-owned child bot without
|
||||
manual BotFather token copy-paste. Hermes talks only to the Nous onboarding
|
||||
service; the raw Telegram token is saved locally after one-time retrieval.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import secrets
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
# Default pairing API base URL (Nous-hosted Cloudflare Worker).
|
||||
# Override for PoC/staging with TELEGRAM_ONBOARDING_URL.
|
||||
DEFAULT_API_URL = "https://setup.hermes-agent.nousresearch.com"
|
||||
TELEGRAM_ONBOARDING_URL_ENV = "TELEGRAM_ONBOARDING_URL"
|
||||
|
||||
# The Nous-hosted manager bot username (without @). The backend returns the
|
||||
# actual deep link, so this is only used by local helpers/tests.
|
||||
DEFAULT_MANAGER_BOT = "HermesSetupBot"
|
||||
|
||||
DEFAULT_BOT_NAME = "Hermes Agent"
|
||||
DEFAULT_POLL_TIMEOUT = 180
|
||||
POLL_INTERVAL = 2
|
||||
|
||||
_USERNAME_SLUG_ALPHABET = "abcdefghijklmnopqrstuvwxyz234567"
|
||||
_TELEGRAM_BOT_TOKEN_RE = re.compile(r"^\d+:[A-Za-z0-9_-]{30,}$")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TelegramPairing:
|
||||
"""Pairing record returned by the Telegram onboarding service."""
|
||||
|
||||
pairing_id: str
|
||||
poll_token: str
|
||||
suggested_username: str
|
||||
deep_link: str
|
||||
qr_payload: str
|
||||
expires_at: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TelegramBotSetupResult:
|
||||
"""Successful Telegram onboarding result returned by the setup service."""
|
||||
|
||||
token: str
|
||||
bot_username: str | None = None
|
||||
owner_user_id: int | None = None
|
||||
|
||||
|
||||
def _api_url(api_url: str | None = None) -> str:
|
||||
"""Resolve the onboarding API URL, honoring the PoC env override."""
|
||||
return (
|
||||
api_url or os.environ.get(TELEGRAM_ONBOARDING_URL_ENV) or DEFAULT_API_URL
|
||||
).rstrip("/")
|
||||
|
||||
|
||||
def is_valid_telegram_bot_token(token: object) -> bool:
|
||||
"""Return True when *token* has Telegram's bot-token shape."""
|
||||
return isinstance(token, str) and bool(_TELEGRAM_BOT_TOKEN_RE.match(token))
|
||||
|
||||
|
||||
def _parse_owner_user_id(value: object) -> int | None:
|
||||
if isinstance(value, bool):
|
||||
return None
|
||||
if isinstance(value, int):
|
||||
return value if value > 0 else None
|
||||
if isinstance(value, str) and value.isdecimal():
|
||||
parsed = int(value)
|
||||
return parsed if parsed > 0 else None
|
||||
return None
|
||||
|
||||
|
||||
def render_qr_terminal(url: str) -> str:
|
||||
"""Render a URL as a QR code string suitable for terminal output."""
|
||||
try:
|
||||
import io
|
||||
|
||||
import qrcode # type: ignore[import-untyped]
|
||||
|
||||
qr = qrcode.QRCode(
|
||||
version=None,
|
||||
error_correction=qrcode.constants.ERROR_CORRECT_L,
|
||||
box_size=1,
|
||||
border=1,
|
||||
)
|
||||
qr.add_data(url)
|
||||
qr.make(fit=True)
|
||||
|
||||
buf = io.StringIO()
|
||||
qr.print_ascii(out=buf, invert=True)
|
||||
return buf.getvalue()
|
||||
except ImportError:
|
||||
return ""
|
||||
|
||||
|
||||
def print_qr_code(url: str, *, include_link: bool = True) -> None:
|
||||
"""Print a QR code to stdout, with URL fallback if qrcode is missing."""
|
||||
qr_text = render_qr_terminal(url)
|
||||
if qr_text:
|
||||
print(qr_text)
|
||||
else:
|
||||
print(" (Install 'qrcode' for a scannable QR code: pip install qrcode)")
|
||||
if include_link:
|
||||
print(f" Link: {url}")
|
||||
|
||||
|
||||
def generate_username_slug(length: int = 16) -> str:
|
||||
"""Generate a base32-ish slug for Telegram username correlation.
|
||||
|
||||
Sixteen characters from a 32-symbol alphabet gives 80 bits of entropy while
|
||||
keeping ``hermes_<slug>_bot`` under Telegram's 32-character username limit.
|
||||
"""
|
||||
return "".join(secrets.choice(_USERNAME_SLUG_ALPHABET) for _ in range(length))
|
||||
|
||||
|
||||
def generate_bot_username(profile_name: Optional[str] = None) -> str:
|
||||
"""Generate a secure suggested bot username like ``hermes_<slug>_bot``.
|
||||
|
||||
``profile_name`` is accepted for backward compatibility with the original
|
||||
PoC, but is intentionally not embedded in the username. The username has to
|
||||
carry enough entropy for backend correlation.
|
||||
"""
|
||||
_ = profile_name
|
||||
return f"hermes_{generate_username_slug()}_bot"
|
||||
|
||||
|
||||
def generate_deep_link(
|
||||
manager_bot: str = DEFAULT_MANAGER_BOT,
|
||||
suggested_username: Optional[str] = None,
|
||||
suggested_name: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Build a ``t.me/newbot`` deep link for managed bot creation."""
|
||||
manager = manager_bot.lstrip("@")
|
||||
username = suggested_username or generate_bot_username()
|
||||
base_url = (
|
||||
"https://t.me/newbot/"
|
||||
f"{urllib.parse.quote(manager)}/"
|
||||
f"{urllib.parse.quote(username)}"
|
||||
)
|
||||
|
||||
if suggested_name:
|
||||
params = urllib.parse.urlencode({"name": suggested_name})
|
||||
return f"{base_url}?{params}"
|
||||
return base_url
|
||||
|
||||
|
||||
def generate_pairing_nonce() -> str:
|
||||
"""Generate a legacy-compatible random nonce string.
|
||||
|
||||
The new protocol uses service-created ``pairing_id`` + bearer
|
||||
``poll_token`` instead of a path nonce, but this helper is harmless and
|
||||
still useful for callers/tests that need a generic random id.
|
||||
"""
|
||||
return secrets.token_hex(16)
|
||||
|
||||
|
||||
def create_pairing(
|
||||
api_url: str | None = None,
|
||||
bot_name: str = DEFAULT_BOT_NAME,
|
||||
timeout: float = 10.0,
|
||||
) -> TelegramPairing | None:
|
||||
"""Create a Telegram onboarding pairing.
|
||||
|
||||
``POST /v1/telegram/pairings`` returns the deep link, QR payload, public
|
||||
pairing id, and secret poll token. The token is only used as a bearer
|
||||
credential while polling.
|
||||
"""
|
||||
try:
|
||||
resp = httpx.post(
|
||||
f"{_api_url(api_url)}/v1/telegram/pairings",
|
||||
json={"bot_name": bot_name},
|
||||
timeout=timeout,
|
||||
)
|
||||
if resp.status_code not in (200, 201):
|
||||
return None
|
||||
data = resp.json()
|
||||
except (httpx.HTTPError, ValueError):
|
||||
return None
|
||||
|
||||
required = ("pairing_id", "poll_token", "suggested_username", "deep_link")
|
||||
if not all(isinstance(data.get(key), str) and data.get(key) for key in required):
|
||||
return None
|
||||
|
||||
qr_payload = data.get("qr_payload") or data["deep_link"]
|
||||
if not isinstance(qr_payload, str):
|
||||
return None
|
||||
|
||||
expires_at = data.get("expires_at")
|
||||
return TelegramPairing(
|
||||
pairing_id=data["pairing_id"],
|
||||
poll_token=data["poll_token"],
|
||||
suggested_username=data["suggested_username"],
|
||||
deep_link=data["deep_link"],
|
||||
qr_payload=qr_payload,
|
||||
expires_at=expires_at if isinstance(expires_at, str) else None,
|
||||
)
|
||||
|
||||
|
||||
def poll_pairing_result_once(
|
||||
api_url: str | None,
|
||||
pairing: TelegramPairing,
|
||||
timeout: float = 10.0,
|
||||
) -> TelegramBotSetupResult | None:
|
||||
"""Poll the onboarding service once. Returns setup metadata when ready."""
|
||||
resp = httpx.get(
|
||||
f"{_api_url(api_url)}/v1/telegram/pairings/{pairing.pairing_id}",
|
||||
headers={"Authorization": f"Bearer {pairing.poll_token}"},
|
||||
timeout=timeout,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return None
|
||||
|
||||
data = resp.json()
|
||||
if data.get("status") != "ready":
|
||||
return None
|
||||
token = data.get("token")
|
||||
if not is_valid_telegram_bot_token(token):
|
||||
return None
|
||||
|
||||
bot_username = data.get("bot_username")
|
||||
return TelegramBotSetupResult(
|
||||
token=token,
|
||||
bot_username=bot_username
|
||||
if isinstance(bot_username, str) and bot_username
|
||||
else None,
|
||||
owner_user_id=_parse_owner_user_id(data.get("owner_user_id")),
|
||||
)
|
||||
|
||||
|
||||
def poll_pairing_once(
|
||||
api_url: str | None,
|
||||
pairing: TelegramPairing,
|
||||
timeout: float = 10.0,
|
||||
) -> str | None:
|
||||
"""Poll the onboarding service once. Returns the token when ready."""
|
||||
result = poll_pairing_result_once(api_url, pairing, timeout=timeout)
|
||||
return result.token if result else None
|
||||
|
||||
|
||||
def poll_for_setup_result(
|
||||
api_url: str | None,
|
||||
pairing: TelegramPairing,
|
||||
timeout: float = DEFAULT_POLL_TIMEOUT,
|
||||
interval: float = POLL_INTERVAL,
|
||||
) -> Optional[TelegramBotSetupResult]:
|
||||
"""Poll the pairing API until setup metadata is available or timeout."""
|
||||
deadline = time.monotonic() + timeout
|
||||
while time.monotonic() < deadline:
|
||||
try:
|
||||
result = poll_pairing_result_once(api_url, pairing)
|
||||
if result:
|
||||
return result
|
||||
except (httpx.HTTPError, ValueError):
|
||||
pass
|
||||
time.sleep(interval)
|
||||
return None
|
||||
|
||||
|
||||
def poll_for_token(
|
||||
api_url: str | None,
|
||||
pairing: TelegramPairing,
|
||||
timeout: float = DEFAULT_POLL_TIMEOUT,
|
||||
interval: float = POLL_INTERVAL,
|
||||
) -> Optional[str]:
|
||||
"""Poll the pairing API until the bot token is available or timeout."""
|
||||
result = poll_for_setup_result(api_url, pairing, timeout=timeout, interval=interval)
|
||||
return result.token if result else None
|
||||
|
||||
|
||||
def auto_setup_telegram_bot_result(
|
||||
api_url: str | None = None,
|
||||
manager_bot: str = DEFAULT_MANAGER_BOT,
|
||||
profile_name: Optional[str] = None,
|
||||
poll_timeout: float = DEFAULT_POLL_TIMEOUT,
|
||||
) -> Optional[TelegramBotSetupResult]:
|
||||
"""Run the full automatic Telegram bot creation flow."""
|
||||
_ = manager_bot, profile_name
|
||||
resolved_api_url = _api_url(api_url)
|
||||
print()
|
||||
print(f" Contacting Hermes Telegram onboarding service: {resolved_api_url}")
|
||||
sys.stdout.flush()
|
||||
pairing = create_pairing(resolved_api_url)
|
||||
if not pairing:
|
||||
print(" ✗ Could not reach the Hermes Telegram onboarding service.")
|
||||
print(" Try the manual setup instead, or check your network.")
|
||||
return None
|
||||
|
||||
print(" ✓ Pairing created")
|
||||
print(" Rendering QR code...")
|
||||
sys.stdout.flush()
|
||||
print()
|
||||
print(" Scan this QR code with your phone, or open the link below:")
|
||||
print()
|
||||
print_qr_code(pairing.qr_payload, include_link=False)
|
||||
print()
|
||||
print(f" Link: {pairing.deep_link}")
|
||||
print()
|
||||
print(" When Telegram opens, tap 'Create Bot' to confirm.")
|
||||
print(" (You can edit the bot display name before confirming)")
|
||||
print()
|
||||
|
||||
spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
|
||||
start = time.monotonic()
|
||||
deadline = start + poll_timeout
|
||||
idx = 0
|
||||
|
||||
while time.monotonic() < deadline:
|
||||
char = spinner_chars[idx % len(spinner_chars)]
|
||||
elapsed = int(time.monotonic() - start)
|
||||
remaining = max(0, int(poll_timeout - elapsed))
|
||||
sys.stdout.write(
|
||||
f"\r {char} Waiting for bot creation... ({remaining}s remaining) "
|
||||
)
|
||||
sys.stdout.flush()
|
||||
idx += 1
|
||||
|
||||
try:
|
||||
result = poll_pairing_result_once(resolved_api_url, pairing)
|
||||
if result:
|
||||
sys.stdout.write(
|
||||
"\r ✓ Bot created successfully! \n"
|
||||
)
|
||||
sys.stdout.flush()
|
||||
return result
|
||||
except (httpx.HTTPError, ValueError):
|
||||
pass
|
||||
time.sleep(POLL_INTERVAL)
|
||||
|
||||
sys.stdout.write("\r ✗ Timed out waiting for bot creation. \n")
|
||||
sys.stdout.flush()
|
||||
print(" The bot may still be created — check Telegram.")
|
||||
print(" You can paste the token manually below, or re-run setup.")
|
||||
return None
|
||||
|
||||
|
||||
def auto_setup_telegram_bot(
|
||||
api_url: str | None = None,
|
||||
manager_bot: str = DEFAULT_MANAGER_BOT,
|
||||
profile_name: Optional[str] = None,
|
||||
poll_timeout: float = DEFAULT_POLL_TIMEOUT,
|
||||
) -> Optional[str]:
|
||||
"""Run automatic Telegram bot creation and return only the bot token."""
|
||||
result = auto_setup_telegram_bot_result(
|
||||
api_url=api_url,
|
||||
manager_bot=manager_bot,
|
||||
profile_name=profile_name,
|
||||
poll_timeout=poll_timeout,
|
||||
)
|
||||
return result.token if result else None
|
||||
323
tests/hermes_cli/test_telegram_managed_bot.py
Normal file
323
tests/hermes_cli/test_telegram_managed_bot.py
Normal file
|
|
@ -0,0 +1,323 @@
|
|||
"""Tests for hermes_cli.telegram_managed_bot — QR codes, deep links, pairing."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from hermes_cli.telegram_managed_bot import (
|
||||
DEFAULT_MANAGER_BOT,
|
||||
TELEGRAM_ONBOARDING_URL_ENV,
|
||||
TelegramBotSetupResult,
|
||||
TelegramPairing,
|
||||
create_pairing,
|
||||
generate_bot_username,
|
||||
generate_deep_link,
|
||||
generate_pairing_nonce,
|
||||
poll_for_setup_result,
|
||||
poll_for_token,
|
||||
print_qr_code,
|
||||
render_qr_terminal,
|
||||
)
|
||||
|
||||
|
||||
VALID_TOKEN = "123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZabcdef"
|
||||
SECOND_VALID_TOKEN = "987654321:abcdefghijklmnopqrstuvwxyzABCDEF"
|
||||
|
||||
|
||||
class TestGenerateBotUsername:
|
||||
def test_secure_default_format(self):
|
||||
name = generate_bot_username()
|
||||
assert name.startswith("hermes_")
|
||||
assert name.endswith("_bot")
|
||||
assert len(name) == len("hermes_") + 16 + len("_bot")
|
||||
assert len(name) <= 32
|
||||
|
||||
def test_profile_name_not_embedded(self):
|
||||
name = generate_bot_username("work")
|
||||
assert "work" not in name
|
||||
assert name.startswith("hermes_")
|
||||
assert name.endswith("_bot")
|
||||
|
||||
def test_slug_uses_telegram_safe_base32_chars(self):
|
||||
name = generate_bot_username()
|
||||
slug = name.removeprefix("hermes_").removesuffix("_bot")
|
||||
assert len(slug) == 16
|
||||
assert set(slug) <= set("abcdefghijklmnopqrstuvwxyz234567")
|
||||
|
||||
def test_uniqueness(self):
|
||||
names = {generate_bot_username() for _ in range(20)}
|
||||
assert len(names) == 20
|
||||
|
||||
|
||||
class TestGenerateDeepLink:
|
||||
def test_basic_format(self):
|
||||
link = generate_deep_link(
|
||||
manager_bot="TestBot",
|
||||
suggested_username="my_bot",
|
||||
)
|
||||
assert link == "https://t.me/newbot/TestBot/my_bot"
|
||||
|
||||
def test_with_name(self):
|
||||
link = generate_deep_link(
|
||||
manager_bot="@TestBot",
|
||||
suggested_username="my_bot",
|
||||
suggested_name="My Agent",
|
||||
)
|
||||
assert "https://t.me/newbot/TestBot/my_bot?" in link
|
||||
assert "name=My+Agent" in link
|
||||
|
||||
def test_defaults(self):
|
||||
link = generate_deep_link()
|
||||
assert f"https://t.me/newbot/{DEFAULT_MANAGER_BOT}/" in link
|
||||
assert "hermes_" in link
|
||||
|
||||
def test_name_url_encoded(self):
|
||||
link = generate_deep_link(
|
||||
manager_bot="Bot",
|
||||
suggested_username="test_bot",
|
||||
suggested_name="Hermes & Friends",
|
||||
)
|
||||
assert "Hermes+%26+Friends" in link
|
||||
|
||||
|
||||
class TestPairingNonce:
|
||||
def test_length(self):
|
||||
nonce = generate_pairing_nonce()
|
||||
assert len(nonce) == 32
|
||||
|
||||
def test_hex_chars(self):
|
||||
nonce = generate_pairing_nonce()
|
||||
assert all(c in "0123456789abcdef" for c in nonce)
|
||||
|
||||
def test_uniqueness(self):
|
||||
nonces = {generate_pairing_nonce() for _ in range(100)}
|
||||
assert len(nonces) == 100
|
||||
|
||||
|
||||
class TestQRCode:
|
||||
def test_render_returns_string(self):
|
||||
result = render_qr_terminal("https://example.com")
|
||||
if result:
|
||||
assert isinstance(result, str)
|
||||
assert len(result) > 10
|
||||
|
||||
def test_render_graceful_without_qrcode(self):
|
||||
with patch.dict("sys.modules", {"qrcode": None}):
|
||||
render_qr_terminal("https://example.com")
|
||||
|
||||
def test_print_qr_code_with_url(self, capsys):
|
||||
print_qr_code("https://t.me/newbot/Bot/test_bot")
|
||||
captured = capsys.readouterr()
|
||||
assert "https://t.me/newbot/Bot/test_bot" in captured.out
|
||||
|
||||
|
||||
class TestCreatePairing:
|
||||
def test_success(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 201
|
||||
mock_resp.json.return_value = {
|
||||
"pairing_id": "abcdefghijklmnop",
|
||||
"poll_token": "secret-token",
|
||||
"suggested_username": "hermes_abcdefghijklmnop_bot",
|
||||
"deep_link": "https://t.me/newbot/HermesSetupBot/hermes_abcdefghijklmnop_bot?name=Hermes+Agent",
|
||||
"qr_payload": "https://t.me/newbot/HermesSetupBot/hermes_abcdefghijklmnop_bot?name=Hermes+Agent",
|
||||
"expires_at": "2026-05-18T00:00:00.000Z",
|
||||
}
|
||||
|
||||
with patch(
|
||||
"hermes_cli.telegram_managed_bot.httpx.post", return_value=mock_resp
|
||||
) as post:
|
||||
pairing = create_pairing("https://api.example.com", bot_name="Hermes Agent")
|
||||
|
||||
assert pairing == TelegramPairing(
|
||||
pairing_id="abcdefghijklmnop",
|
||||
poll_token="secret-token",
|
||||
suggested_username="hermes_abcdefghijklmnop_bot",
|
||||
deep_link="https://t.me/newbot/HermesSetupBot/hermes_abcdefghijklmnop_bot?name=Hermes+Agent",
|
||||
qr_payload="https://t.me/newbot/HermesSetupBot/hermes_abcdefghijklmnop_bot?name=Hermes+Agent",
|
||||
expires_at="2026-05-18T00:00:00.000Z",
|
||||
)
|
||||
post.assert_called_once_with(
|
||||
"https://api.example.com/v1/telegram/pairings",
|
||||
json={"bot_name": "Hermes Agent"},
|
||||
timeout=10.0,
|
||||
)
|
||||
|
||||
def test_failure_status(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 500
|
||||
with patch(
|
||||
"hermes_cli.telegram_managed_bot.httpx.post", return_value=mock_resp
|
||||
):
|
||||
assert create_pairing("https://api.example.com") is None
|
||||
|
||||
def test_invalid_payload(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 201
|
||||
mock_resp.json.return_value = {"pairing_id": "missing-poll-token"}
|
||||
with patch(
|
||||
"hermes_cli.telegram_managed_bot.httpx.post", return_value=mock_resp
|
||||
):
|
||||
assert create_pairing("https://api.example.com") is None
|
||||
|
||||
def test_uses_env_override(self, monkeypatch):
|
||||
monkeypatch.setenv(TELEGRAM_ONBOARDING_URL_ENV, "https://worker.example")
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 500
|
||||
with patch(
|
||||
"hermes_cli.telegram_managed_bot.httpx.post", return_value=mock_resp
|
||||
) as post:
|
||||
create_pairing()
|
||||
assert post.call_args.args[0] == "https://worker.example/v1/telegram/pairings"
|
||||
|
||||
|
||||
class TestPollForToken:
|
||||
def pairing(self):
|
||||
return TelegramPairing(
|
||||
pairing_id="abcdefghijklmnop",
|
||||
poll_token="secret-token",
|
||||
suggested_username="hermes_abcdefghijklmnop_bot",
|
||||
deep_link="https://t.me/newbot/HermesSetupBot/hermes_abcdefghijklmnop_bot",
|
||||
qr_payload="https://t.me/newbot/HermesSetupBot/hermes_abcdefghijklmnop_bot",
|
||||
)
|
||||
|
||||
def test_immediate_success(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 200
|
||||
mock_resp.json.return_value = {
|
||||
"bot_username": "hermes_abcdefghijklmnop_bot",
|
||||
"owner_user_id": 42,
|
||||
"status": "ready",
|
||||
"token": VALID_TOKEN,
|
||||
}
|
||||
|
||||
with patch(
|
||||
"hermes_cli.telegram_managed_bot.httpx.get", return_value=mock_resp
|
||||
) as get:
|
||||
with patch("hermes_cli.telegram_managed_bot.time.sleep"):
|
||||
token = poll_for_token(
|
||||
"https://api.example.com", self.pairing(), timeout=5
|
||||
)
|
||||
|
||||
assert token == VALID_TOKEN
|
||||
assert (
|
||||
get.call_args.args[0]
|
||||
== "https://api.example.com/v1/telegram/pairings/abcdefghijklmnop"
|
||||
)
|
||||
assert get.call_args.kwargs["headers"] == {
|
||||
"Authorization": "Bearer secret-token"
|
||||
}
|
||||
|
||||
def test_setup_result_includes_owner_user_id(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 200
|
||||
mock_resp.json.return_value = {
|
||||
"bot_username": "hermes_abcdefghijklmnop_bot",
|
||||
"owner_user_id": 42,
|
||||
"status": "ready",
|
||||
"token": VALID_TOKEN,
|
||||
}
|
||||
|
||||
with patch("hermes_cli.telegram_managed_bot.httpx.get", return_value=mock_resp):
|
||||
with patch("hermes_cli.telegram_managed_bot.time.sleep"):
|
||||
result = poll_for_setup_result(
|
||||
"https://api.example.com", self.pairing(), timeout=5
|
||||
)
|
||||
|
||||
assert result == TelegramBotSetupResult(
|
||||
token=VALID_TOKEN,
|
||||
bot_username="hermes_abcdefghijklmnop_bot",
|
||||
owner_user_id=42,
|
||||
)
|
||||
|
||||
def test_setup_result_accepts_string_owner_user_id(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 200
|
||||
mock_resp.json.return_value = {
|
||||
"bot_username": "hermes_abcdefghijklmnop_bot",
|
||||
"owner_user_id": "42",
|
||||
"status": "ready",
|
||||
"token": VALID_TOKEN,
|
||||
}
|
||||
|
||||
with patch("hermes_cli.telegram_managed_bot.httpx.get", return_value=mock_resp):
|
||||
result = poll_for_setup_result(
|
||||
"https://api.example.com", self.pairing(), timeout=5
|
||||
)
|
||||
|
||||
assert result == TelegramBotSetupResult(
|
||||
token=VALID_TOKEN,
|
||||
bot_username="hermes_abcdefghijklmnop_bot",
|
||||
owner_user_id=42,
|
||||
)
|
||||
|
||||
def test_invalid_ready_token_returns_none(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 200
|
||||
mock_resp.json.return_value = {
|
||||
"bot_username": "hermes_abcdefghijklmnop_bot",
|
||||
"owner_user_id": 42,
|
||||
"status": "ready",
|
||||
"token": "not-a-real-token",
|
||||
}
|
||||
|
||||
with patch("hermes_cli.telegram_managed_bot.httpx.get", return_value=mock_resp):
|
||||
with patch("hermes_cli.telegram_managed_bot.time.sleep"):
|
||||
with patch(
|
||||
"hermes_cli.telegram_managed_bot.time.monotonic"
|
||||
) as mock_time:
|
||||
mock_time.side_effect = [0, 0, 999]
|
||||
assert (
|
||||
poll_for_token(
|
||||
"https://api.example.com", self.pairing(), timeout=1
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
def test_timeout_returns_none(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 200
|
||||
mock_resp.json.return_value = {"status": "waiting"}
|
||||
|
||||
with patch("hermes_cli.telegram_managed_bot.httpx.get", return_value=mock_resp):
|
||||
with patch("hermes_cli.telegram_managed_bot.time.sleep"):
|
||||
with patch(
|
||||
"hermes_cli.telegram_managed_bot.time.monotonic"
|
||||
) as mock_time:
|
||||
mock_time.side_effect = [0, 0, 999]
|
||||
token = poll_for_token(
|
||||
"https://api.example.com", self.pairing(), timeout=1
|
||||
)
|
||||
assert token is None
|
||||
|
||||
def test_eventual_success(self):
|
||||
not_ready = MagicMock()
|
||||
not_ready.status_code = 200
|
||||
not_ready.json.return_value = {"status": "waiting"}
|
||||
|
||||
ready = MagicMock()
|
||||
ready.status_code = 200
|
||||
ready.json.return_value = {"status": "ready", "token": SECOND_VALID_TOKEN}
|
||||
|
||||
call_count = 0
|
||||
|
||||
def fake_get(*args, **kwargs):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count < 3:
|
||||
return not_ready
|
||||
return ready
|
||||
|
||||
with patch("hermes_cli.telegram_managed_bot.httpx.get", side_effect=fake_get):
|
||||
with patch("hermes_cli.telegram_managed_bot.time.sleep"):
|
||||
token = poll_for_token(
|
||||
"https://api.example.com", self.pairing(), timeout=30
|
||||
)
|
||||
assert token == SECOND_VALID_TOKEN
|
||||
|
||||
|
||||
class TestSetupTelegramAuto:
|
||||
def test_setup_helper_exists(self):
|
||||
from hermes_cli.setup import _setup_telegram_auto
|
||||
|
||||
assert callable(_setup_telegram_auto)
|
||||
Loading…
Add table
Add a link
Reference in a new issue