feat(gateway): wire clarify tool with inline keyboard buttons on Telegram (#24199)

The clarify tool returned 'not available in this execution context' for
every gateway-mode agent because gateway/run.py never passed
clarify_callback into the AIAgent constructor. Schema actively encouraged
calling it; users never saw the question.

Changes:

- tools/clarify_gateway.py — new event-based primitive mirroring
  tools/approval.py: register/wait_for_response/resolve_gateway_clarify
  with per-session FIFO, threading.Event blocking with 1s heartbeat
  slices (so the inactivity watchdog keeps ticking), and
  clear_session for boundary cleanup.

- gateway/platforms/base.py — abstract send_clarify with a numbered-text
  fallback so every adapter (Discord, Slack, WhatsApp, Signal, Matrix,
  etc.) gets a working clarify out of the box. Plus an active-session
  bypass: when the agent is blocked on a text-awaiting clarify, the next
  non-command message routes inline to the runner's intercept instead
  of being queued + triggering an interrupt. Same shape as the /approve
  deadlock fix from PR #4926.

- gateway/platforms/telegram.py — concrete send_clarify renders one
  inline button per choice plus '✏️ Other (type answer)'. cl: callback
  handler resolves numeric choices immediately, flips to text-capture
  mode for Other, with the same authorization guards as exec/slash
  approvals.

- gateway/run.py — clarify_callback wired at the cached-agent per-turn
  callback assignment site (only the user-facing agent path; cron and
  hygiene-compress agents have no human attached). Bridges sync→async
  via run_coroutine_threadsafe, blocks with the configured timeout, and
  returns a '[user did not respond within Xm]' sentinel on timeout so
  the agent adapts rather than pinning the running-agent guard. Text-
  intercept added to _handle_message before slash-confirm intercept
  (skipping slash commands). clear_session called in the run's finally
  to cancel any orphan entries.

- hermes_cli/config.py — agent.clarify_timeout default 600s.

- website/docs/user-guide/messaging/telegram.md — Interactive Prompts
  section.

Tests:

- tests/tools/test_clarify_gateway.py (14 tests) — full primitive
  coverage: button resolve, open-ended auto-await, Other flip, timeout
  None, unknown-id idempotency, clear_session cancellation, FIFO
  ordering, register/unregister notify, config default.

- tests/gateway/test_telegram_clarify_buttons.py (12 tests) — render
  paths (multi-choice/open-ended/long-label/HTML-escape/not-connected),
  callback dispatch (numeric resolve/Other flip/already-resolved/
  unauthorized/invalid-token), and base-adapter text fallback.

Out of scope: bot-to-bot, guest mode, checklists, poll media, live
photos. Closes #24191.
This commit is contained in:
Teknium 2026-05-12 16:33:33 -07:00 committed by GitHub
parent 76bbb94be4
commit 29d7c244c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1347 additions and 0 deletions

View file

@ -1743,6 +1743,55 @@ class BasePlatformAdapter(ABC):
""" """
return SendResult(success=False, error="Not supported") return SendResult(success=False, error="Not supported")
async def send_clarify(
self,
chat_id: str,
question: str,
choices: Optional[list],
clarify_id: str,
session_key: str,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a clarify prompt to the user.
Two render modes:
* **Multiple choice** (``choices`` is a non-empty list) adapters
that override this should render inline buttons (one per choice
plus a final "Other" / free-text option). Button callbacks
MUST resolve via
``tools.clarify_gateway.resolve_gateway_clarify(clarify_id, response)``
with the chosen string. Picking the "Other" button calls
``mark_awaiting_text(clarify_id)`` so the next message in the
session is captured as the response.
* **Open-ended** (``choices`` is None or empty) render the
question as a plain text message; the next user message in the
session is captured by the gateway's text-intercept and
resolves the clarify automatically (see
``GatewayRunner._maybe_intercept_clarify_text``).
The default implementation falls back to a numbered text list,
which works on every platform the user replies with a number
("2") or with the literal choice text, and the gateway intercepts
and resolves. Adapters with native button UIs (Telegram, Discord)
SHOULD override this for a richer UX.
"""
if choices:
lines = [f"{question}", ""]
for i, choice in enumerate(choices, start=1):
lines.append(f" {i}. {choice}")
lines.append("")
lines.append("Reply with the number, the option text, or your own answer.")
text = "\n".join(lines)
else:
text = f"{question}"
return await self.send(
chat_id=chat_id,
content=text,
metadata=metadata,
)
async def send_private_notice( async def send_private_notice(
self, self,
chat_id: str, chat_id: str,
@ -2831,6 +2880,58 @@ class BasePlatformAdapter(ABC):
logger.error("[%s] Command '/%s' dispatch failed: %s", self.name, cmd, e, exc_info=True) logger.error("[%s] Command '/%s' dispatch failed: %s", self.name, cmd, e, exc_info=True)
return return
# Clarify text-capture bypass: if the agent is blocked on a
# clarify_tool call awaiting a free-form text response (open-
# ended clarify, or user picked "Other"), the next non-command
# message in this session MUST reach the runner so the
# clarify-intercept can resolve it and unblock the agent.
#
# Without this bypass: the message gets queued in
# _pending_messages AND triggers an interrupt, killing the
# agent run mid-clarify and discarding the user's answer.
# Same shape as the /approve deadlock fix (PR #4926) — both
# cases are "agent thread blocked on Event.wait, message must
# reach the resolver before being treated as a new turn."
if not cmd:
try:
from tools import clarify_gateway as _clarify_mod
_has_text_clarify = (
_clarify_mod.get_pending_for_session(session_key) is not None
)
except Exception:
_has_text_clarify = False
if _has_text_clarify:
logger.debug(
"[%s] Routing message to clarify text-intercept for %s",
self.name, session_key,
)
try:
_thread_meta = _thread_metadata_for_source(
event.source, _reply_anchor_for_event(event)
)
response = await self._message_handler(event)
_text, _eph_ttl = self._unwrap_ephemeral(response)
if _text:
_r = await self._send_with_retry(
chat_id=event.source.chat_id,
content=_text,
reply_to=_reply_anchor_for_event(event),
metadata=_thread_meta,
)
if _eph_ttl > 0 and _r.success and _r.message_id:
self._schedule_ephemeral_delete(
chat_id=event.source.chat_id,
message_id=_r.message_id,
ttl_seconds=_eph_ttl,
)
except Exception as e:
logger.error(
"[%s] Clarify text-intercept dispatch failed: %s",
self.name, e, exc_info=True,
)
return
if self._busy_session_handler is not None: if self._busy_session_handler is not None:
try: try:
if await self._busy_session_handler(event, session_key): if await self._busy_session_handler(event, session_key):

View file

@ -427,6 +427,9 @@ class TelegramAdapter(BasePlatformAdapter):
# Slash-confirm button state: confirm_id → session_key (for /reload-mcp # Slash-confirm button state: confirm_id → session_key (for /reload-mcp
# and any other slash-confirm prompts; see GatewayRunner._request_slash_confirm). # and any other slash-confirm prompts; see GatewayRunner._request_slash_confirm).
self._slash_confirm_state: Dict[str, str] = {} self._slash_confirm_state: Dict[str, str] = {}
# Clarify button state: clarify_id → session_key (for the clarify tool's
# multiple-choice prompts; see GatewayRunner clarify_callback wiring).
self._clarify_state: Dict[str, str] = {}
# Notification mode for message sends. # Notification mode for message sends.
# "important" — only final responses, approvals, and slash confirmations # "important" — only final responses, approvals, and slash confirmations
# trigger notifications; tool progress, streaming, status # trigger notifications; tool progress, streaming, status
@ -2215,6 +2218,80 @@ class TelegramAdapter(BasePlatformAdapter):
logger.warning("[%s] send_slash_confirm failed: %s", self.name, e) logger.warning("[%s] send_slash_confirm failed: %s", self.name, e)
return SendResult(success=False, error=str(e)) return SendResult(success=False, error=str(e))
async def send_clarify(
self,
chat_id: str,
question: str,
choices: Optional[list],
clarify_id: str,
session_key: str,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Render a clarify prompt with one inline button per choice.
Multi-choice mode (``choices`` non-empty): renders one button per
option plus a final "✏️ Other (type answer)" button. Picking the
"Other" button flips the entry into text-capture mode so the next
message becomes the response.
Open-ended mode (``choices`` empty): renders the question as plain
text no buttons. The next message in the session is captured by
the gateway's text-intercept and resolves the clarify.
"""
if not self._bot:
return SendResult(success=False, error="Not connected")
try:
text = f"{_html.escape(question)}"
thread_id = self._metadata_thread_id(metadata)
kwargs: Dict[str, Any] = {
"chat_id": int(chat_id),
"text": text,
"parse_mode": ParseMode.HTML,
**self._link_preview_kwargs(),
}
if choices:
# Telegram caps callback_data at 64 bytes; keep "cl:<id>:<idx>"
# short. Button label is also capped (~64 chars in practice).
rows = []
for idx, choice in enumerate(choices):
label = str(choice)
if len(label) > 60:
label = label[:57] + "..."
rows.append([
InlineKeyboardButton(
f"{idx + 1}. {label}",
callback_data=f"cl:{clarify_id}:{idx}",
)
])
rows.append([
InlineKeyboardButton(
"✏️ Other (type answer)",
callback_data=f"cl:{clarify_id}:other",
)
])
kwargs["reply_markup"] = InlineKeyboardMarkup(rows)
reply_to_id = self._reply_to_message_id_for_send(None, metadata)
kwargs["reply_to_message_id"] = reply_to_id
kwargs.update(
self._thread_kwargs_for_send(
chat_id,
thread_id,
metadata,
reply_to_message_id=reply_to_id,
)
)
msg = await self._send_message_with_thread_fallback(**kwargs)
self._clarify_state[clarify_id] = session_key
return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e:
logger.warning("[%s] send_clarify failed: %s", self.name, e)
return SendResult(success=False, error=str(e))
async def send_model_picker( async def send_model_picker(
self, self,
chat_id: str, chat_id: str,
@ -2700,6 +2777,111 @@ class TelegramAdapter(BasePlatformAdapter):
logger.error("[%s] slash-confirm callback failed: %s", self.name, exc, exc_info=True) logger.error("[%s] slash-confirm callback failed: %s", self.name, exc, exc_info=True)
return return
# --- Clarify callbacks (cl:clarify_id:idx | cl:clarify_id:other) ---
if data.startswith("cl:"):
parts = data.split(":", 2)
if len(parts) == 3:
clarify_id = parts[1]
choice_token = parts[2]
caller_id = str(getattr(query.from_user, "id", ""))
if not self._is_callback_user_authorized(
caller_id,
chat_id=query_chat_id,
chat_type=str(query_chat_type) if query_chat_type is not None else None,
thread_id=str(query_thread_id) if query_thread_id is not None else None,
user_name=query_user_name,
):
await query.answer(text="⛔ You are not authorized to answer this prompt.")
return
session_key = self._clarify_state.get(clarify_id)
if not session_key:
await query.answer(text="This prompt has already been resolved.")
return
user_display = getattr(query.from_user, "first_name", "User")
if choice_token == "other":
# Flip into text-capture mode and tell the user to type
# their answer. The gateway's text-intercept will pick
# up the next message in this session and resolve the
# clarify. Do NOT pop _clarify_state yet — we still
# need it if the user is slow to respond and the entry
# is cleared by something else.
try:
from tools.clarify_gateway import mark_awaiting_text
mark_awaiting_text(clarify_id)
except Exception as exc:
logger.warning("[%s] mark_awaiting_text failed: %s", self.name, exc)
await query.answer(text="✏️ Type your answer in the chat.")
try:
await query.edit_message_text(
text=f"{query.message.text or ''}\n\n<i>Awaiting typed response from {_html.escape(user_display)}…</i>",
parse_mode=ParseMode.HTML,
reply_markup=None,
)
except Exception:
pass
return
# Numeric choice → resolve immediately with the chosen text
try:
idx = int(choice_token)
except (ValueError, TypeError):
await query.answer(text="Invalid choice.")
return
# Look up the choice text from the entry registered in the
# clarify primitive. Fall back to the index if the entry
# has been cleaned up (race with timeout / session reset).
resolved_text: Optional[str] = None
try:
from tools.clarify_gateway import _entries as _clarify_entries # type: ignore
entry = _clarify_entries.get(clarify_id)
if entry and entry.choices and 0 <= idx < len(entry.choices):
resolved_text = entry.choices[idx]
except Exception:
resolved_text = None
if resolved_text is None:
# Race: entry vanished. Echo the index as a number so
# the agent at least sees an intentional response
# rather than nothing.
resolved_text = f"choice {idx + 1}"
# Pop state and resolve
self._clarify_state.pop(clarify_id, None)
try:
from tools.clarify_gateway import resolve_gateway_clarify
resolved = resolve_gateway_clarify(clarify_id, resolved_text)
except Exception as exc:
logger.error("[%s] resolve_gateway_clarify failed: %s", self.name, exc)
resolved = False
await query.answer(text=f"{resolved_text[:60]}")
try:
await query.edit_message_text(
text=f"{_html.escape(query.message.text or '')}\n\n<b>{_html.escape(user_display)}:</b> {_html.escape(resolved_text)}",
parse_mode=ParseMode.HTML,
reply_markup=None,
)
except Exception:
pass
if resolved:
logger.info(
"Telegram clarify button resolved (id=%s, choice=%r, user=%s)",
clarify_id, resolved_text, user_display,
)
else:
logger.warning(
"Telegram clarify button: resolve_gateway_clarify returned False (id=%s)",
clarify_id,
)
return
# --- Update prompt callbacks --- # --- Update prompt callbacks ---
if not data.startswith("update_prompt:"): if not data.startswith("update_prompt:"):
return return

View file

@ -5828,6 +5828,37 @@ class GatewayRunner:
) )
_update_prompts.pop(_quick_key, None) _update_prompts.pop(_quick_key, None)
# Intercept messages that are responses to a pending clarify
# request that is awaiting free-form text (either an open-ended
# clarify with no choices, or one where the user picked the
# "Other" button). The first non-empty user message in the
# session resolves the clarify and unblocks the agent thread —
# we do NOT route it to the agent as a new turn.
try:
from tools import clarify_gateway as _clarify_mod
_pending_clarify = _clarify_mod.get_pending_for_session(_quick_key)
except Exception:
_pending_clarify = None
if _pending_clarify is not None:
_raw_clarify_reply = (event.text or "").strip()
# Skip slash commands — the user clearly wanted to issue a
# command, not answer the clarify. Leave the clarify pending
# so the user can retry; if it times out, the agent unblocks
# with an empty response.
if _raw_clarify_reply and not _raw_clarify_reply.startswith("/"):
_resolved = _clarify_mod.resolve_gateway_clarify(
_pending_clarify.clarify_id, _raw_clarify_reply,
)
if _resolved:
logger.info(
"Gateway intercepted clarify text response (session=%s, id=%s)",
_quick_key, _pending_clarify.clarify_id,
)
# Acknowledge with empty string so adapters that emit
# the agent's response don't double-post. The agent
# itself will produce the next user-facing message.
return ""
# Intercept messages that are responses to a pending /reload-mcp # Intercept messages that are responses to a pending /reload-mcp
# (or future) slash-confirm prompt. Recognized confirm replies are # (or future) slash-confirm prompt. Recognized confirm replies are
# /approve, /always, /cancel (plus short aliases). Anything else # /approve, /always, /cancel (plus short aliases). Anything else
@ -14957,6 +14988,76 @@ class GatewayRunner:
if _pdc is not None: if _pdc is not None:
_pdc[session_key] = _release_bg_review_messages _pdc[session_key] = _release_bg_review_messages
# ------------------------------------------------------------------
# Clarify callback: present a clarify prompt and block on a response.
#
# Runs on the agent's worker thread (see clarify_tool's synchronous
# callback contract). Bridges sync→async by scheduling the
# adapter's send_clarify on the gateway event loop, then blocks on
# the clarify primitive's threading.Event with a configurable
# timeout. Returns the user's response string, or a sentinel
# explaining that no response arrived (so the agent can adapt
# rather than hang forever).
# ------------------------------------------------------------------
def _clarify_callback_sync(question: str, choices) -> str:
from tools import clarify_gateway as _clarify_mod
import uuid as _uuid
if not _status_adapter:
return ""
clarify_id = _uuid.uuid4().hex[:10]
_clarify_mod.register(
clarify_id=clarify_id,
session_key=session_key or "",
question=question,
choices=list(choices) if choices else None,
)
# Pause typing — like approval, we don't want a "thinking..."
# status to obscure the prompt or block the user from typing
# an "Other" response on platforms that disable input while
# typing is active (Slack Assistant API).
try:
_status_adapter.pause_typing_for_chat(_status_chat_id)
except Exception:
pass
send_ok = False
try:
fut = asyncio.run_coroutine_threadsafe(
_status_adapter.send_clarify(
chat_id=_status_chat_id,
question=question,
choices=list(choices) if choices else None,
clarify_id=clarify_id,
session_key=session_key or "",
metadata=_status_thread_metadata,
),
_loop_for_step,
)
result = fut.result(timeout=15)
send_ok = bool(getattr(result, "success", False))
except Exception as exc:
logger.warning("Clarify send failed: %s", exc)
send_ok = False
if not send_ok:
# Couldn't deliver the prompt — clean up and return
# sentinel so the agent can fall back to a sensible
# default rather than hanging.
_clarify_mod.clear_session(session_key or "")
return "[clarify prompt could not be delivered]"
timeout = _clarify_mod.get_clarify_timeout()
response = _clarify_mod.wait_for_response(clarify_id, timeout=float(timeout))
if response is None or response == "":
# Timeout or session-boundary cancellation
return f"[user did not respond within {int(timeout / 60)}m]"
return response
agent.clarify_callback = _clarify_callback_sync
# Store agent reference for interrupt support # Store agent reference for interrupt support
agent_holder[0] = agent agent_holder[0] = agent
# Capture the full tool definitions for transcript logging # Capture the full tool definitions for transcript logging
@ -15228,6 +15329,14 @@ class GatewayRunner:
result = agent.run_conversation(_run_message, conversation_history=agent_history, task_id=session_id) result = agent.run_conversation(_run_message, conversation_history=agent_history, task_id=session_id)
finally: finally:
unregister_gateway_notify(_approval_session_key) unregister_gateway_notify(_approval_session_key)
# Cancel any pending clarify entries so blocked agent
# threads don't hang past the end of the run (interrupt,
# completion, gateway shutdown). Idempotent.
try:
from tools.clarify_gateway import clear_session as _clear_clarify_session
_clear_clarify_session(_approval_session_key)
except Exception:
pass
reset_current_session_key(_approval_session_token) reset_current_session_key(_approval_session_token)
result_holder[0] = result result_holder[0] = result

View file

@ -477,6 +477,12 @@ DEFAULT_CONFIG = {
# threshold before escalating to a full timeout. The warning fires # threshold before escalating to a full timeout. The warning fires
# once per run and does not interrupt the agent. 0 = disable warning. # once per run and does not interrupt the agent. 0 = disable warning.
"gateway_timeout_warning": 900, "gateway_timeout_warning": 900,
# Maximum time (seconds) the gateway will block an agent waiting for
# a clarify-tool response from the user. Hit this and the agent
# unblocks with "[user did not respond within Xm]" so it can adapt
# rather than pinning the running-agent guard forever. CLI clarify
# blocks indefinitely (input() is synchronous) and ignores this.
"clarify_timeout": 600,
# Periodic "still working" notification interval (seconds). # Periodic "still working" notification interval (seconds).
# Sends a status message every N seconds so the user knows the # Sends a status message every N seconds so the user knows the
# agent hasn't died during long tasks. 0 = disable notifications. # agent hasn't died during long tasks. 0 = disable notifications.

View file

@ -0,0 +1,451 @@
"""Tests for Telegram inline keyboard clarify buttons.
Mirrors test_telegram_approval_buttons.py for the new ``send_clarify`` and
``cl:`` callback dispatch added in feat/clarify-gateway-buttons.
"""
import asyncio
import os
import sys
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Ensure the repo root is importable
# ---------------------------------------------------------------------------
_repo = str(Path(__file__).resolve().parents[2])
if _repo not in sys.path:
sys.path.insert(0, _repo)
# ---------------------------------------------------------------------------
# Minimal Telegram mock so TelegramAdapter can be imported (mirrors
# test_telegram_approval_buttons.py)
# ---------------------------------------------------------------------------
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
mod = MagicMock()
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
mod.constants.ParseMode.MARKDOWN = "Markdown"
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
mod.constants.ParseMode.HTML = "HTML"
mod.constants.ChatType.PRIVATE = "private"
mod.constants.ChatType.GROUP = "group"
mod.constants.ChatType.SUPERGROUP = "supergroup"
mod.constants.ChatType.CHANNEL = "channel"
mod.error.NetworkError = type("NetworkError", (OSError,), {})
mod.error.TimedOut = type("TimedOut", (OSError,), {})
mod.error.BadRequest = type("BadRequest", (Exception,), {})
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)
sys.modules.setdefault("telegram.error", mod.error)
_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter
from gateway.config import Platform, PlatformConfig
def _make_adapter(extra=None):
config = PlatformConfig(enabled=True, token="test-token", extra=extra or {})
adapter = TelegramAdapter(config)
adapter._bot = AsyncMock()
adapter._app = MagicMock()
return adapter
def _clear_clarify_state():
from tools import clarify_gateway as cm
with cm._lock:
cm._entries.clear()
cm._session_index.clear()
cm._notify_cbs.clear()
# ===========================================================================
# send_clarify — render
# ===========================================================================
class TestTelegramSendClarify:
"""Verify the rendered prompt has buttons or none, and stores state."""
def setup_method(self):
_clear_clarify_state()
@pytest.mark.asyncio
async def test_multi_choice_renders_buttons_and_other(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 100
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
result = await adapter.send_clarify(
chat_id="12345",
question="Which option?",
choices=["alpha", "beta", "gamma"],
clarify_id="cid1",
session_key="sk1",
)
assert result.success is True
assert result.message_id == "100"
kwargs = adapter._bot.send_message.call_args[1]
assert kwargs["chat_id"] == 12345
assert "Which option?" in kwargs["text"]
# InlineKeyboardMarkup with N+1 buttons (3 choices + Other)
markup = kwargs["reply_markup"]
assert markup is not None
# Mocked InlineKeyboardMarkup — just verify it was constructed
# with rows. We check state instead of poking the mock structure.
assert "cid1" in adapter._clarify_state
assert adapter._clarify_state["cid1"] == "sk1"
@pytest.mark.asyncio
async def test_open_ended_no_keyboard(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 101
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
result = await adapter.send_clarify(
chat_id="12345",
question="What is your name?",
choices=None,
clarify_id="cid2",
session_key="sk2",
)
assert result.success is True
kwargs = adapter._bot.send_message.call_args[1]
# No reply_markup means no buttons — open-ended path
assert "reply_markup" not in kwargs
assert "What is your name?" in kwargs["text"]
assert adapter._clarify_state["cid2"] == "sk2"
@pytest.mark.asyncio
async def test_not_connected(self):
adapter = _make_adapter()
adapter._bot = None
result = await adapter.send_clarify(
chat_id="12345",
question="?",
choices=["a"],
clarify_id="cid3",
session_key="sk3",
)
assert result.success is False
@pytest.mark.asyncio
async def test_truncates_long_choice_label(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 102
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
long_choice = "x" * 200 # > 60 char cap
result = await adapter.send_clarify(
chat_id="12345",
question="?",
choices=[long_choice],
clarify_id="cid4",
session_key="sk4",
)
assert result.success is True
# The truncation logic replaces with "..." past 57 chars; we don't
# inspect the mock's button labels directly (auto-MagicMock), but
# we can verify the call didn't raise on absurdly long input.
@pytest.mark.asyncio
async def test_html_escapes_question(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 103
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
await adapter.send_clarify(
chat_id="12345",
question="<script>alert(1)</script>",
choices=["x"],
clarify_id="cid5",
session_key="sk5",
)
kwargs = adapter._bot.send_message.call_args[1]
# Must NOT contain raw <script> — html.escape should have neutralized
assert "<script>" not in kwargs["text"]
assert "&lt;script&gt;" in kwargs["text"]
# ===========================================================================
# Callback dispatch — _handle_callback_query routing for cl:* prefixes
# ===========================================================================
class TestTelegramClarifyCallback:
"""Verify clicking a button resolves the clarify primitive."""
def setup_method(self):
_clear_clarify_state()
@pytest.mark.asyncio
async def test_numeric_choice_resolves_with_choice_text(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
# Pre-register a clarify entry so the callback can look up the choice text
cm.register("cidA", "sk-cb", "Pick", ["red", "green", "blue"])
adapter._clarify_state["cidA"] = "sk-cb"
query = AsyncMock()
query.data = "cl:cidA:1" # green
query.message = MagicMock()
query.message.chat_id = 12345
query.message.text = "Pick"
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
# State popped
assert "cidA" not in adapter._clarify_state
# Wait shouldn't be needed — resolve_gateway_clarify is sync.
# The entry's response should be set.
# We test by reading the entry's response directly.
with cm._lock:
entry = cm._entries.get("cidA")
# Entry might be popped by wait_for_response, but here we never
# called wait — so it's still in _entries with response set.
assert entry is not None
assert entry.response == "green"
assert entry.event.is_set()
query.answer.assert_called_once()
query.edit_message_text.assert_called_once()
@pytest.mark.asyncio
async def test_other_button_flips_to_text_mode(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
cm.register("cidB", "sk-cb-other", "Pick", ["x", "y"])
adapter._clarify_state["cidB"] = "sk-cb-other"
query = AsyncMock()
query.data = "cl:cidB:other"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.text = "Pick"
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
# Entry should now be in text-capture mode
pending = cm.get_pending_for_session("sk-cb-other")
assert pending is not None
assert pending.clarify_id == "cidB"
assert pending.awaiting_text is True
# State NOT popped — the user still needs to type their answer
assert "cidB" in adapter._clarify_state
# Entry NOT yet resolved
with cm._lock:
entry = cm._entries.get("cidB")
assert entry is not None
assert not entry.event.is_set()
@pytest.mark.asyncio
async def test_already_resolved(self):
adapter = _make_adapter()
# No state for cidGone
query = AsyncMock()
query.data = "cl:cidGone:0"
query.message = MagicMock()
query.message.chat_id = 12345
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
query.answer.assert_called_once()
# Should NOT resolve anything
assert "already" in query.answer.call_args[1]["text"].lower()
@pytest.mark.asyncio
async def test_unauthorized_user_rejected(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
cm.register("cidC", "sk-auth", "Pick", ["a", "b"])
adapter._clarify_state["cidC"] = "sk-auth"
# Hook up a runner that says NOT authorized
class _DenyRunner:
async def _handle_message(self, event):
return None
def _is_user_authorized(self, source):
return False
adapter._message_handler = _DenyRunner()._handle_message
query = AsyncMock()
query.data = "cl:cidC:0"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.chat.type = "private"
query.message.text = "Pick"
query.from_user = MagicMock()
query.from_user.id = "999"
query.from_user.first_name = "Mallory"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
await adapter._handle_callback_query(update, context)
# Must not resolve, must answer with not-authorized message
with cm._lock:
entry = cm._entries.get("cidC")
assert entry is not None
assert not entry.event.is_set()
query.answer.assert_called_once()
assert "not authorized" in query.answer.call_args[1]["text"].lower()
# State preserved
assert adapter._clarify_state["cidC"] == "sk-auth"
@pytest.mark.asyncio
async def test_invalid_choice_token(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
cm.register("cidD", "sk-inv", "Q?", ["a"])
adapter._clarify_state["cidD"] = "sk-inv"
query = AsyncMock()
query.data = "cl:cidD:not-a-number"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.text = "Q?"
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
with cm._lock:
entry = cm._entries.get("cidD")
assert entry is not None
assert not entry.event.is_set()
query.answer.assert_called_once()
assert "invalid" in query.answer.call_args[1]["text"].lower()
# ===========================================================================
# Base adapter fallback render — text numbered list
# ===========================================================================
class TestBaseAdapterClarifyFallback:
"""Adapters without button overrides should render numbered text."""
@pytest.mark.asyncio
async def test_numbered_text_fallback(self):
from gateway.platforms.base import BasePlatformAdapter, SendResult
# Subclass just enough to instantiate
class _Stub(BasePlatformAdapter):
name = "stub"
def __init__(self):
# Skip base __init__ — we're not exercising it
self.sent: list = []
async def connect(self): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kw):
self.sent.append({"chat_id": chat_id, "content": content})
return SendResult(success=True, message_id="1")
async def edit(self, *a, **k): return SendResult(success=False)
async def get_history(self, *a, **k): return []
async def get_chat_info(self, *a, **k): return {}
adapter = _Stub()
result = await adapter.send_clarify(
chat_id="c",
question="Pick a fruit",
choices=["apple", "banana"],
clarify_id="x",
session_key="s",
)
assert result.success is True
assert len(adapter.sent) == 1
text = adapter.sent[0]["content"]
assert "Pick a fruit" in text
assert "1." in text and "apple" in text
assert "2." in text and "banana" in text
@pytest.mark.asyncio
async def test_open_ended_fallback_renders_question_only(self):
from gateway.platforms.base import BasePlatformAdapter, SendResult
class _Stub(BasePlatformAdapter):
name = "stub"
def __init__(self):
self.sent: list = []
async def connect(self): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kw):
self.sent.append(content)
return SendResult(success=True, message_id="1")
async def edit(self, *a, **k): return SendResult(success=False)
async def get_history(self, *a, **k): return []
async def get_chat_info(self, *a, **k): return {}
adapter = _Stub()
await adapter.send_clarify(
chat_id="c",
question="Free form?",
choices=None,
clarify_id="x",
session_key="s",
)
assert "Free form?" in adapter.sent[0]
# No numbered list — choices were empty
assert "1." not in adapter.sent[0]

View file

@ -0,0 +1,207 @@
"""Tests for the gateway-side clarify primitive (tools/clarify_gateway.py).
The clarify tool needs to ask the user a question and block the agent
thread until they respond. These tests cover the module-level state
machine: register, wait, resolve via button, resolve via text-fallback,
"Other"-button text-capture flip, timeout, session boundary cleanup.
"""
from __future__ import annotations
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import pytest
def _clear_clarify_state():
"""Reset module-level state between tests."""
from tools import clarify_gateway as cm
with cm._lock:
cm._entries.clear()
cm._session_index.clear()
cm._notify_cbs.clear()
class TestClarifyPrimitive:
"""Core register/wait/resolve mechanics."""
def setup_method(self):
_clear_clarify_state()
def test_button_choice_resolves_wait(self):
"""resolve_gateway_clarify unblocks wait_for_response with the chosen string."""
from tools import clarify_gateway as cm
cm.register("id1", "sk1", "Pick one", ["A", "B", "C"])
def resolver():
time.sleep(0.05)
cm.resolve_gateway_clarify("id1", "B")
threading.Thread(target=resolver).start()
result = cm.wait_for_response("id1", timeout=2.0)
assert result == "B"
def test_open_ended_auto_awaits_text(self):
"""Clarify with no choices is in text-capture mode immediately."""
from tools import clarify_gateway as cm
entry = cm.register("id2", "sk2", "Free form?", None)
assert entry.awaiting_text is True
# get_pending_for_session returns the entry so the gateway
# text-intercept can find it.
pending = cm.get_pending_for_session("sk2")
assert pending is not None
assert pending.clarify_id == "id2"
def test_button_choice_does_not_auto_await(self):
"""Multi-choice clarify should NOT be in text-capture mode initially."""
from tools import clarify_gateway as cm
entry = cm.register("id3", "sk3", "Pick", ["X", "Y"])
assert entry.awaiting_text is False
assert cm.get_pending_for_session("sk3") is None
def test_other_button_flips_to_text_mode(self):
"""mark_awaiting_text makes get_pending_for_session find the entry."""
from tools import clarify_gateway as cm
cm.register("id4", "sk4", "Pick", ["X", "Y"])
assert cm.get_pending_for_session("sk4") is None
flipped = cm.mark_awaiting_text("id4")
assert flipped is True
pending = cm.get_pending_for_session("sk4")
assert pending is not None
assert pending.clarify_id == "id4"
def test_mark_awaiting_text_unknown_id(self):
"""mark_awaiting_text on a non-existent id returns False."""
from tools import clarify_gateway as cm
assert cm.mark_awaiting_text("nope") is False
def test_timeout_returns_none(self):
"""wait_for_response returns None when no resolve fires within the timeout."""
from tools import clarify_gateway as cm
cm.register("id5", "sk5", "Q?", ["A"])
result = cm.wait_for_response("id5", timeout=0.2)
assert result is None
def test_resolve_unknown_id_returns_false(self):
"""resolve_gateway_clarify is idempotent on unknown ids."""
from tools import clarify_gateway as cm
assert cm.resolve_gateway_clarify("nope", "anything") is False
def test_resolve_after_wait_completes_is_noop(self):
"""A late resolve on a finished entry doesn't blow up."""
from tools import clarify_gateway as cm
cm.register("id6", "sk6", "Q?", ["A"])
# Time out, entry gets cleaned up
cm.wait_for_response("id6", timeout=0.1)
# Late button click — should not raise
result = cm.resolve_gateway_clarify("id6", "A")
assert result is False
def test_clear_session_cancels_pending_entries(self):
"""clear_session unblocks blocked threads with empty response."""
from tools import clarify_gateway as cm
cm.register("id7", "sk7", "Q?", ["A"])
def waiter():
return cm.wait_for_response("id7", timeout=10.0)
with ThreadPoolExecutor(1) as pool:
fut = pool.submit(waiter)
time.sleep(0.05)
cancelled = cm.clear_session("sk7")
assert cancelled == 1
result = fut.result(timeout=2.0)
# clear_session sets response="" then the wait returns it
assert result == ""
def test_has_pending(self):
from tools import clarify_gateway as cm
cm.register("id8", "sk8", "Q?", ["A"])
assert cm.has_pending("sk8") is True
assert cm.has_pending("nonexistent") is False
def test_notify_register_unregister_clears_pending(self):
"""unregister_notify cancels any pending clarify so threads unwind."""
from tools import clarify_gateway as cm
cm.register("id9", "sk9", "Q?", ["A"])
def waiter():
return cm.wait_for_response("id9", timeout=10.0)
with ThreadPoolExecutor(1) as pool:
fut = pool.submit(waiter)
time.sleep(0.05)
cm.register_notify("sk9", lambda entry: None)
cm.unregister_notify("sk9")
# unregister_notify calls clear_session; thread unwinds
result = fut.result(timeout=2.0)
assert result == ""
def test_session_index_isolation(self):
"""Entries from different sessions don't leak across get_pending lookups."""
from tools import clarify_gateway as cm
cm.register("idA", "alpha", "Q?", None) # auto-await text
cm.register("idB", "beta", "Q?", None) # auto-await text
a = cm.get_pending_for_session("alpha")
b = cm.get_pending_for_session("beta")
assert a is not None and a.clarify_id == "idA"
assert b is not None and b.clarify_id == "idB"
def test_clarify_timeout_config_default(self):
"""get_clarify_timeout returns 600 by default."""
from tools import clarify_gateway as cm
timeout = cm.get_clarify_timeout()
# Default 600s OR whatever is in the user's loaded config.
# Floor check: must be a positive int, not crashed.
assert isinstance(timeout, int)
assert timeout > 0
class TestGatewayTextIntercept:
"""The gateway's _handle_message intercepts text replies to pending clarifies."""
def setup_method(self):
_clear_clarify_state()
def test_get_pending_for_session_returns_oldest_text_awaiting(self):
"""When two clarifies are pending, get_pending_for_session returns the
first that is awaiting_text (the older one if both)."""
from tools import clarify_gateway as cm
# Older multi-choice (not awaiting text)
cm.register("first", "sk", "Q1?", ["A"])
# Newer open-ended (awaiting text)
cm.register("second", "sk", "Q2?", None)
pending = cm.get_pending_for_session("sk")
# The newer one is awaiting text; the older isn't.
assert pending is not None
assert pending.clarify_id == "second"
# Now flip the first to text mode too. Both are awaiting text,
# FIFO returns the older one.
cm.mark_awaiting_text("first")
pending2 = cm.get_pending_for_session("sk")
assert pending2 is not None
assert pending2.clarify_id == "first"

278
tools/clarify_gateway.py Normal file
View file

@ -0,0 +1,278 @@
"""Gateway-side clarify primitive (blocking event-based queue).
The ``clarify`` tool needs to ask the user a question and block the agent
thread until they respond. In CLI mode this is trivial ``input()`` is
synchronous. In gateway mode the agent runs on a worker thread while the
event loop handles the user's reply, so we need a thread-safe primitive
that:
* stores a pending clarify request (with a generated ``clarify_id``),
* blocks the agent thread on an ``Event``,
* resolves the wait when the gateway's button-callback or text-intercept
fires ``resolve_gateway_clarify(clarify_id, response)``,
* supports timeouts so a user who never responds does NOT hang the agent
thread forever (which would also pin the gateway's running-agent guard).
State is module-level (same shape as ``tools.approval``) so platform
adapters can call ``resolve_gateway_clarify`` without holding a back-
reference to the ``GatewayRunner`` instance.
Two delivery paths from the adapter:
1. **Button UI** adapters override ``send_clarify`` to render inline
buttons (e.g. Telegram ``InlineKeyboardMarkup``). The button
callback resolves with the chosen string. A final "Other (type
answer)" button enters text-capture mode for free-form responses.
2. **Text fallback** adapters without rich UI render a numbered list.
The user replies with a number ("2") or with free text; the gateway's
``_handle_message`` intercepts the reply and resolves directly.
"""
from __future__ import annotations
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Module-level state
# =========================================================================
@dataclass
class _ClarifyEntry:
"""One pending clarify request inside a gateway session."""
clarify_id: str
session_key: str
question: str
choices: Optional[List[str]]
event: threading.Event = field(default_factory=threading.Event)
response: Optional[str] = None
awaiting_text: bool = False # set when user picked "Other" or clarify is open-ended
def signature(self) -> Dict[str, object]:
return {
"clarify_id": self.clarify_id,
"session_key": self.session_key,
"question": self.question,
"choices": list(self.choices) if self.choices else None,
}
_lock = threading.RLock()
# clarify_id → _ClarifyEntry (primary lookup for button callbacks)
_entries: Dict[str, _ClarifyEntry] = {}
# session_key → list[clarify_id] (FIFO; for text-fallback intercept and session cleanup)
_session_index: Dict[str, List[str]] = {}
# =========================================================================
# Public API — agent-thread side
# =========================================================================
def register(
clarify_id: str,
session_key: str,
question: str,
choices: Optional[List[str]],
) -> _ClarifyEntry:
"""Register a pending clarify request and return the entry.
The caller (gateway clarify_callback) will then send the prompt to the
user and block on ``wait_for_response(clarify_id, timeout)``.
"""
entry = _ClarifyEntry(
clarify_id=clarify_id,
session_key=session_key,
question=question,
choices=list(choices) if choices else None,
# Open-ended (no choices) → next message IS the response, no buttons needed.
awaiting_text=not bool(choices),
)
with _lock:
_entries[clarify_id] = entry
_session_index.setdefault(session_key, []).append(clarify_id)
return entry
def wait_for_response(clarify_id: str, timeout: float) -> Optional[str]:
"""Block on the entry's event until resolved or timeout fires.
Polls in 1-second slices so the agent's inactivity heartbeat keeps
firing without this, ``Event.wait(timeout=600)`` blocks the thread
for 10 minutes with zero activity touches and the gateway's inactivity
watchdog kills the agent while the user is still typing.
Returns the resolved response string, or ``None`` on timeout.
"""
with _lock:
entry = _entries.get(clarify_id)
if entry is None:
return None
try:
from tools.environments.base import touch_activity_if_due
except Exception: # pragma: no cover - optional
touch_activity_if_due = None
deadline = time.monotonic() + max(timeout, 0.0)
activity_state = {"last_touch": time.monotonic(), "start": time.monotonic()}
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
if entry.event.wait(timeout=min(1.0, remaining)):
break
if touch_activity_if_due is not None:
touch_activity_if_due(activity_state, "waiting for user clarify response")
with _lock:
# Remove from indices regardless of resolution outcome.
_entries.pop(clarify_id, None)
ids = _session_index.get(entry.session_key)
if ids and clarify_id in ids:
ids.remove(clarify_id)
if not ids:
_session_index.pop(entry.session_key, None)
return entry.response
# =========================================================================
# Public API — gateway / adapter side
# =========================================================================
def resolve_gateway_clarify(clarify_id: str, response: str) -> bool:
"""Unblock the agent thread waiting on ``clarify_id``.
Returns True if an entry was found and resolved, False otherwise
(already resolved, expired, or never existed).
"""
with _lock:
entry = _entries.get(clarify_id)
if entry is None:
return False
entry.response = str(response) if response is not None else ""
entry.event.set()
return True
def get_pending_for_session(session_key: str) -> Optional[_ClarifyEntry]:
"""Return the OLDEST pending clarify entry for a session, or None.
Used by the text-fallback intercept in ``_handle_message`` when a
clarify is awaiting a free-form text response, the next user message
in that session is captured as the answer.
"""
with _lock:
ids = _session_index.get(session_key) or []
for cid in ids:
entry = _entries.get(cid)
if entry is None:
continue
if entry.awaiting_text:
return entry
return None
def mark_awaiting_text(clarify_id: str) -> bool:
"""Flip an entry into text-capture mode (user picked the 'Other' button).
Returns True if the entry exists and was flipped, False otherwise.
"""
with _lock:
entry = _entries.get(clarify_id)
if entry is None:
return False
entry.awaiting_text = True
return True
def has_pending(session_key: str) -> bool:
"""Return True when this session has at least one pending clarify entry."""
with _lock:
ids = _session_index.get(session_key) or []
return any(_entries.get(cid) is not None for cid in ids)
def clear_session(session_key: str) -> int:
"""Resolve and drop every pending clarify for a session.
Used by session-boundary cleanup (e.g. ``/new``, gateway shutdown,
cached-agent eviction) so blocked agent threads don't hang past the
end of their session. Returns the number of entries cancelled.
"""
with _lock:
ids = list(_session_index.pop(session_key, []) or [])
entries = [_entries.pop(cid, None) for cid in ids]
cancelled = 0
for entry in entries:
if entry is None:
continue
# Empty string sentinel — agent code can distinguish from a real
# response by inspecting the wait_for_response return value
# alongside its own timeout deadline. Most callers just treat any
# falsy result as "user did not respond".
entry.response = ""
entry.event.set()
cancelled += 1
return cancelled
# =========================================================================
# Config
# =========================================================================
def get_clarify_timeout() -> int:
"""Read the clarify response timeout (seconds) from config.
Defaults to 600 (10 minutes) long enough for the user to type a
thoughtful response, short enough that an abandoned prompt eventually
unblocks the agent thread instead of pinning the running-agent guard
forever.
Reads ``agent.clarify_timeout`` from config.yaml.
"""
try:
from hermes_cli.config import load_config
cfg = load_config() or {}
agent_cfg = cfg.get("agent", {}) or {}
return int(agent_cfg.get("clarify_timeout", 600))
except Exception:
return 600
# =========================================================================
# Per-session notify hook (gateway → adapter bridge)
# =========================================================================
# Mirrors tools.approval's _gateway_notify_cbs: the gateway registers a
# per-session callback that sends the clarify prompt to the user. The
# callback bridges sync→async (runs on the agent thread; schedules the
# adapter ``send_clarify`` call on the event loop).
_notify_cbs: Dict[str, Callable[[_ClarifyEntry], None]] = {}
def register_notify(session_key: str, cb: Callable[[_ClarifyEntry], None]) -> None:
"""Register a per-session notify callback used by ``clarify_callback``."""
with _lock:
_notify_cbs[session_key] = cb
def unregister_notify(session_key: str) -> None:
"""Drop the per-session notify callback and cancel any pending clarify entries."""
with _lock:
_notify_cbs.pop(session_key, None)
# Cancel any pending entries so blocked threads unwind when the run
# ends (interrupt, completion, gateway shutdown).
clear_session(session_key)
def get_notify(session_key: str) -> Optional[Callable[[_ClarifyEntry], None]]:
with _lock:
return _notify_cbs.get(session_key)

View file

@ -907,6 +907,19 @@ When the agent tries to run a potentially dangerous command, it asks you for app
Reply "yes"/"y" to approve or "no"/"n" to deny. Reply "yes"/"y" to approve or "no"/"n" to deny.
## Interactive Prompts (clarify)
When the agent calls the `clarify` tool — to ask which approach you prefer, get post-task feedback, or check before a non-trivial decision — Telegram renders the question with **inline keyboard buttons**:
> ❓ Which framework should I use for the dashboard?
>
> [1. Next.js] [2. Remix] [3. Astro]
> [✏️ Other (type answer)]
Tap a button to answer, or tap **Other** to type a free-form response (the next message you send becomes the answer). Open-ended `clarify` calls (no preset choices) skip the buttons and just capture your next message.
Configure the response timeout via `agent.clarify_timeout` in `~/.hermes/config.yaml` (default `600` seconds). If you don't respond within the timeout, the agent unblocks with a sentinel message and adapts rather than hanging.
## Security ## Security
:::warning :::warning