mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-06 07:51:53 +00:00
The 5-second startup-grace filter in _on_room_message silently drops events where event_ts < startup_ts - 5. When the host clock is set ahead of real time, the comparison flips against every live event and the bot 'connects but never replies' — exactly the symptom in #12614. Reporter Schnurzel700 chased this for several weeks before tracing it to their Debian VM's clock being out of sync. The current /1000.0 millisecond->second conversion is correct (mautrix returns ms); the failure mode is purely environmental. Add a one-shot WARNING that fires when: - we are >30s past startup (initial-sync replay window closed), AND - 3 consecutive drops share the same skew within 60s (a constant clock offset, not varied-age backfill from an invited room). State is reset in connect() so reconnects after fixing NTP rearm the detector. Includes the NTP fix instruction in the warning message itself and a new Troubleshooting entry in the Matrix docs. 5 new tests cover the happy path, initial-sync backfill, under- threshold drops, varied-age backfill, and the reconnect rearm path.
This commit is contained in:
parent
56ad30de17
commit
519657aa98
3 changed files with 280 additions and 0 deletions
|
|
@ -2257,6 +2257,210 @@ class TestMatrixOnRoomMessageFilter:
|
|||
ev = self._mk_event(sender="@alice:example.org", body="hello bot")
|
||||
await self.adapter._on_room_message(ev)
|
||||
self.adapter._handle_text_message.assert_awaited_once()
|
||||
|
||||
|
||||
class TestMatrixClockSkewWarning:
|
||||
"""Clock-skew detector for #12614.
|
||||
|
||||
Reporter's host clock was set ~2 hours ahead of real time. The grace
|
||||
filter `event_ts < startup_ts - 5` then drops every live event because
|
||||
server timestamps look "older than startup". When this happens well
|
||||
after startup (>30s), the adapter logs a one-shot WARNING pointing the
|
||||
user at NTP instead of failing silently.
|
||||
"""
|
||||
|
||||
def setup_method(self):
|
||||
self.adapter = _make_adapter()
|
||||
self.adapter._user_id = "@bot:example.org"
|
||||
self.adapter._handle_text_message = AsyncMock()
|
||||
self.adapter._handle_media_message = AsyncMock()
|
||||
|
||||
@staticmethod
|
||||
def _mk_event(sender, ts_ms, event_id=None):
|
||||
ev = MagicMock()
|
||||
ev.room_id = "!room:example.org"
|
||||
ev.sender = sender
|
||||
ev.event_id = event_id or f"$evt-{sender}-{ts_ms}"
|
||||
ev.timestamp = ts_ms
|
||||
ev.server_timestamp = ts_ms
|
||||
ev.content = {"msgtype": "m.text", "body": "hi"}
|
||||
return ev
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_late_drops_emit_one_shot_clock_skew_warning(self, caplog):
|
||||
import logging
|
||||
import time as _t
|
||||
|
||||
# Simulate the reporter's environment: host clock is ~2 hours ahead
|
||||
# of server time. Startup happened "in the future" relative to the
|
||||
# real-world events we're now receiving.
|
||||
now = _t.time()
|
||||
self.adapter._startup_ts = now - 60 # bot started 60s ago (wall clock)
|
||||
# Server events are dated 2h before startup_ts (skewed clock).
|
||||
skewed_event_ts_ms = int((self.adapter._startup_ts - 7200) * 1000)
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
|
||||
for i in range(5):
|
||||
ev = self._mk_event(
|
||||
sender=f"@alice{i}:example.org", ts_ms=skewed_event_ts_ms
|
||||
)
|
||||
await self.adapter._on_room_message(ev)
|
||||
|
||||
# Handler should never be invoked — all events failed the grace check.
|
||||
self.adapter._handle_text_message.assert_not_called()
|
||||
# Exactly one WARNING from THIS logger should be emitted. Filter by
|
||||
# logger name so unrelated stdlib/library warnings can't satisfy the
|
||||
# assertion.
|
||||
skew_warnings = [
|
||||
r for r in caplog.records
|
||||
if r.name == "gateway.platforms.matrix"
|
||||
and r.levelname == "WARNING"
|
||||
and "set-ntp" in r.getMessage()
|
||||
]
|
||||
assert len(skew_warnings) == 1, (
|
||||
f"expected exactly 1 clock-skew warning, got {len(skew_warnings)}"
|
||||
)
|
||||
msg = skew_warnings[0].getMessage()
|
||||
assert "7200" in msg, f"skew value missing from message: {msg!r}"
|
||||
# Pin the counter so a regression in the gating logic (e.g. warning
|
||||
# at threshold 1 or 5, or not stopping after warn) is caught.
|
||||
assert self.adapter._late_grace_drops == 3
|
||||
assert self.adapter._clock_skew_warned is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initial_sync_drops_do_not_warn(self, caplog):
|
||||
"""During the first 30s after startup, old events are normal backfill."""
|
||||
import logging
|
||||
import time as _t
|
||||
|
||||
now = _t.time()
|
||||
# Startup was 1s ago — we're still in the initial-sync window.
|
||||
self.adapter._startup_ts = now - 1
|
||||
old_ts_ms = int((self.adapter._startup_ts - 3600) * 1000)
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
|
||||
for i in range(5):
|
||||
ev = self._mk_event(
|
||||
sender=f"@alice{i}:example.org", ts_ms=old_ts_ms
|
||||
)
|
||||
await self.adapter._on_room_message(ev)
|
||||
|
||||
# Backfill drops are silent — no clock-skew warning fired.
|
||||
assert self.adapter._clock_skew_warned is False
|
||||
skew_warnings = [
|
||||
r for r in caplog.records
|
||||
if r.name == "gateway.platforms.matrix"
|
||||
and "set-ntp" in r.getMessage()
|
||||
]
|
||||
assert skew_warnings == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fewer_than_three_late_drops_do_not_warn(self, caplog):
|
||||
"""A single delayed backfill event after 30s shouldn't trigger NTP advice."""
|
||||
import logging
|
||||
import time as _t
|
||||
|
||||
now = _t.time()
|
||||
self.adapter._startup_ts = now - 120 # extra slack vs the 30s gate
|
||||
old_ts_ms = int((self.adapter._startup_ts - 3600) * 1000)
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
|
||||
for i in range(2): # only 2 late drops — under the threshold
|
||||
ev = self._mk_event(
|
||||
sender=f"@alice{i}:example.org", ts_ms=old_ts_ms
|
||||
)
|
||||
await self.adapter._on_room_message(ev)
|
||||
|
||||
assert self.adapter._late_grace_drops == 2
|
||||
assert self.adapter._clock_skew_warned is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_varied_backfill_skews_do_not_warn(self, caplog):
|
||||
"""Backfill from a freshly-invited room delivers events of varied age.
|
||||
|
||||
A genuine clock-skew bug produces drops with a *constant* offset
|
||||
(every event is ~X seconds older than wall clock). Joining an old
|
||||
room post-startup delivers events spanning hours-to-days; those
|
||||
skews vary wildly and must NOT trigger the NTP warning.
|
||||
"""
|
||||
import logging
|
||||
import time as _t
|
||||
|
||||
now = _t.time()
|
||||
self.adapter._startup_ts = now - 120
|
||||
# Each event has a different age, ranging from 1h to 30d ago.
|
||||
ages_in_hours = [1, 24, 168, 720, 4] # 1h, 1d, 1w, 30d, 4h
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
|
||||
for i, hrs in enumerate(ages_in_hours):
|
||||
ts_ms = int((self.adapter._startup_ts - hrs * 3600) * 1000)
|
||||
ev = self._mk_event(
|
||||
sender=f"@alice{i}:example.org", ts_ms=ts_ms
|
||||
)
|
||||
await self.adapter._on_room_message(ev)
|
||||
|
||||
# The varied-skew guard should keep the counter from reaching 3.
|
||||
assert self.adapter._late_grace_drops < 3
|
||||
assert self.adapter._clock_skew_warned is False
|
||||
skew_warnings = [
|
||||
r for r in caplog.records
|
||||
if r.name == "gateway.platforms.matrix"
|
||||
and "set-ntp" in r.getMessage()
|
||||
]
|
||||
assert skew_warnings == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_state_reset_allows_warning_to_fire_again(self, caplog):
|
||||
"""After the reset block at top of connect() runs, the warning is rearmed.
|
||||
|
||||
Reconnect lifecycle: the user fixes NTP, restarts the bot, and the
|
||||
new connect() call resets _late_grace_drops / _clock_skew_warned at
|
||||
the top. This test exercises the rearm path by:
|
||||
1. Tripping the warning once (state: warned=True).
|
||||
2. Running the same reset block connect() runs.
|
||||
3. Tripping the warning a second time — the second warning should
|
||||
fire because the state was cleared.
|
||||
"""
|
||||
import logging
|
||||
import time as _t
|
||||
|
||||
now = _t.time()
|
||||
self.adapter._startup_ts = now - 60
|
||||
skewed_ms = int((self.adapter._startup_ts - 7200) * 1000)
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
|
||||
for i in range(3):
|
||||
ev = self._mk_event(
|
||||
sender=f"@alice{i}:example.org", ts_ms=skewed_ms,
|
||||
event_id=f"$first-{i}",
|
||||
)
|
||||
await self.adapter._on_room_message(ev)
|
||||
assert self.adapter._clock_skew_warned is True
|
||||
|
||||
# Mirror the reset block in connect() (matrix.py around line 855).
|
||||
self.adapter._startup_ts = _t.time() - 60
|
||||
self.adapter._late_grace_drops = 0
|
||||
self.adapter._late_grace_skew = 0.0
|
||||
self.adapter._clock_skew_warned = False
|
||||
|
||||
# Same skewed-clock scenario should warn AGAIN after reset.
|
||||
skewed_ms2 = int((self.adapter._startup_ts - 7200) * 1000)
|
||||
for i in range(3):
|
||||
ev = self._mk_event(
|
||||
sender=f"@bob{i}:example.org", ts_ms=skewed_ms2,
|
||||
event_id=f"$second-{i}",
|
||||
)
|
||||
await self.adapter._on_room_message(ev)
|
||||
|
||||
skew_warnings = [
|
||||
r for r in caplog.records
|
||||
if r.name == "gateway.platforms.matrix"
|
||||
and "set-ntp" in r.getMessage()
|
||||
]
|
||||
assert len(skew_warnings) == 2, (
|
||||
f"expected 2 warnings (one per connect cycle), got {len(skew_warnings)}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DM auto-thread
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue