mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-24 05:41:40 +00:00
perf(gateway): tune Telegram cadence + adaptive fast-path for short replies
Re-authored against current main from PR #10388 by @wilsen0. The
original branch is 3800+ commits stale and could not be cherry-picked
without reverting unrelated work; this change carries only the perf
intent forward.
Tuning summary
==============
Text-batch ingress (gateway/platforms/telegram.py):
- HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS default 0.6 -> 0.3
- HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS default 2.0 -> 1.0
- Adaptive fast-path tiers in _flush_text_batch:
total <= 320 cp -> min(cap, 0.18)
total <= 1024 cp -> min(cap, 0.24)
else -> cap
A single short reply now reaches the agent in ~180ms instead of
600ms. Tier constants compose with the configured cap via min()
so an operator who tightens HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS
below 0.18 still wins on every tier.
- _env_float_clamped helper replaces bare float(os.getenv()).
Rejects NaN / Inf, applies optional min/max bounds. Used for
text-batch + media-batch knobs. Prevents asyncio.sleep(NaN)
crashes when an operator typos an env var.
Stream cadence (gateway/config.py + stream_consumer.py):
- StreamingConfig.edit_interval default 1.0s -> 0.8s
- StreamingConfig.buffer_threshold default 40 -> 24 chars
- DEFAULT_STREAMING_EDIT_INTERVAL / BUFFER_THRESHOLD / CURSOR are now
a single source of truth. StreamConsumerConfig imports them
instead of duplicating the literals; the prior dual-source drift
is fixed.
Tool progress (gateway/display_config.py):
- Telegram default tool_progress 'all' -> 'new'. Inside
Telegram's ~1 edit/s flood envelope the 'all' default would
accumulate edit pressure on busy chats; 'new' shows only the
leading bubble per tool batch and feels less spammy.
- Slack tier_low override (tool_progress='off') is preserved.
Composition with native draft streaming (#23512)
================================================
The mid-stream cadence (edit_interval, buffer_threshold) gates BOTH
the draft path (send_draft) and the edit path (edit_message), so the
tighter cadence helps native draft as much as edit-based. The
text-batch fast-path applies before the consumer starts, so it speeds
up the first-token latency on every transport. No conflict.
Stale-base avoidance
====================
Re-authored from scratch rather than cherry-picked. Dropped from the
original branch:
- Unrelated d2f043f9c 'fix(anthropic): preserve third-party thinking
continuity' commit
- boot_md.py builtin gateway hook (unrelated)
- Reverted Slack tool_progress='off' (#14663) restoration
- Reverted Platform plugin discovery, MSGRAPH_WEBHOOK, YUANBAO
members deletion
- 2300+ lines of run.py base-skew noise
Tests
=====
New tests/gateway/test_telegram_text_batch_perf.py:
- 7 tests for _env_float_clamped (NaN, Inf, garbage, bounds).
- 4 tests for the adaptive-tier composition rules.
Updated tests/gateway/test_display_config.py:
- test_platform_default_when_no_user_config: 'all' -> 'new' for
Telegram, with comment.
- test_high_tier_platforms: split into Telegram-overrides-to-new
and Discord-stays-all assertions.
Closes #10388.
Co-authored-by: wilsen0 <132184373+wilsen0@users.noreply.github.com>
This commit is contained in:
parent
e3b88a8fe2
commit
ac95b8cdbe
4 changed files with 103 additions and 15 deletions
|
|
@ -317,6 +317,16 @@ class PlatformConfig:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Streaming defaults — single source of truth so both StreamingConfig and
|
||||||
|
# StreamConsumerConfig agree on the out-of-the-box edit rhythm. Tuned for
|
||||||
|
# Telegram's ~1 edit/s flood envelope: a touch under 1s lets the cadence
|
||||||
|
# breathe without bumping into rate limits, and a smaller buffer threshold
|
||||||
|
# makes short replies feel near-instant in DMs.
|
||||||
|
DEFAULT_STREAMING_EDIT_INTERVAL: float = 0.8
|
||||||
|
DEFAULT_STREAMING_BUFFER_THRESHOLD: int = 24
|
||||||
|
DEFAULT_STREAMING_CURSOR: str = " ▉"
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class StreamingConfig:
|
class StreamingConfig:
|
||||||
"""Configuration for real-time token streaming to messaging platforms."""
|
"""Configuration for real-time token streaming to messaging platforms."""
|
||||||
|
|
@ -330,9 +340,9 @@ class StreamingConfig:
|
||||||
# "edit" — progressive editMessageText only (legacy behaviour).
|
# "edit" — progressive editMessageText only (legacy behaviour).
|
||||||
# "off" — disable streaming entirely.
|
# "off" — disable streaming entirely.
|
||||||
transport: str = "auto"
|
transport: str = "auto"
|
||||||
edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s)
|
edit_interval: float = DEFAULT_STREAMING_EDIT_INTERVAL
|
||||||
buffer_threshold: int = 40 # Chars before forcing an edit
|
buffer_threshold: int = DEFAULT_STREAMING_BUFFER_THRESHOLD
|
||||||
cursor: str = " ▉" # Cursor shown during streaming
|
cursor: str = DEFAULT_STREAMING_CURSOR
|
||||||
# Ported from openclaw/openclaw#72038. When >0, the final edit for
|
# Ported from openclaw/openclaw#72038. When >0, the final edit for
|
||||||
# a long-running streamed response is delivered as a fresh message
|
# a long-running streamed response is delivered as a fresh message
|
||||||
# if the original preview has been visible for at least this many
|
# if the original preview has been visible for at least this many
|
||||||
|
|
@ -359,9 +369,13 @@ class StreamingConfig:
|
||||||
return cls(
|
return cls(
|
||||||
enabled=_coerce_bool(data.get("enabled"), False),
|
enabled=_coerce_bool(data.get("enabled"), False),
|
||||||
transport=data.get("transport", "auto"),
|
transport=data.get("transport", "auto"),
|
||||||
edit_interval=_coerce_float(data.get("edit_interval"), 1.0),
|
edit_interval=_coerce_float(
|
||||||
buffer_threshold=_coerce_int(data.get("buffer_threshold"), 40),
|
data.get("edit_interval"), DEFAULT_STREAMING_EDIT_INTERVAL,
|
||||||
cursor=data.get("cursor", " ▉"),
|
),
|
||||||
|
buffer_threshold=_coerce_int(
|
||||||
|
data.get("buffer_threshold"), DEFAULT_STREAMING_BUFFER_THRESHOLD,
|
||||||
|
),
|
||||||
|
cursor=data.get("cursor", DEFAULT_STREAMING_CURSOR),
|
||||||
fresh_final_after_seconds=_coerce_float(
|
fresh_final_after_seconds=_coerce_float(
|
||||||
data.get("fresh_final_after_seconds"), 60.0
|
data.get("fresh_final_after_seconds"), 60.0
|
||||||
),
|
),
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ _TIER_MINIMAL = {
|
||||||
|
|
||||||
_PLATFORM_DEFAULTS: dict[str, dict[str, Any]] = {
|
_PLATFORM_DEFAULTS: dict[str, dict[str, Any]] = {
|
||||||
# Tier 1 — full edit support, personal/team use
|
# Tier 1 — full edit support, personal/team use
|
||||||
"telegram": _TIER_HIGH,
|
"telegram": {**_TIER_HIGH, "tool_progress": "new"},
|
||||||
"discord": _TIER_HIGH,
|
"discord": _TIER_HIGH,
|
||||||
|
|
||||||
# Tier 2 — edit support, often customer/workspace channels
|
# Tier 2 — edit support, often customer/workspace channels
|
||||||
|
|
|
||||||
|
|
@ -282,6 +282,45 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
MEDIA_GROUP_WAIT_SECONDS = 0.8
|
MEDIA_GROUP_WAIT_SECONDS = 0.8
|
||||||
_GENERAL_TOPIC_THREAD_ID = "1"
|
_GENERAL_TOPIC_THREAD_ID = "1"
|
||||||
|
|
||||||
|
# Adaptive text-batch ingress: short messages need a tighter delay so the
|
||||||
|
# first token reaches the agent fast. Numbers tuned for "feels instant":
|
||||||
|
# ≤320 codepoints (one short paragraph) settles in ~180ms; ≤1024
|
||||||
|
# (a normal paragraph) in ~240ms; longer waits the configured cap.
|
||||||
|
# Always clamped to ``_text_batch_delay_seconds`` so an operator can lower
|
||||||
|
# the cap further via env var.
|
||||||
|
_TEXT_BATCH_FAST_LEN = 320
|
||||||
|
_TEXT_BATCH_FAST_DELAY_S = 0.18
|
||||||
|
_TEXT_BATCH_SHORT_LEN = 1024
|
||||||
|
_TEXT_BATCH_SHORT_DELAY_S = 0.24
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _env_float_clamped(
|
||||||
|
name: str,
|
||||||
|
default: float,
|
||||||
|
*,
|
||||||
|
min_value: Optional[float] = None,
|
||||||
|
max_value: Optional[float] = None,
|
||||||
|
) -> float:
|
||||||
|
"""Read a float env var, reject non-finite values, and clamp to bounds.
|
||||||
|
|
||||||
|
Guarantees the returned value is a finite number usable directly in
|
||||||
|
``asyncio.sleep()`` and similar APIs that reject NaN / Inf.
|
||||||
|
"""
|
||||||
|
import math
|
||||||
|
|
||||||
|
raw = os.getenv(name)
|
||||||
|
try:
|
||||||
|
value = float(raw) if raw is not None else float(default)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
value = float(default)
|
||||||
|
if not math.isfinite(value):
|
||||||
|
value = float(default)
|
||||||
|
if min_value is not None:
|
||||||
|
value = max(value, min_value)
|
||||||
|
if max_value is not None:
|
||||||
|
value = min(value, max_value)
|
||||||
|
return value
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def message_len_fn(self):
|
def message_len_fn(self):
|
||||||
"""Telegram measures message length in UTF-16 code units."""
|
"""Telegram measures message length in UTF-16 code units."""
|
||||||
|
|
@ -303,9 +342,24 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
self._media_group_events: Dict[str, MessageEvent] = {}
|
self._media_group_events: Dict[str, MessageEvent] = {}
|
||||||
self._media_group_tasks: Dict[str, asyncio.Task] = {}
|
self._media_group_tasks: Dict[str, asyncio.Task] = {}
|
||||||
# Buffer rapid text messages so Telegram client-side splits of long
|
# Buffer rapid text messages so Telegram client-side splits of long
|
||||||
# messages are aggregated into a single MessageEvent.
|
# messages are aggregated into a single MessageEvent. Lower defaults
|
||||||
self._text_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", "0.6"))
|
# (0.3s / 1.0s instead of 0.6s / 2.0s) let short replies stream
|
||||||
self._text_batch_split_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
|
# without a noticeable wait — combined with the adaptive fast-path
|
||||||
|
# in ``_calc_text_batch_delay`` below, ≤320-codepoint replies settle
|
||||||
|
# in ~180ms. All bounds are conservative for Telegram's
|
||||||
|
# ~1 edit/s flood envelope.
|
||||||
|
self._text_batch_delay_seconds = self._env_float_clamped(
|
||||||
|
"HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS",
|
||||||
|
0.3,
|
||||||
|
min_value=0.08,
|
||||||
|
max_value=2.0,
|
||||||
|
)
|
||||||
|
self._text_batch_split_delay_seconds = self._env_float_clamped(
|
||||||
|
"HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS",
|
||||||
|
1.0,
|
||||||
|
min_value=self._text_batch_delay_seconds,
|
||||||
|
max_value=4.0,
|
||||||
|
)
|
||||||
self._pending_text_batches: Dict[str, MessageEvent] = {}
|
self._pending_text_batches: Dict[str, MessageEvent] = {}
|
||||||
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
|
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
|
||||||
self._polling_error_task: Optional[asyncio.Task] = None
|
self._polling_error_task: Optional[asyncio.Task] = None
|
||||||
|
|
@ -3808,12 +3862,27 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
"""
|
"""
|
||||||
current_task = asyncio.current_task()
|
current_task = asyncio.current_task()
|
||||||
try:
|
try:
|
||||||
# Adaptive delay: if the latest chunk is near Telegram's 4096-char
|
# Adaptive delay tiers:
|
||||||
# split point, a continuation is almost certain — wait longer.
|
# - last chunk ≥ _SPLIT_THRESHOLD: a continuation is almost
|
||||||
|
# certain → wait the longer split delay.
|
||||||
|
# - total accumulated text ≤ _TEXT_BATCH_FAST_LEN (~320 cp):
|
||||||
|
# short message → cap delay at _TEXT_BATCH_FAST_DELAY_S
|
||||||
|
# so the agent sees the text near-instantly.
|
||||||
|
# - total ≤ _TEXT_BATCH_SHORT_LEN (~1024 cp):
|
||||||
|
# medium → cap at _TEXT_BATCH_SHORT_DELAY_S.
|
||||||
|
# - otherwise: use the configured cap.
|
||||||
|
# Tiers compose with operator overrides via the env-var-driven
|
||||||
|
# ``_text_batch_delay_seconds`` (e.g. an operator who sets the
|
||||||
|
# cap below 0.18s gets that lower number on every tier).
|
||||||
pending = self._pending_text_batches.get(key)
|
pending = self._pending_text_batches.get(key)
|
||||||
last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
|
last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
|
||||||
|
total_len = len(getattr(pending, "text", "") or "") if pending else 0
|
||||||
if last_len >= self._SPLIT_THRESHOLD:
|
if last_len >= self._SPLIT_THRESHOLD:
|
||||||
delay = self._text_batch_split_delay_seconds
|
delay = self._text_batch_split_delay_seconds
|
||||||
|
elif total_len <= self._TEXT_BATCH_FAST_LEN:
|
||||||
|
delay = min(self._text_batch_delay_seconds, self._TEXT_BATCH_FAST_DELAY_S)
|
||||||
|
elif total_len <= self._TEXT_BATCH_SHORT_LEN:
|
||||||
|
delay = min(self._text_batch_delay_seconds, self._TEXT_BATCH_SHORT_DELAY_S)
|
||||||
else:
|
else:
|
||||||
delay = self._text_batch_delay_seconds
|
delay = self._text_batch_delay_seconds
|
||||||
await asyncio.sleep(delay)
|
await asyncio.sleep(delay)
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,11 @@ from typing import Any, Callable, Optional
|
||||||
|
|
||||||
from gateway.platforms.base import BasePlatformAdapter as _BasePlatformAdapter
|
from gateway.platforms.base import BasePlatformAdapter as _BasePlatformAdapter
|
||||||
from gateway.platforms.base import _custom_unit_to_cp
|
from gateway.platforms.base import _custom_unit_to_cp
|
||||||
|
from gateway.config import (
|
||||||
|
DEFAULT_STREAMING_EDIT_INTERVAL as _DEFAULT_STREAMING_EDIT_INTERVAL,
|
||||||
|
DEFAULT_STREAMING_BUFFER_THRESHOLD as _DEFAULT_STREAMING_BUFFER_THRESHOLD,
|
||||||
|
DEFAULT_STREAMING_CURSOR as _DEFAULT_STREAMING_CURSOR,
|
||||||
|
)
|
||||||
|
|
||||||
logger = logging.getLogger("gateway.stream_consumer")
|
logger = logging.getLogger("gateway.stream_consumer")
|
||||||
|
|
||||||
|
|
@ -43,9 +48,9 @@ _COMMENTARY = object()
|
||||||
@dataclass
|
@dataclass
|
||||||
class StreamConsumerConfig:
|
class StreamConsumerConfig:
|
||||||
"""Runtime config for a single stream consumer instance."""
|
"""Runtime config for a single stream consumer instance."""
|
||||||
edit_interval: float = 1.0
|
edit_interval: float = _DEFAULT_STREAMING_EDIT_INTERVAL
|
||||||
buffer_threshold: int = 40
|
buffer_threshold: int = _DEFAULT_STREAMING_BUFFER_THRESHOLD
|
||||||
cursor: str = " ▉"
|
cursor: str = _DEFAULT_STREAMING_CURSOR
|
||||||
buffer_only: bool = False
|
buffer_only: bool = False
|
||||||
# When >0, the final edit for a streamed response is delivered as a
|
# When >0, the final edit for a streamed response is delivered as a
|
||||||
# fresh message if the original preview has been visible for at least
|
# fresh message if the original preview has been visible for at least
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue