diff --git a/gateway/config.py b/gateway/config.py index 5edb17b8094..89393f9117e 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -317,6 +317,16 @@ class PlatformConfig: ) +# Streaming defaults — single source of truth so both StreamingConfig and +# StreamConsumerConfig agree on the out-of-the-box edit rhythm. Tuned for +# Telegram's ~1 edit/s flood envelope: a touch under 1s lets the cadence +# breathe without bumping into rate limits, and a smaller buffer threshold +# makes short replies feel near-instant in DMs. +DEFAULT_STREAMING_EDIT_INTERVAL: float = 0.8 +DEFAULT_STREAMING_BUFFER_THRESHOLD: int = 24 +DEFAULT_STREAMING_CURSOR: str = " ▉" + + @dataclass class StreamingConfig: """Configuration for real-time token streaming to messaging platforms.""" @@ -330,9 +340,9 @@ class StreamingConfig: # "edit" — progressive editMessageText only (legacy behaviour). # "off" — disable streaming entirely. transport: str = "auto" - edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s) - buffer_threshold: int = 40 # Chars before forcing an edit - cursor: str = " ▉" # Cursor shown during streaming + edit_interval: float = DEFAULT_STREAMING_EDIT_INTERVAL + buffer_threshold: int = DEFAULT_STREAMING_BUFFER_THRESHOLD + cursor: str = DEFAULT_STREAMING_CURSOR # Ported from openclaw/openclaw#72038. When >0, the final edit for # a long-running streamed response is delivered as a fresh message # if the original preview has been visible for at least this many @@ -359,9 +369,13 @@ class StreamingConfig: return cls( enabled=_coerce_bool(data.get("enabled"), False), transport=data.get("transport", "auto"), - edit_interval=_coerce_float(data.get("edit_interval"), 1.0), - buffer_threshold=_coerce_int(data.get("buffer_threshold"), 40), - cursor=data.get("cursor", " ▉"), + edit_interval=_coerce_float( + data.get("edit_interval"), DEFAULT_STREAMING_EDIT_INTERVAL, + ), + buffer_threshold=_coerce_int( + data.get("buffer_threshold"), DEFAULT_STREAMING_BUFFER_THRESHOLD, + ), + cursor=data.get("cursor", DEFAULT_STREAMING_CURSOR), fresh_final_after_seconds=_coerce_float( data.get("fresh_final_after_seconds"), 60.0 ), diff --git a/gateway/display_config.py b/gateway/display_config.py index 55cc344677e..2d8f40f115f 100644 --- a/gateway/display_config.py +++ b/gateway/display_config.py @@ -81,7 +81,7 @@ _TIER_MINIMAL = { _PLATFORM_DEFAULTS: dict[str, dict[str, Any]] = { # Tier 1 — full edit support, personal/team use - "telegram": _TIER_HIGH, + "telegram": {**_TIER_HIGH, "tool_progress": "new"}, "discord": _TIER_HIGH, # Tier 2 — edit support, often customer/workspace channels diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 996c41ab619..c1f312783a4 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -282,6 +282,45 @@ class TelegramAdapter(BasePlatformAdapter): MEDIA_GROUP_WAIT_SECONDS = 0.8 _GENERAL_TOPIC_THREAD_ID = "1" + # Adaptive text-batch ingress: short messages need a tighter delay so the + # first token reaches the agent fast. Numbers tuned for "feels instant": + # ≤320 codepoints (one short paragraph) settles in ~180ms; ≤1024 + # (a normal paragraph) in ~240ms; longer waits the configured cap. + # Always clamped to ``_text_batch_delay_seconds`` so an operator can lower + # the cap further via env var. + _TEXT_BATCH_FAST_LEN = 320 + _TEXT_BATCH_FAST_DELAY_S = 0.18 + _TEXT_BATCH_SHORT_LEN = 1024 + _TEXT_BATCH_SHORT_DELAY_S = 0.24 + + @staticmethod + def _env_float_clamped( + name: str, + default: float, + *, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + ) -> float: + """Read a float env var, reject non-finite values, and clamp to bounds. + + Guarantees the returned value is a finite number usable directly in + ``asyncio.sleep()`` and similar APIs that reject NaN / Inf. + """ + import math + + raw = os.getenv(name) + try: + value = float(raw) if raw is not None else float(default) + except (TypeError, ValueError): + value = float(default) + if not math.isfinite(value): + value = float(default) + if min_value is not None: + value = max(value, min_value) + if max_value is not None: + value = min(value, max_value) + return value + @property def message_len_fn(self): """Telegram measures message length in UTF-16 code units.""" @@ -303,9 +342,24 @@ class TelegramAdapter(BasePlatformAdapter): self._media_group_events: Dict[str, MessageEvent] = {} self._media_group_tasks: Dict[str, asyncio.Task] = {} # Buffer rapid text messages so Telegram client-side splits of long - # messages are aggregated into a single MessageEvent. - self._text_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", "0.6")) - self._text_batch_split_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0")) + # messages are aggregated into a single MessageEvent. Lower defaults + # (0.3s / 1.0s instead of 0.6s / 2.0s) let short replies stream + # without a noticeable wait — combined with the adaptive fast-path + # in ``_calc_text_batch_delay`` below, ≤320-codepoint replies settle + # in ~180ms. All bounds are conservative for Telegram's + # ~1 edit/s flood envelope. + self._text_batch_delay_seconds = self._env_float_clamped( + "HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", + 0.3, + min_value=0.08, + max_value=2.0, + ) + self._text_batch_split_delay_seconds = self._env_float_clamped( + "HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS", + 1.0, + min_value=self._text_batch_delay_seconds, + max_value=4.0, + ) self._pending_text_batches: Dict[str, MessageEvent] = {} self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} self._polling_error_task: Optional[asyncio.Task] = None @@ -3808,12 +3862,27 @@ class TelegramAdapter(BasePlatformAdapter): """ current_task = asyncio.current_task() try: - # Adaptive delay: if the latest chunk is near Telegram's 4096-char - # split point, a continuation is almost certain — wait longer. + # Adaptive delay tiers: + # - last chunk ≥ _SPLIT_THRESHOLD: a continuation is almost + # certain → wait the longer split delay. + # - total accumulated text ≤ _TEXT_BATCH_FAST_LEN (~320 cp): + # short message → cap delay at _TEXT_BATCH_FAST_DELAY_S + # so the agent sees the text near-instantly. + # - total ≤ _TEXT_BATCH_SHORT_LEN (~1024 cp): + # medium → cap at _TEXT_BATCH_SHORT_DELAY_S. + # - otherwise: use the configured cap. + # Tiers compose with operator overrides via the env-var-driven + # ``_text_batch_delay_seconds`` (e.g. an operator who sets the + # cap below 0.18s gets that lower number on every tier). pending = self._pending_text_batches.get(key) last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + total_len = len(getattr(pending, "text", "") or "") if pending else 0 if last_len >= self._SPLIT_THRESHOLD: delay = self._text_batch_split_delay_seconds + elif total_len <= self._TEXT_BATCH_FAST_LEN: + delay = min(self._text_batch_delay_seconds, self._TEXT_BATCH_FAST_DELAY_S) + elif total_len <= self._TEXT_BATCH_SHORT_LEN: + delay = min(self._text_batch_delay_seconds, self._TEXT_BATCH_SHORT_DELAY_S) else: delay = self._text_batch_delay_seconds await asyncio.sleep(delay) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 5e553b67ee5..558a86bd295 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -25,6 +25,11 @@ from typing import Any, Callable, Optional from gateway.platforms.base import BasePlatformAdapter as _BasePlatformAdapter from gateway.platforms.base import _custom_unit_to_cp +from gateway.config import ( + DEFAULT_STREAMING_EDIT_INTERVAL as _DEFAULT_STREAMING_EDIT_INTERVAL, + DEFAULT_STREAMING_BUFFER_THRESHOLD as _DEFAULT_STREAMING_BUFFER_THRESHOLD, + DEFAULT_STREAMING_CURSOR as _DEFAULT_STREAMING_CURSOR, +) logger = logging.getLogger("gateway.stream_consumer") @@ -43,9 +48,9 @@ _COMMENTARY = object() @dataclass class StreamConsumerConfig: """Runtime config for a single stream consumer instance.""" - edit_interval: float = 1.0 - buffer_threshold: int = 40 - cursor: str = " ▉" + edit_interval: float = _DEFAULT_STREAMING_EDIT_INTERVAL + buffer_threshold: int = _DEFAULT_STREAMING_BUFFER_THRESHOLD + cursor: str = _DEFAULT_STREAMING_CURSOR buffer_only: bool = False # When >0, the final edit for a streamed response is delivered as a # fresh message if the original preview has been visible for at least