mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-21 10:22:18 +00:00
fix(signal): harden recently-sent echo ring with LRU + TTL
This commit is contained in:
parent
b88d0007c9
commit
332f88f6a6
2 changed files with 263 additions and 8 deletions
|
|
@ -22,6 +22,7 @@ import subprocess
|
|||
import tempfile
|
||||
import time
|
||||
import uuid
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
|
@ -304,9 +305,15 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
self._account_normalized = self.account.strip()
|
||||
|
||||
# Track recently sent message timestamps to prevent echo-back loops
|
||||
# in Note to Self / self-chat mode (mirrors WhatsApp recentlySentIds).
|
||||
self._recent_sent_timestamps: set = set()
|
||||
self._max_recent_timestamps = 50
|
||||
# in Note to Self / self-chat mode and linked-device group sync-sents.
|
||||
# OrderedDict[timestamp_ms -> insertion_monotonic_seconds] gives us
|
||||
# LRU eviction (popitem(last=False) drops oldest) plus a TTL so that
|
||||
# under chatty groups a still-pending echo cannot be evicted just
|
||||
# because >50 outbounds happened. With a 5-minute TTL the cap only
|
||||
# matters for runaway producers, not normal traffic bursts.
|
||||
self._recent_sent_timestamps: "OrderedDict[int, float]" = OrderedDict()
|
||||
self._max_recent_timestamps = 512
|
||||
self._recent_sent_ttl_seconds = 300.0
|
||||
# Keep a separate bounded cache of outbound Signal message timestamps.
|
||||
# Signal quote.id is the timestamp of the quoted message, so this lets
|
||||
# inbound replies identify that the user replied to a message sent by
|
||||
|
|
@ -536,8 +543,7 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
sent_msg_group_id = sent_msg_group_info.get("groupId") if sent_msg_group_info else None
|
||||
if dest == self._account_normalized or sent_msg_group_id:
|
||||
# Check if this is an echo of our own outbound reply
|
||||
if sent_ts and sent_ts in self._recent_sent_timestamps:
|
||||
self._recent_sent_timestamps.discard(sent_ts)
|
||||
if self._consume_sent_timestamp(sent_ts):
|
||||
return
|
||||
# Genuine user Note to Self — promote to dataMessage
|
||||
is_note_to_self = True
|
||||
|
|
@ -1076,10 +1082,29 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
"""Record outbound message timestamp for echo-back filtering."""
|
||||
ts = rpc_result.get("timestamp") if isinstance(rpc_result, dict) else None
|
||||
if ts:
|
||||
self._recent_sent_timestamps.add(ts)
|
||||
self._remember_sent_message_timestamp(ts)
|
||||
if len(self._recent_sent_timestamps) > self._max_recent_timestamps:
|
||||
self._recent_sent_timestamps.pop()
|
||||
now = time.monotonic()
|
||||
# Re-insert to mark as most-recently-used.
|
||||
self._recent_sent_timestamps.pop(ts, None)
|
||||
self._recent_sent_timestamps[ts] = now
|
||||
# Drop entries older than TTL first (cheap O(k) where k=expired).
|
||||
cutoff = now - self._recent_sent_ttl_seconds
|
||||
while self._recent_sent_timestamps:
|
||||
oldest_ts, oldest_at = next(iter(self._recent_sent_timestamps.items()))
|
||||
if oldest_at < cutoff:
|
||||
self._recent_sent_timestamps.popitem(last=False)
|
||||
else:
|
||||
break
|
||||
# Hard cap as a last-resort guard against runaway producers.
|
||||
while len(self._recent_sent_timestamps) > self._max_recent_timestamps:
|
||||
self._recent_sent_timestamps.popitem(last=False)
|
||||
|
||||
def _consume_sent_timestamp(self, ts) -> bool:
    """Check *ts* against the recently-sent ring, removing it on a hit.

    Returns True when *ts* is currently recorded in
    ``self._recent_sent_timestamps``; the entry is dropped, so the same
    timestamp matches at most once. Returns False for a falsy *ts* or one
    that is not in the ring.
    """
    if not ts:
        return False
    ring = self._recent_sent_timestamps
    if ts not in ring:
        return False
    # One-shot match: forget the timestamp once its echo has been seen.
    del ring[ts]
    return True
|
||||
|
||||
async def send_typing(self, chat_id: str, metadata=None) -> None:
|
||||
"""Send a typing indicator.
|
||||
|
|
|
|||
|
|
@ -2315,3 +2315,233 @@ class TestSignalContentlessEnvelope:
|
|||
|
||||
assert "event" in captured, "Normal message should NOT be skipped"
|
||||
assert captured["event"].text == "hello world"
|
||||
|
||||
|
||||
class TestSignalSyncMessageHandling:
    """signal-cli running as a linked secondary device receives the user's
    own messages as ``syncMessage.sentMessage`` envelopes. Two cases must
    be handled:

    1. Note to Self (destination == self): promote to dataMessage so the
       user can talk to the agent in their own self-chat.
    2. Group sync-sent (destination is None, groupInfo set): promote so
       single-user / personal groups work.

    In both cases, the bot's own outbound replies bounce back as
    sync-sents and must be suppressed via the recently-sent timestamp ring.
    """

    ACCOUNT = "+155****4567"
    GROUP_ID = "abc123=="

    @staticmethod
    def _capture_handled(adapter):
        """Swap ``adapter.handle_message`` for a recorder.

        Returns the list that every event passed to the handler is
        appended to, in call order.
        """
        handled = []

        async def fake_handle(event):
            handled.append(event)

        adapter.handle_message = fake_handle
        return handled

    @classmethod
    def _sync_sent_envelope(cls, timestamp, message, *, destination, group_id=None):
        """Build a sync-sent envelope originating from ``ACCOUNT``.

        *destination* fills both ``destinationNumber`` and ``destination``;
        a ``groupInfo`` block is attached only when *group_id* is given.
        """
        sent_message = {
            "destinationNumber": destination,
            "destination": destination,
            "timestamp": timestamp,
            "message": message,
        }
        if group_id is not None:
            sent_message["groupInfo"] = {"groupId": group_id, "type": "DELIVER"}
        return {
            "envelope": {
                "sourceNumber": cls.ACCOUNT,
                "sourceUuid": "uuid-self",
                "timestamp": timestamp,
                "syncMessage": {"sentMessage": sent_message},
            }
        }

    @pytest.mark.asyncio
    async def test_note_to_self_promoted_to_inbound(self, monkeypatch):
        adapter = _make_signal_adapter(monkeypatch, account=self.ACCOUNT)
        handled = self._capture_handled(adapter)

        await adapter._handle_envelope(
            self._sync_sent_envelope(
                2000000000,
                "note to self: buy milk",
                destination=self.ACCOUNT,
            )
        )

        assert handled, "Note to Self must reach handle_message"
        assert handled[-1].text == "note to self: buy milk"

    @pytest.mark.asyncio
    async def test_note_to_self_echo_of_own_reply_is_suppressed(self, monkeypatch):
        adapter = _make_signal_adapter(monkeypatch, account=self.ACCOUNT)
        # Simulate that the bot just sent a reply with timestamp 3000000000
        adapter._track_sent_timestamp({"timestamp": 3000000000})
        handled = self._capture_handled(adapter)

        await adapter._handle_envelope(
            self._sync_sent_envelope(
                3000000000,
                "this is the bot's own reply echo",
                destination=self.ACCOUNT,
            )
        )

        assert handled == [], "Echo of bot's own reply must be suppressed"
        # Consumed: timestamp must be removed from the ring
        assert 3000000000 not in adapter._recent_sent_timestamps

    @pytest.mark.asyncio
    async def test_group_sync_sent_promoted_to_inbound(self, monkeypatch):
        """User sends a message in a group from their primary phone; the
        linked device receives it as a sync-sent with destination=None and
        a groupInfo block. It must be treated as inbound so the agent can
        respond in groups when the user is the only human participant."""
        adapter = _make_signal_adapter(
            monkeypatch, account=self.ACCOUNT, group_allowed=self.GROUP_ID
        )
        handled = self._capture_handled(adapter)

        await adapter._handle_envelope(
            self._sync_sent_envelope(
                4000000000,
                "ping the group",
                destination=None,
                group_id=self.GROUP_ID,
            )
        )

        assert handled, "Group sync-sent must reach handle_message"
        assert handled[-1].text == "ping the group"
        assert handled[-1].source.chat_id == "group:abc123=="

    @pytest.mark.asyncio
    async def test_group_sync_sent_echo_of_own_reply_is_suppressed(self, monkeypatch):
        # Allow-list the group exactly as the promotion test does, so an
        # empty handler list can only be explained by echo suppression and
        # not by the group being rejected for some other reason.
        adapter = _make_signal_adapter(
            monkeypatch, account=self.ACCOUNT, group_allowed=self.GROUP_ID
        )
        adapter._track_sent_timestamp({"timestamp": 5000000000})
        handled = self._capture_handled(adapter)

        await adapter._handle_envelope(
            self._sync_sent_envelope(
                5000000000,
                "bot's own group reply",
                destination=None,
                group_id=self.GROUP_ID,
            )
        )

        assert handled == [], "Group echo of bot's own reply must be suppressed"
        assert 5000000000 not in adapter._recent_sent_timestamps

    @pytest.mark.asyncio
    async def test_unrelated_sync_message_still_dropped(self, monkeypatch):
        """Read receipts / typing sync events have no sentMessage at all,
        or a sentMessage with non-self destination — must keep being filtered."""
        adapter = _make_signal_adapter(monkeypatch, account=self.ACCOUNT)
        handled = self._capture_handled(adapter)

        # These two payloads are spelled out rather than built with the
        # helper because neither carries a ``sourceUuid``.

        # No sentMessage at all
        await adapter._handle_envelope({
            "envelope": {
                "sourceNumber": "+155****4567",
                "timestamp": 6000000000,
                "syncMessage": {"readMessages": [{"sender": "+155****9999"}]},
            }
        })
        # sentMessage to a different contact (not self, not a group)
        await adapter._handle_envelope({
            "envelope": {
                "sourceNumber": "+155****4567",
                "timestamp": 6000000001,
                "syncMessage": {
                    "sentMessage": {
                        "destinationNumber": "+155****9999",
                        "destination": "+155****9999",
                        "timestamp": 6000000001,
                        "message": "outbound DM to someone else",
                    }
                },
            }
        })

        assert handled == [], "Non-promotable sync messages must be filtered"
|
||||
|
||||
|
||||
class TestRecentSentTimestampRing:
    """Verify the LRU+TTL behaviour of the echo-suppression ring."""

    def test_track_inserts_and_marks_most_recent(self, monkeypatch):
        adapter = _make_signal_adapter(monkeypatch)
        # The trailing 1 re-tracks an existing entry, which should move it
        # behind 2 in insertion order.
        for sent_ts in (1, 2, 1):
            adapter._track_sent_timestamp({"timestamp": sent_ts})
        assert list(adapter._recent_sent_timestamps) == [2, 1]

    def test_consume_returns_true_and_removes(self, monkeypatch):
        adapter = _make_signal_adapter(monkeypatch)
        adapter._track_sent_timestamp({"timestamp": 42})

        first_hit = adapter._consume_sent_timestamp(42)
        assert first_hit is True
        assert 42 not in adapter._recent_sent_timestamps

        # A consumed timestamp never matches twice, and None never matches.
        second_hit = adapter._consume_sent_timestamp(42)
        assert second_hit is False
        none_hit = adapter._consume_sent_timestamp(None)
        assert none_hit is False

    def test_hard_cap_evicts_oldest(self, monkeypatch):
        adapter = _make_signal_adapter(monkeypatch)
        adapter._max_recent_timestamps = 3
        for sent_ts in range(1, 5):
            adapter._track_sent_timestamp({"timestamp": sent_ts})
        # 1 should have been evicted (oldest); 2/3/4 retained in order
        remaining = list(adapter._recent_sent_timestamps)
        assert remaining == [2, 3, 4]

    def test_ttl_evicts_stale_entries(self, monkeypatch):
        adapter = _make_signal_adapter(monkeypatch)
        adapter._recent_sent_ttl_seconds = 100.0

        # Drive time.monotonic deterministically.
        import gateway.platforms.signal as sig_mod
        clock = {"now": 1000.0}
        monkeypatch.setattr(sig_mod.time, "monotonic", lambda: clock["now"])

        # (fake monotonic time, timestamp tracked at that time). By 1200.0
        # the entries from 1000.0 and 1050.0 are 200s and 150s old, both
        # beyond the 100s TTL.
        schedule = ((1000.0, 1), (1050.0, 2), (1200.0, 3))
        for fake_time, sent_ts in schedule:
            clock["now"] = fake_time
            adapter._track_sent_timestamp({"timestamp": sent_ts})

        # Both 1 and 2 should be evicted on TTL, only 3 remains
        assert list(adapter._recent_sent_timestamps) == [3]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue