mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
Mirror messages are persisted via _append_to_sqlite; the JSONL writer was a redundant dual-write. Test assertions were updated from JSONL file checks to SQLite mock verification.
271 lines
10 KiB
Python
271 lines
10 KiB
Python
"""Tests for gateway/mirror.py — session mirroring."""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import gateway.mirror as mirror_mod
|
|
from gateway.mirror import (
|
|
mirror_to_session,
|
|
_find_session_id,
|
|
)
|
|
|
|
|
|
def _setup_sessions(tmp_path, sessions_data):
|
|
"""Helper to write a fake sessions.json and patch module-level paths."""
|
|
sessions_dir = tmp_path / "sessions"
|
|
sessions_dir.mkdir(parents=True, exist_ok=True)
|
|
index_file = sessions_dir / "sessions.json"
|
|
index_file.write_text(json.dumps(sessions_data))
|
|
return sessions_dir, index_file
|
|
|
|
|
|
class TestFindSessionId:
    """Checks ``_find_session_id`` lookups against a fake sessions index."""

    def test_finds_matching_session(self, tmp_path):
        """A lone entry with matching platform and chat id is found."""
        index_data = {
            "agent:main:telegram:dm": {
                "session_id": "sess_abc",
                "origin": {"platform": "telegram", "chat_id": "12345"},
                "updated_at": "2026-01-01T00:00:00",
            }
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        with patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        ):
            found = _find_session_id("telegram", "12345")

        assert found == "sess_abc"

    def test_returns_most_recent(self, tmp_path):
        """Of two entries for one chat, the later ``updated_at`` is expected."""
        index_data = {
            "old": {
                "session_id": "sess_old",
                "origin": {"platform": "telegram", "chat_id": "12345"},
                "updated_at": "2026-01-01T00:00:00",
            },
            "new": {
                "session_id": "sess_new",
                "origin": {"platform": "telegram", "chat_id": "12345"},
                "updated_at": "2026-02-01T00:00:00",
            },
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        with patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        ):
            found = _find_session_id("telegram", "12345")

        assert found == "sess_new"

    def test_thread_id_disambiguates_same_chat(self, tmp_path):
        """Passing ``thread_id`` picks the matching topic, not the newest one."""
        index_data = {
            "topic_a": {
                "session_id": "sess_topic_a",
                "origin": {"platform": "telegram", "chat_id": "-1001", "thread_id": "10"},
                "updated_at": "2026-01-01T00:00:00",
            },
            "topic_b": {
                "session_id": "sess_topic_b",
                "origin": {"platform": "telegram", "chat_id": "-1001", "thread_id": "11"},
                "updated_at": "2026-02-01T00:00:00",
            },
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        with patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        ):
            found = _find_session_id("telegram", "-1001", thread_id="10")

        assert found == "sess_topic_a"

    def test_user_id_disambiguates_same_group_chat(self, tmp_path):
        """Passing ``user_id`` picks that user's session within a shared chat."""
        index_data = {
            "alice": {
                "session_id": "sess_alice",
                "origin": {"platform": "telegram", "chat_id": "-1001", "user_id": "alice"},
                "updated_at": "2026-01-01T00:00:00",
            },
            "bob": {
                "session_id": "sess_bob",
                "origin": {"platform": "telegram", "chat_id": "-1001", "user_id": "bob"},
                "updated_at": "2026-02-01T00:00:00",
            },
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        with patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        ):
            found = _find_session_id("telegram", "-1001", user_id="alice")

        assert found == "sess_alice"

    def test_ambiguous_same_group_chat_without_user_id_returns_none(self, tmp_path):
        """Two users in one chat and no ``user_id`` given: ``None`` is expected."""
        index_data = {
            "alice": {
                "session_id": "sess_alice",
                "origin": {"platform": "telegram", "chat_id": "-1001", "user_id": "alice"},
                "updated_at": "2026-01-01T00:00:00",
            },
            "bob": {
                "session_id": "sess_bob",
                "origin": {"platform": "telegram", "chat_id": "-1001", "user_id": "bob"},
                "updated_at": "2026-02-01T00:00:00",
            },
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        with patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        ):
            found = _find_session_id("telegram", "-1001")

        assert found is None

    def test_no_match_returns_none(self, tmp_path):
        """An index holding only another platform's chat yields ``None``."""
        index_data = {
            "sess": {
                "session_id": "sess_1",
                "origin": {"platform": "discord", "chat_id": "999"},
                "updated_at": "2026-01-01T00:00:00",
            }
        }
        # Only the index path is patched in this test.
        _, index_path = _setup_sessions(tmp_path, index_data)

        with patch.object(mirror_mod, "_SESSIONS_INDEX", index_path):
            found = _find_session_id("telegram", "12345")

        assert found is None

    def test_missing_sessions_file(self, tmp_path):
        """Pointing the index at a path that was never created yields ``None``."""
        absent_index = tmp_path / "nope.json"

        with patch.object(mirror_mod, "_SESSIONS_INDEX", absent_index):
            found = _find_session_id("telegram", "12345")

        assert found is None

    def test_platform_case_insensitive(self, tmp_path):
        """A stored ``"Telegram"`` platform matches a ``"telegram"`` query."""
        index_data = {
            "s1": {
                "session_id": "sess_1",
                "origin": {"platform": "Telegram", "chat_id": "123"},
                "updated_at": "2026-01-01T00:00:00",
            }
        }
        # Only the index path is patched in this test.
        _, index_path = _setup_sessions(tmp_path, index_data)

        with patch.object(mirror_mod, "_SESSIONS_INDEX", index_path):
            found = _find_session_id("telegram", "123")

        assert found == "sess_1"
|
|
|
|
|
|
|
|
class TestMirrorToSession:
    """Checks ``mirror_to_session`` with the SQLite writer replaced by a mock."""

    def test_successful_mirror(self, tmp_path):
        """A matching session gives ``True`` and one write of the mirror message."""
        index_data = {
            "s1": {
                "session_id": "sess_abc",
                "origin": {"platform": "telegram", "chat_id": "12345"},
                "updated_at": "2026-01-01T00:00:00",
            }
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        paths_patch = patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        )
        writer_patch = patch("gateway.mirror._append_to_sqlite")
        with paths_patch, writer_patch as sqlite_writer:
            outcome = mirror_to_session("telegram", "12345", "Hello!", source_label="cli")

        assert outcome is True

        # The SQLite writer must have received the mirror message exactly once.
        sqlite_writer.assert_called_once()
        positional = sqlite_writer.call_args[0]
        assert positional[0] == "sess_abc"
        mirrored = positional[1]
        assert mirrored["content"] == "Hello!"
        assert mirrored["role"] == "assistant"
        assert mirrored["mirror"] is True
        assert mirrored["mirror_source"] == "cli"

    def test_successful_mirror_uses_thread_id(self, tmp_path):
        """With ``thread_id`` given, the write targets that topic's session."""
        index_data = {
            "topic_a": {
                "session_id": "sess_topic_a",
                "origin": {"platform": "telegram", "chat_id": "-1001", "thread_id": "10"},
                "updated_at": "2026-01-01T00:00:00",
            },
            "topic_b": {
                "session_id": "sess_topic_b",
                "origin": {"platform": "telegram", "chat_id": "-1001", "thread_id": "11"},
                "updated_at": "2026-02-01T00:00:00",
            },
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        paths_patch = patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        )
        writer_patch = patch("gateway.mirror._append_to_sqlite")
        with paths_patch, writer_patch as sqlite_writer:
            outcome = mirror_to_session(
                "telegram", "-1001", "Hello topic!", source_label="cron", thread_id="10"
            )

        assert outcome is True
        sqlite_writer.assert_called_once()
        assert sqlite_writer.call_args[0][0] == "sess_topic_a"

    def test_successful_mirror_uses_user_id_for_group_session(self, tmp_path):
        """With ``user_id`` given, the write targets that user's group session."""
        index_data = {
            "alice": {
                "session_id": "sess_alice",
                "origin": {"platform": "telegram", "chat_id": "-1001", "user_id": "alice"},
                "updated_at": "2026-01-01T00:00:00",
            },
            "bob": {
                "session_id": "sess_bob",
                "origin": {"platform": "telegram", "chat_id": "-1001", "user_id": "bob"},
                "updated_at": "2026-02-01T00:00:00",
            },
        }
        sessions_root, index_path = _setup_sessions(tmp_path, index_data)

        paths_patch = patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        )
        writer_patch = patch("gateway.mirror._append_to_sqlite")
        with paths_patch, writer_patch as sqlite_writer:
            outcome = mirror_to_session(
                "telegram",
                "-1001",
                "Hello group!",
                source_label="cli",
                user_id="alice",
            )

        assert outcome is True
        sqlite_writer.assert_called_once()
        assert sqlite_writer.call_args[0][0] == "sess_alice"

    def test_no_matching_session(self, tmp_path):
        """An empty index gives ``False``."""
        sessions_root, index_path = _setup_sessions(tmp_path, {})

        with patch.multiple(
            mirror_mod,
            _SESSIONS_DIR=sessions_root,
            _SESSIONS_INDEX=index_path,
        ):
            outcome = mirror_to_session("telegram", "99999", "Hello!")

        assert outcome is False

    def test_error_returns_false(self, tmp_path):
        """An exception raised by the session lookup gives ``False``."""
        failing_lookup = patch(
            "gateway.mirror._find_session_id",
            side_effect=Exception("boom"),
        )
        with failing_lookup:
            outcome = mirror_to_session("telegram", "123", "msg")

        assert outcome is False
|
|
|
|
|
|
class TestAppendToSqlite:
    """Checks connection handling in ``_append_to_sqlite`` with ``SessionDB`` mocked."""

    def test_connection_is_closed_after_use(self, tmp_path):
        """The message is appended once and the SessionDB is closed once."""
        fake_db = MagicMock()
        message = {"role": "assistant", "content": "hello"}

        with patch("hermes_state.SessionDB", return_value=fake_db):
            mirror_mod._append_to_sqlite("sess_1", message)

        fake_db.append_message.assert_called_once()
        fake_db.close.assert_called_once()

    def test_connection_closed_even_on_error(self, tmp_path):
        """The SessionDB is still closed once when ``append_message`` raises."""
        fake_db = MagicMock()
        fake_db.append_message.side_effect = Exception("db error")
        message = {"role": "assistant", "content": "hello"}

        # The call is expected not to propagate the error to the test.
        with patch("hermes_state.SessionDB", return_value=fake_db):
            mirror_mod._append_to_sqlite("sess_1", message)

        fake_db.close.assert_called_once()
|