From 2edbf155608ae7ea70b3d8fc90ac01b94d311fbc Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Wed, 15 Apr 2026 13:35:40 -0700 Subject: [PATCH] fix: enforce TTL in MessageDeduplicator + use yaml for gateway --config (#10306, #10216) (#10509) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two gateway fixes: 1. MessageDeduplicator.is_duplicate() now checks TTL at query time (#10306) Previously, is_duplicate() returned True for any previously seen ID without checking its age — expired entries were only purged when cache size exceeded max_size. On normal workloads that never overflow, message IDs stayed deduplicated forever instead of expiring after the TTL. Fix: check `now - timestamp < ttl` before returning True. Expired entries are removed and treated as new messages. 2. Gateway --config flag now uses yaml.safe_load() (#10216) The --config CLI flag in gateway/run.py main() used json.load() to parse config files. YAML is the only documented config format and every other config loader uses yaml.safe_load(). A YAML config file passed via --config would crash with json.JSONDecodeError. Closes #10306 Closes #10216 --- gateway/platforms/helpers.py | 5 +- gateway/run.py | 4 +- tests/gateway/test_message_deduplicator.py | 89 ++++++++++++++++++++++ 3 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 tests/gateway/test_message_deduplicator.py diff --git a/gateway/platforms/helpers.py b/gateway/platforms/helpers.py index c834dd89ca..18d97fcb7a 100644 --- a/gateway/platforms/helpers.py +++ b/gateway/platforms/helpers.py @@ -49,7 +49,10 @@ class MessageDeduplicator: return False now = time.time() if msg_id in self._seen: - return True + if now - self._seen[msg_id] < self._ttl: + return True + # Entry has expired — remove it and treat as new + del self._seen[msg_id] self._seen[msg_id] = now if len(self._seen) > self._max_size: cutoff = now - self._ttl diff --git a/gateway/run.py b/gateway/run.py index 327f8ae32a..ea45dcdd3d 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -9725,9 +9725,9 @@ def main(): config = None if args.config: - import json + import yaml with open(args.config, encoding="utf-8") as f: - data = json.load(f) + data = yaml.safe_load(f) config = GatewayConfig.from_dict(data) # Run the gateway - exit with code 1 if no platforms connected, diff --git a/tests/gateway/test_message_deduplicator.py b/tests/gateway/test_message_deduplicator.py new file mode 100644 index 0000000000..59fe7e3949 --- /dev/null +++ b/tests/gateway/test_message_deduplicator.py @@ -0,0 +1,89 @@ +"""Tests for MessageDeduplicator TTL enforcement (#10306). + +Previously, is_duplicate() returned True for any previously seen ID without +checking its age — expired entries were only purged when cache size exceeded +max_size. Normal workloads never overflowed, so messages stayed "duplicate" +forever. + +The fix checks TTL at query time: if the entry's timestamp plus TTL is in +the past, the entry is treated as expired and the message is allowed through. +""" + +import time +from unittest.mock import patch + +from gateway.platforms.helpers import MessageDeduplicator + + +class TestMessageDeduplicatorTTL: + """TTL-based expiration must work regardless of cache size.""" + + def test_duplicate_within_ttl(self): + """Same message within TTL window is duplicate.""" + dedup = MessageDeduplicator(ttl_seconds=60) + assert dedup.is_duplicate("msg-1") is False + assert dedup.is_duplicate("msg-1") is True + + def test_not_duplicate_after_ttl_expires(self): + """Same message AFTER TTL expires should NOT be duplicate.""" + dedup = MessageDeduplicator(ttl_seconds=5) + assert dedup.is_duplicate("msg-1") is False + + # Fast-forward time past TTL + dedup._seen["msg-1"] = time.time() - 10 # 10s ago, TTL is 5s + assert dedup.is_duplicate("msg-1") is False, \ + "Expired entry should not be treated as duplicate" + + def test_expired_entry_gets_refreshed(self): + """After an expired entry is allowed through, it should be re-tracked.""" + dedup = MessageDeduplicator(ttl_seconds=5) + assert dedup.is_duplicate("msg-1") is False + + # Expire the entry + dedup._seen["msg-1"] = time.time() - 10 + + # Should be allowed through (expired) + assert dedup.is_duplicate("msg-1") is False + # Now should be duplicate again (freshly tracked) + assert dedup.is_duplicate("msg-1") is True + + def test_different_messages_not_confused(self): + """Different message IDs are independent.""" + dedup = MessageDeduplicator(ttl_seconds=60) + assert dedup.is_duplicate("msg-1") is False + assert dedup.is_duplicate("msg-2") is False + assert dedup.is_duplicate("msg-1") is True + assert dedup.is_duplicate("msg-2") is True + + def test_empty_id_never_duplicate(self): + """Empty/None message IDs are never treated as duplicate.""" + dedup = MessageDeduplicator(ttl_seconds=60) + assert dedup.is_duplicate("") is False + assert dedup.is_duplicate("") is False + + def test_max_size_eviction_prunes_expired(self): + """Cache pruning on overflow removes expired entries.""" + dedup = MessageDeduplicator(max_size=5, ttl_seconds=60) + # Add 6 entries, with the first 3 expired + now = time.time() + for i in range(3): + dedup._seen[f"old-{i}"] = now - 120 # expired (2 min ago, TTL 60s) + for i in range(3): + dedup.is_duplicate(f"new-{i}") + # Now we have 6 entries. Next insert triggers pruning. + dedup.is_duplicate("trigger") + # The 3 expired entries should be gone, leaving 4 fresh ones + assert len(dedup._seen) == 4 + assert "old-0" not in dedup._seen + assert "new-0" in dedup._seen + + def test_ttl_zero_means_no_dedup(self): + """With TTL=0, all entries expire immediately.""" + dedup = MessageDeduplicator(ttl_seconds=0) + assert dedup.is_duplicate("msg-1") is False + # Entry was just added at time.time(), and TTL is 0, + # so now - seen_time >= 0 = ttl, meaning it's expired + # But time.time() might be the exact same float, so + # the check is `now - ts < ttl` which is `0 < 0` = False + # This means TTL=0 effectively disables dedup + assert dedup.is_duplicate("msg-1") is False