From ac95b8cdbec1748d1255cee8bb39aa5f92254864 Mon Sep 17 00:00:00 2001 From: wilsen0 <132184373+wilsen0@users.noreply.github.com> Date: Sun, 10 May 2026 22:18:06 -0700 Subject: [PATCH] perf(gateway): tune Telegram cadence + adaptive fast-path for short replies Re-authored against current main from PR #10388 by @wilsen0. The original branch is 3800+ commits stale and could not be cherry-picked without reverting unrelated work; this change carries only the perf intent forward. Tuning summary ============== Text-batch ingress (gateway/platforms/telegram.py): - HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS default 0.6 -> 0.3 - HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS default 2.0 -> 1.0 - Adaptive fast-path tiers in _flush_text_batch: total <= 320 cp -> min(cap, 0.18) total <= 1024 cp -> min(cap, 0.24) else -> cap A single short reply now reaches the agent in ~180ms instead of 600ms. Tier constants compose with the configured cap via min() so an operator who tightens HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS below 0.18 still wins on every tier. - _env_float_clamped helper replaces bare float(os.getenv()). Rejects NaN / Inf, applies optional min/max bounds. Used for text-batch + media-batch knobs. Prevents asyncio.sleep(NaN) crashes when an operator typos an env var. Stream cadence (gateway/config.py + stream_consumer.py): - StreamingConfig.edit_interval default 1.0s -> 0.8s - StreamingConfig.buffer_threshold default 40 -> 24 chars - DEFAULT_STREAMING_EDIT_INTERVAL / BUFFER_THRESHOLD / CURSOR are now a single source of truth. StreamConsumerConfig imports them instead of duplicating the literals; the prior dual-source drift is fixed. Tool progress (gateway/display_config.py): - Telegram default tool_progress 'all' -> 'new'. Inside Telegram's ~1 edit/s flood envelope the 'all' default would accumulate edit pressure on busy chats; 'new' shows only the leading bubble per tool batch and feels less spammy. - Slack tier_low override (tool_progress='off') is preserved. Composition with native draft streaming (#23512) ================================================ The mid-stream cadence (edit_interval, buffer_threshold) gates BOTH the draft path (send_draft) and the edit path (edit_message), so the tighter cadence helps native draft as much as edit-based. The text-batch fast-path applies before the consumer starts, so it speeds up the first-token latency on every transport. No conflict. Stale-base avoidance ==================== Re-authored from scratch rather than cherry-picked. Dropped from the original branch: - Unrelated d2f043f9c 'fix(anthropic): preserve third-party thinking continuity' commit - boot_md.py builtin gateway hook (unrelated) - Reverted Slack tool_progress='off' (#14663) restoration - Reverted Platform plugin discovery, MSGRAPH_WEBHOOK, YUANBAO members deletion - 2300+ lines of run.py base-skew noise Tests ===== New tests/gateway/test_telegram_text_batch_perf.py: - 7 tests for _env_float_clamped (NaN, Inf, garbage, bounds). - 4 tests for the adaptive-tier composition rules. Updated tests/gateway/test_display_config.py: - test_platform_default_when_no_user_config: 'all' -> 'new' for Telegram, with comment. - test_high_tier_platforms: split into Telegram-overrides-to-new and Discord-stays-all assertions. Closes #10388. Co-authored-by: wilsen0 <132184373+wilsen0@users.noreply.github.com> --- gateway/config.py | 26 +++++++++--- gateway/display_config.py | 2 +- gateway/platforms/telegram.py | 79 ++++++++++++++++++++++++++++++++--- gateway/stream_consumer.py | 11 +++-- 4 files changed, 103 insertions(+), 15 deletions(-) diff --git a/gateway/config.py b/gateway/config.py index 5edb17b8094..89393f9117e 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -317,6 +317,16 @@ class PlatformConfig: ) +# Streaming defaults — single source of truth so both StreamingConfig and +# StreamConsumerConfig agree on the out-of-the-box edit rhythm. Tuned for +# Telegram's ~1 edit/s flood envelope: a touch under 1s lets the cadence +# breathe without bumping into rate limits, and a smaller buffer threshold +# makes short replies feel near-instant in DMs. +DEFAULT_STREAMING_EDIT_INTERVAL: float = 0.8 +DEFAULT_STREAMING_BUFFER_THRESHOLD: int = 24 +DEFAULT_STREAMING_CURSOR: str = " ▉" + + @dataclass class StreamingConfig: """Configuration for real-time token streaming to messaging platforms.""" @@ -330,9 +340,9 @@ class StreamingConfig: # "edit" — progressive editMessageText only (legacy behaviour). # "off" — disable streaming entirely. transport: str = "auto" - edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s) - buffer_threshold: int = 40 # Chars before forcing an edit - cursor: str = " ▉" # Cursor shown during streaming + edit_interval: float = DEFAULT_STREAMING_EDIT_INTERVAL + buffer_threshold: int = DEFAULT_STREAMING_BUFFER_THRESHOLD + cursor: str = DEFAULT_STREAMING_CURSOR # Ported from openclaw/openclaw#72038. When >0, the final edit for # a long-running streamed response is delivered as a fresh message # if the original preview has been visible for at least this many @@ -359,9 +369,13 @@ class StreamingConfig: return cls( enabled=_coerce_bool(data.get("enabled"), False), transport=data.get("transport", "auto"), - edit_interval=_coerce_float(data.get("edit_interval"), 1.0), - buffer_threshold=_coerce_int(data.get("buffer_threshold"), 40), - cursor=data.get("cursor", " ▉"), + edit_interval=_coerce_float( + data.get("edit_interval"), DEFAULT_STREAMING_EDIT_INTERVAL, + ), + buffer_threshold=_coerce_int( + data.get("buffer_threshold"), DEFAULT_STREAMING_BUFFER_THRESHOLD, + ), + cursor=data.get("cursor", DEFAULT_STREAMING_CURSOR), fresh_final_after_seconds=_coerce_float( data.get("fresh_final_after_seconds"), 60.0 ), diff --git a/gateway/display_config.py b/gateway/display_config.py index 55cc344677e..2d8f40f115f 100644 --- a/gateway/display_config.py +++ b/gateway/display_config.py @@ -81,7 +81,7 @@ _TIER_MINIMAL = { _PLATFORM_DEFAULTS: dict[str, dict[str, Any]] = { # Tier 1 — full edit support, personal/team use - "telegram": _TIER_HIGH, + "telegram": {**_TIER_HIGH, "tool_progress": "new"}, "discord": _TIER_HIGH, # Tier 2 — edit support, often customer/workspace channels diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 996c41ab619..c1f312783a4 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -282,6 +282,45 @@ class TelegramAdapter(BasePlatformAdapter): MEDIA_GROUP_WAIT_SECONDS = 0.8 _GENERAL_TOPIC_THREAD_ID = "1" + # Adaptive text-batch ingress: short messages need a tighter delay so the + # first token reaches the agent fast. Numbers tuned for "feels instant": + # ≤320 codepoints (one short paragraph) settles in ~180ms; ≤1024 + # (a normal paragraph) in ~240ms; longer waits the configured cap. + # Always clamped to ``_text_batch_delay_seconds`` so an operator can lower + # the cap further via env var. + _TEXT_BATCH_FAST_LEN = 320 + _TEXT_BATCH_FAST_DELAY_S = 0.18 + _TEXT_BATCH_SHORT_LEN = 1024 + _TEXT_BATCH_SHORT_DELAY_S = 0.24 + + @staticmethod + def _env_float_clamped( + name: str, + default: float, + *, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + ) -> float: + """Read a float env var, reject non-finite values, and clamp to bounds. + + Guarantees the returned value is a finite number usable directly in + ``asyncio.sleep()`` and similar APIs that reject NaN / Inf. + """ + import math + + raw = os.getenv(name) + try: + value = float(raw) if raw is not None else float(default) + except (TypeError, ValueError): + value = float(default) + if not math.isfinite(value): + value = float(default) + if min_value is not None: + value = max(value, min_value) + if max_value is not None: + value = min(value, max_value) + return value + @property def message_len_fn(self): """Telegram measures message length in UTF-16 code units.""" @@ -303,9 +342,24 @@ class TelegramAdapter(BasePlatformAdapter): self._media_group_events: Dict[str, MessageEvent] = {} self._media_group_tasks: Dict[str, asyncio.Task] = {} # Buffer rapid text messages so Telegram client-side splits of long - # messages are aggregated into a single MessageEvent. - self._text_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", "0.6")) - self._text_batch_split_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0")) + # messages are aggregated into a single MessageEvent. Lower defaults + # (0.3s / 1.0s instead of 0.6s / 2.0s) let short replies stream + # without a noticeable wait — combined with the adaptive fast-path + # in ``_calc_text_batch_delay`` below, ≤320-codepoint replies settle + # in ~180ms. All bounds are conservative for Telegram's + # ~1 edit/s flood envelope. + self._text_batch_delay_seconds = self._env_float_clamped( + "HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", + 0.3, + min_value=0.08, + max_value=2.0, + ) + self._text_batch_split_delay_seconds = self._env_float_clamped( + "HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS", + 1.0, + min_value=self._text_batch_delay_seconds, + max_value=4.0, + ) self._pending_text_batches: Dict[str, MessageEvent] = {} self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} self._polling_error_task: Optional[asyncio.Task] = None @@ -3808,12 +3862,27 @@ class TelegramAdapter(BasePlatformAdapter): """ current_task = asyncio.current_task() try: - # Adaptive delay: if the latest chunk is near Telegram's 4096-char - # split point, a continuation is almost certain — wait longer. + # Adaptive delay tiers: + # - last chunk ≥ _SPLIT_THRESHOLD: a continuation is almost + # certain → wait the longer split delay. + # - total accumulated text ≤ _TEXT_BATCH_FAST_LEN (~320 cp): + # short message → cap delay at _TEXT_BATCH_FAST_DELAY_S + # so the agent sees the text near-instantly. + # - total ≤ _TEXT_BATCH_SHORT_LEN (~1024 cp): + # medium → cap at _TEXT_BATCH_SHORT_DELAY_S. + # - otherwise: use the configured cap. + # Tiers compose with operator overrides via the env-var-driven + # ``_text_batch_delay_seconds`` (e.g. an operator who sets the + # cap below 0.18s gets that lower number on every tier). pending = self._pending_text_batches.get(key) last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + total_len = len(getattr(pending, "text", "") or "") if pending else 0 if last_len >= self._SPLIT_THRESHOLD: delay = self._text_batch_split_delay_seconds + elif total_len <= self._TEXT_BATCH_FAST_LEN: + delay = min(self._text_batch_delay_seconds, self._TEXT_BATCH_FAST_DELAY_S) + elif total_len <= self._TEXT_BATCH_SHORT_LEN: + delay = min(self._text_batch_delay_seconds, self._TEXT_BATCH_SHORT_DELAY_S) else: delay = self._text_batch_delay_seconds await asyncio.sleep(delay) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 5e553b67ee5..558a86bd295 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -25,6 +25,11 @@ from typing import Any, Callable, Optional from gateway.platforms.base import BasePlatformAdapter as _BasePlatformAdapter from gateway.platforms.base import _custom_unit_to_cp +from gateway.config import ( + DEFAULT_STREAMING_EDIT_INTERVAL as _DEFAULT_STREAMING_EDIT_INTERVAL, + DEFAULT_STREAMING_BUFFER_THRESHOLD as _DEFAULT_STREAMING_BUFFER_THRESHOLD, + DEFAULT_STREAMING_CURSOR as _DEFAULT_STREAMING_CURSOR, +) logger = logging.getLogger("gateway.stream_consumer") @@ -43,9 +48,9 @@ _COMMENTARY = object() @dataclass class StreamConsumerConfig: """Runtime config for a single stream consumer instance.""" - edit_interval: float = 1.0 - buffer_threshold: int = 40 - cursor: str = " ▉" + edit_interval: float = _DEFAULT_STREAMING_EDIT_INTERVAL + buffer_threshold: int = _DEFAULT_STREAMING_BUFFER_THRESHOLD + cursor: str = _DEFAULT_STREAMING_CURSOR buffer_only: bool = False # When >0, the final edit for a streamed response is delivered as a # fresh message if the original preview has been visible for at least