hermes-agent/tests/gateway/test_dm_topics.py
ethernet 48be2e0e4d
test: use subprocesses for each test file (#29016)
* ci(tests): install ripgrep from prebuilt tarball instead of apt

apt-get update + install of ripgrep takes ~4 min on the GHA Ubuntu
runners (the apt-get update against archive.ubuntu.com is the slow
part; ripgrep itself is small). Switching to the upstream musl
binary tarball cuts the step to a few seconds.

- Pinned to ripgrep 15.1.0 with sha256 verification (same hash as
  published in the releases sha256 sidecar file).
- Drops the `rg` binary into /usr/local/bin so it is on PATH for
  every subsequent step without GITHUB_PATH manipulation.
- Applied to both the test and e2e jobs in tests.yml.

* fix(cli): compile syntax check to tempdir, not source __pycache__

`_validate_critical_files_syntax` runs `py_compile.compile()` on each
critical bootstrap file after a successful `git pull`. The default
`py_compile` writes the resulting `.pyc` next to the source under
`__pycache__/`, which causes two real problems:

1. Parallel test workers walking the same source tree (e.g. running
   the suite under per-file process isolation) can race against each
   other on the `__pycache__` write — manifests as flaky 'directory
   not empty' errors during teardown.
2. In production, the post-pull syntax check leaves a `.pyc` behind
   that the next interpreter run might pick up — fine when the
   interpreter version matches, sketchy if it doesn't.

Fix: write the compiled output to a `tempfile.TemporaryDirectory()`
that's discarded on function exit. We only care about the compile-or-not
signal, not the artifact.

* test(runner): per-file process isolation, drop manual state reset + xdist

Replace fragile manual _reset_module_state test fixtures with robust
per-file subprocess isolation. Each test file runs in a fresh
`python -m pytest <file>` subprocess via ThreadPoolExecutor. No xdist,
no custom pytest plugin, no shared worker state.

Key changes:
  * scripts/run_tests_parallel.py — new runner: discovers test files,
    runs N in parallel via ThreadPoolExecutor, captures stdout per file,
    treats exit code 5 (no tests collected) as pass, kills all children
    on exit. Change from cpu_count to cpu_count*2. The runner is
    I/O-bound (waiting on subprocess.communicate() from pytest children)
    The parent process does almost no CPU work, so 2x oversubscription
    keeps more pipes full. When a file fails, immediately show the last
    30 lines of pytest output (stack traces + FAILED summary) plus a
    ready-to-copy repro command:
      python -m pytest tests/agent/test_auxiliary_client.py
  * scripts/run_tests.sh — delegates to run_tests_parallel.py
  * .github/workflows/tests.yml — test step: python
scripts/run_tests_parallel.py
  * pyproject.toml — drop pytest-xdist, pytest-split; simplify addopts
  * tests/conftest.py — remove ~200 lines of manual state-reset fixtures
  * AGENTS.md — update Testing section for per-file design

* test(runner): speed gateway test antipattern scan up

* fix(test): web search provider plugin test missing xai

* fix(tests): make 14 test files pass under per-file subprocess isolation

Tests that relied on cross-file state pollution from xdist workers
fail when run in isolation (per-file subprocess model). Root causes
and fixes:

Tool registry not populated:
  - test_video_generation_tool_surface_matrix: add discover_builtin_tools()
  - test_web_providers_brave_free/ddgs/searxng/general: autouse fixtures
    registering all 8 bundled web providers, reset after each test
  - test_website_policy: same provider registration pattern
  - test_web_tools_tavily: same pattern across 3 dispatch test classes
  - Also add is_safe_url/check_website_access mocks where SSRF check
    blocks example.com (DNS resolution fails in isolated envs)

Stale check_fn cache:
  - test_kanban_tools: invalidate_check_fn_cache() + _clear_tool_defs_cache()
    in both kanban guidance tests (prior test cached False for kanban_show)
  - test_discord_tool: cache invalidation in setup/teardown
  - test_homeassistant_tool: invalidate_check_fn_cache() before registry queries

Module-level state pollution:
  - test_auxiliary_client: autouse fixture clearing _aux_unhealthy_until cache
  - test_skill_commands: set_session_vars() instead of patch.dict(os.environ)
    (ContextVar takes precedence over os.environ)
  - test_dm_topics: overwrite sys.modules + separate telegram.constants mock
    + force-reimport of gateway.platforms.telegram
  - test_terminal_tool_requirements: removed duplicate class declaration,
    autouse _clear_caches fixture

* change(tests): run_tests.sh explicitly includes env vars

instead of manually dropping some vars, now we just only include some

* fix(tests): 5 more isolation/NixOS fixes

- test_approval_plugin_hooks: isolate HERMES_HOME so real user's
  command_allowlist doesn't short-circuit the approval path
- test_google_chat: skipif when Platform.GOOGLE_CHAT not in enum
  (feature not merged on this branch)
- test_write_deny: test systemd prefix against tmp_path instead of
  /etc/systemd which resolves to /nix/store on NixOS
- test_pty_bridge: use shutil.which('cat') instead of /bin/cat
  (doesn't exist on NixOS)
- profiles.py: rmtree onexc handler chmod's parent dirs too, fixing
  profile deletion when copytree preserved read-only modes from
  nix store

* fix(tests): clear unhealthy cache in autouse fixture for auxiliary_client

* fix(tests): skip send_message when telegram not installed; handle missing worker_id in browser_supervisor

* fix: py3.11 rmtree onexc compat + belt-and-suspenders unhealthy cache clear for expired codex test

* fix: address PR #29016 review feedback

- Remove tracked .pytest-cache/ artifact and add to .gitignore
- Fix stale 'xdist worker' comment in conftest.py
- Deduplicate web provider registration into tests/tools/conftest.py
  shared helper (register_all_web_providers), replacing 8 copy-pasted
  blocks across 6 test files
- Update PR description: remove stale recovered-test-files claim,
  fix worker count to match code (cpu_count*2)

* fix: eliminate race in stale-cache achievements test

The background scan thread could complete and overwrite _SNAPSHOT_CACHE
before evaluate_all() returned the stale data — only 10 fake sessions
made the scan finish instantly. Added scan_delay param to _FakeSessionDB
and set it to 2s in the stale-cache test so the background thread can't
win the race.
2026-05-21 16:40:04 +05:30

818 lines
25 KiB
Python

"""Tests for Telegram DM Private Chat Topics (Bot API 9.4).
Covers:
- _setup_dm_topics: loading persisted thread_ids from config
- _setup_dm_topics: creating new topics via API when no thread_id
- _persist_dm_topic_thread_id: saving thread_id back to config.yaml
- _get_dm_topic_info: looking up topic config by thread_id
- _cache_dm_topic_from_message: caching thread_ids from incoming messages
- _build_message_event: DM topic resolution in message events
"""
import asyncio
import os
import sys
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch, mock_open
import pytest
from gateway.config import PlatformConfig
def _ensure_telegram_mock():
telegram_mod = MagicMock()
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
# Register telegram.constants as a separate module mock so that
# ``from telegram.constants import ChatType`` resolves to our mock
# with string-valued members (not auto-generated MagicMocks).
constants_mod = MagicMock()
constants_mod.ParseMode.MARKDOWN_V2 = "MarkdownV2"
constants_mod.ChatType.GROUP = "group"
constants_mod.ChatType.SUPERGROUP = "supergroup"
constants_mod.ChatType.CHANNEL = "channel"
constants_mod.ChatType.PRIVATE = "private"
sys.modules["telegram"] = telegram_mod
sys.modules["telegram.ext"] = telegram_mod.ext
sys.modules["telegram.constants"] = constants_mod
sys.modules["telegram.request"] = telegram_mod.request
# Force reimport so the adapter picks up the mock ChatType.
sys.modules.pop("gateway.platforms.telegram", None)
_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
def _make_adapter(dm_topics_config=None, group_topics_config=None):
"""Create a TelegramAdapter with optional DM/group topics config."""
extra = {}
if dm_topics_config is not None:
extra["dm_topics"] = dm_topics_config
if group_topics_config is not None:
extra["group_topics"] = group_topics_config
config = PlatformConfig(enabled=True, token="***", extra=extra)
adapter = TelegramAdapter(config)
return adapter
# ── _setup_dm_topics: load persisted thread_ids ──
@pytest.mark.asyncio
async def test_setup_dm_topics_loads_persisted_thread_ids():
"""Topics with thread_id in config should be loaded into cache, not created."""
adapter = _make_adapter([
{
"chat_id": 111,
"topics": [
{"name": "General", "thread_id": 100},
{"name": "Work", "thread_id": 200},
],
}
])
adapter._bot = AsyncMock()
await adapter._setup_dm_topics()
# Both should be in cache
assert adapter._dm_topics["111:General"] == 100
assert adapter._dm_topics["111:Work"] == 200
# create_forum_topic should NOT have been called
adapter._bot.create_forum_topic.assert_not_called()
@pytest.mark.asyncio
async def test_setup_dm_topics_creates_when_no_thread_id():
"""Topics without thread_id should be created via API."""
adapter = _make_adapter([
{
"chat_id": 222,
"topics": [
{"name": "NewTopic", "icon_color": 7322096},
],
}
])
adapter._bot = AsyncMock()
mock_topic = SimpleNamespace(message_thread_id=999)
adapter._bot.create_forum_topic.return_value = mock_topic
# Mock the persist method so it doesn't touch the filesystem
adapter._persist_dm_topic_thread_id = MagicMock()
await adapter._setup_dm_topics()
# Should have been created
adapter._bot.create_forum_topic.assert_called_once_with(
chat_id=222, name="NewTopic", icon_color=7322096,
)
# Should be in cache
assert adapter._dm_topics["222:NewTopic"] == 999
# Should persist
adapter._persist_dm_topic_thread_id.assert_called_once_with(222, "NewTopic", 999)
@pytest.mark.asyncio
async def test_setup_dm_topics_mixed_persisted_and_new():
"""Mix of persisted and new topics should work correctly."""
adapter = _make_adapter([
{
"chat_id": 333,
"topics": [
{"name": "Existing", "thread_id": 50},
{"name": "New", "icon_color": 123},
],
}
])
adapter._bot = AsyncMock()
mock_topic = SimpleNamespace(message_thread_id=777)
adapter._bot.create_forum_topic.return_value = mock_topic
adapter._persist_dm_topic_thread_id = MagicMock()
await adapter._setup_dm_topics()
# Existing loaded from config
assert adapter._dm_topics["333:Existing"] == 50
# New created via API
assert adapter._dm_topics["333:New"] == 777
# Only one API call (for "New")
adapter._bot.create_forum_topic.assert_called_once()
@pytest.mark.asyncio
async def test_setup_dm_topics_skips_empty_config():
"""Empty dm_topics config should be a no-op."""
adapter = _make_adapter([])
adapter._bot = AsyncMock()
await adapter._setup_dm_topics()
adapter._bot.create_forum_topic.assert_not_called()
assert adapter._dm_topics == {}
@pytest.mark.asyncio
async def test_setup_dm_topics_no_config():
"""No dm_topics in config at all should be a no-op."""
adapter = _make_adapter()
adapter._bot = AsyncMock()
await adapter._setup_dm_topics()
adapter._bot.create_forum_topic.assert_not_called()
# ── _create_dm_topic: error handling ──
@pytest.mark.asyncio
async def test_create_dm_topic_handles_duplicate_error():
"""Duplicate topic error should return None gracefully."""
adapter = _make_adapter()
adapter._bot = AsyncMock()
adapter._bot.create_forum_topic.side_effect = Exception("topic_name_duplicate")
result = await adapter._create_dm_topic(chat_id=111, name="General")
assert result is None
@pytest.mark.asyncio
async def test_create_dm_topic_handles_generic_error():
"""Generic error should return None with warning."""
adapter = _make_adapter()
adapter._bot = AsyncMock()
adapter._bot.create_forum_topic.side_effect = Exception("some random error")
result = await adapter._create_dm_topic(chat_id=111, name="General")
assert result is None
@pytest.mark.asyncio
async def test_create_dm_topic_returns_none_without_bot():
"""No bot instance should return None."""
adapter = _make_adapter()
adapter._bot = None
result = await adapter._create_dm_topic(chat_id=111, name="General")
assert result is None
# ── _persist_dm_topic_thread_id ──
def test_persist_dm_topic_thread_id_writes_config(tmp_path):
"""Should write thread_id into the correct topic in config.yaml."""
import yaml
config_data = {
"platforms": {
"telegram": {
"extra": {
"dm_topics": [
{
"chat_id": 111,
"topics": [
{"name": "General", "icon_color": 123},
{"name": "Work", "icon_color": 456},
],
}
]
}
}
}
}
config_file = tmp_path / ".hermes" / "config.yaml"
config_file.parent.mkdir(parents=True)
with open(config_file, "w") as f:
yaml.dump(config_data, f)
adapter = _make_adapter()
with patch.object(Path, "home", return_value=tmp_path), \
patch.dict(os.environ, {"HERMES_HOME": str(tmp_path / ".hermes")}):
adapter._persist_dm_topic_thread_id(111, "General", 999)
with open(config_file) as f:
result = yaml.safe_load(f)
topics = result["platforms"]["telegram"]["extra"]["dm_topics"][0]["topics"]
assert topics[0]["thread_id"] == 999
assert "thread_id" not in topics[1] # "Work" should be untouched
def test_persist_dm_topic_thread_id_skips_if_already_set(tmp_path):
"""Should not overwrite an existing thread_id."""
import yaml
config_data = {
"platforms": {
"telegram": {
"extra": {
"dm_topics": [
{
"chat_id": 111,
"topics": [
{"name": "General", "icon_color": 123, "thread_id": 500},
],
}
]
}
}
}
}
config_file = tmp_path / ".hermes" / "config.yaml"
config_file.parent.mkdir(parents=True)
with open(config_file, "w") as f:
yaml.dump(config_data, f)
adapter = _make_adapter()
with patch.object(Path, "home", return_value=tmp_path):
adapter._persist_dm_topic_thread_id(111, "General", 999)
with open(config_file) as f:
result = yaml.safe_load(f)
topics = result["platforms"]["telegram"]["extra"]["dm_topics"][0]["topics"]
assert topics[0]["thread_id"] == 500 # unchanged
# ── _get_dm_topic_info ──
def test_persist_dm_topic_thread_id_preserves_config_on_write_failure(tmp_path):
"""Failed writes should leave the original config.yaml intact."""
import yaml
config_data = {
"platforms": {
"telegram": {
"extra": {
"dm_topics": [
{
"chat_id": 111,
"topics": [
{"name": "General", "icon_color": 123},
],
}
]
}
}
}
}
config_file = tmp_path / ".hermes" / "config.yaml"
config_file.parent.mkdir(parents=True)
original_text = yaml.dump(config_data)
config_file.write_text(original_text, encoding="utf-8")
adapter = _make_adapter()
def fail_dump(*args, **kwargs):
raise RuntimeError("boom")
with patch.object(Path, "home", return_value=tmp_path), \
patch.dict(os.environ, {"HERMES_HOME": str(tmp_path / ".hermes")}), \
patch("yaml.dump", side_effect=fail_dump):
adapter._persist_dm_topic_thread_id(111, "General", 999)
assert config_file.read_text(encoding="utf-8") == original_text
result = yaml.safe_load(config_file.read_text(encoding="utf-8"))
topics = result["platforms"]["telegram"]["extra"]["dm_topics"][0]["topics"]
assert "thread_id" not in topics[0]
def test_get_dm_topic_info_finds_cached_topic():
"""Should return topic config when thread_id is in cache."""
adapter = _make_adapter([
{
"chat_id": 111,
"topics": [
{"name": "General", "skill": "my-skill"},
],
}
])
adapter._dm_topics["111:General"] = 100
result = adapter._get_dm_topic_info("111", "100")
assert result is not None
assert result["name"] == "General"
assert result["skill"] == "my-skill"
def test_get_dm_topic_info_returns_none_for_unknown():
"""Should return None for unknown thread_id."""
adapter = _make_adapter([
{
"chat_id": 111,
"topics": [{"name": "General"}],
}
])
# Mock reload to avoid filesystem access
adapter._reload_dm_topics_from_config = lambda: None
result = adapter._get_dm_topic_info("111", "999")
assert result is None
def test_get_dm_topic_info_returns_none_without_config():
"""Should return None if no dm_topics config."""
adapter = _make_adapter()
adapter._reload_dm_topics_from_config = lambda: None
result = adapter._get_dm_topic_info("111", "100")
assert result is None
def test_get_dm_topic_info_returns_none_for_none_thread():
"""Should return None if thread_id is None."""
adapter = _make_adapter([
{"chat_id": 111, "topics": [{"name": "General"}]}
])
result = adapter._get_dm_topic_info("111", None)
assert result is None
def test_get_dm_topic_info_hot_reloads_from_config(tmp_path):
"""Should find a topic added to config after startup (hot-reload)."""
import yaml
# Start with empty topics
adapter = _make_adapter([
{"chat_id": 111, "topics": []}
])
# Write config with a new topic + thread_id
config_data = {
"platforms": {
"telegram": {
"extra": {
"dm_topics": [
{
"chat_id": 111,
"topics": [
{"name": "NewProject", "thread_id": 555},
],
}
]
}
}
}
}
config_file = tmp_path / ".hermes" / "config.yaml"
config_file.parent.mkdir(parents=True)
with open(config_file, "w") as f:
yaml.dump(config_data, f)
with patch.object(Path, "home", return_value=tmp_path), \
patch.dict(os.environ, {"HERMES_HOME": str(tmp_path / ".hermes")}):
result = adapter._get_dm_topic_info("111", "555")
assert result is not None
assert result["name"] == "NewProject"
# Should now be cached
assert adapter._dm_topics["111:NewProject"] == 555
# ── _cache_dm_topic_from_message ──
def test_cache_dm_topic_from_message():
"""Should cache a new topic mapping."""
adapter = _make_adapter()
adapter._cache_dm_topic_from_message("111", "100", "General")
assert adapter._dm_topics["111:General"] == 100
def test_cache_dm_topic_from_message_no_overwrite():
"""Should not overwrite an existing cached topic."""
adapter = _make_adapter()
adapter._dm_topics["111:General"] = 100
adapter._cache_dm_topic_from_message("111", "999", "General")
assert adapter._dm_topics["111:General"] == 100 # unchanged
# ── _build_message_event: auto_skill binding ──
def _make_mock_message(chat_id=111, chat_type="private", text="hello", thread_id=None,
user_id=42, user_name="Test User", forum_topic_created=None,
is_topic_message=None, is_forum=None):
"""Create a mock Telegram Message for _build_message_event tests."""
chat = SimpleNamespace(
id=chat_id,
type=chat_type,
title=None,
)
if is_forum is not None:
chat.is_forum = is_forum
# Add full_name attribute for DM chats
if not hasattr(chat, "full_name"):
chat.full_name = user_name
user = SimpleNamespace(
id=user_id,
full_name=user_name,
)
if is_topic_message is None:
is_topic_message = bool(thread_id) if chat_type == "private" else None
msg = SimpleNamespace(
chat=chat,
from_user=user,
text=text,
message_thread_id=thread_id,
is_topic_message=is_topic_message,
message_id=1001,
reply_to_message=None,
date=None,
forum_topic_created=forum_topic_created,
)
return msg
def test_build_message_event_sets_auto_skill():
"""When topic has a skill binding, auto_skill should be set on the event."""
from gateway.platforms.base import MessageType
adapter = _make_adapter([
{
"chat_id": 111,
"topics": [
{"name": "My Project", "skill": "accessibility-auditor", "thread_id": 100},
],
}
])
adapter._dm_topics["111:My Project"] = 100
msg = _make_mock_message(chat_id=111, thread_id=100, text="check this page")
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill == "accessibility-auditor"
# chat_topic should be the clean topic name, no [skill: ...] suffix
assert event.source.chat_topic == "My Project"
def test_build_message_event_no_auto_skill_without_binding():
"""Topics without skill binding should have auto_skill=None."""
from gateway.platforms.base import MessageType
adapter = _make_adapter([
{
"chat_id": 111,
"topics": [
{"name": "General", "thread_id": 200},
],
}
])
adapter._dm_topics["111:General"] = 200
msg = _make_mock_message(chat_id=111, thread_id=200)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill is None
assert event.source.chat_topic == "General"
def test_build_message_event_no_auto_skill_without_thread():
"""Regular DM messages (no thread_id) should have auto_skill=None."""
from gateway.platforms.base import MessageType
adapter = _make_adapter()
msg = _make_mock_message(chat_id=111, thread_id=None)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill is None
def test_build_message_event_filters_non_topic_dm_thread_id():
"""A DM reply-thread id should not be persisted unless Telegram marks it as a topic message."""
from gateway.platforms.base import MessageType
adapter = _make_adapter()
msg = _make_mock_message(chat_id=111, thread_id=777, is_topic_message=False)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.source.thread_id is None
assert event.source.chat_topic is None
assert event.auto_skill is None
def test_build_message_event_preserves_true_dm_topic_thread_id():
"""True DM topic messages should keep their thread id for routing."""
from gateway.platforms.base import MessageType
adapter = _make_adapter([
{
"chat_id": 111,
"topics": [
{"name": "General", "thread_id": 200},
],
}
])
adapter._dm_topics["111:General"] = 200
msg = _make_mock_message(chat_id=111, thread_id=200, is_topic_message=True)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.source.thread_id == "200"
assert event.source.chat_topic == "General"
# ── _build_message_event: group_topics skill binding ──
# The telegram mock sets sys.modules["telegram.constants"] = telegram_mod (root mock),
# so `from telegram.constants import ChatType` in telegram.py resolves to
# telegram_mod.ChatType — not telegram_mod.constants.ChatType. We must use
# the same ChatType object the production code sees so equality checks work.
from telegram.constants import ChatType as _ChatType # noqa: E402
def test_group_topic_skill_binding():
"""Group topic with skill config should set auto_skill on the event."""
from gateway.platforms.base import MessageType
adapter = _make_adapter(group_topics_config=[
{
"chat_id": -1001234567890,
"topics": [
{"name": "Engineering", "thread_id": 5, "skill": "software-development"},
{"name": "Sales", "thread_id": 12, "skill": "sales-framework"},
],
}
])
msg = _make_mock_message(
chat_id=-1001234567890,
chat_type=_ChatType.SUPERGROUP,
thread_id=5,
text="hello",
is_topic_message=True,
is_forum=True,
)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill == "software-development"
assert event.source.chat_topic == "Engineering"
def test_group_topic_skill_binding_second_topic():
"""A different thread_id in the same group should resolve its own skill."""
from gateway.platforms.base import MessageType
adapter = _make_adapter(group_topics_config=[
{
"chat_id": -1001234567890,
"topics": [
{"name": "Engineering", "thread_id": 5, "skill": "software-development"},
{"name": "Sales", "thread_id": 12, "skill": "sales-framework"},
],
}
])
msg = _make_mock_message(
chat_id=-1001234567890,
chat_type=_ChatType.SUPERGROUP,
thread_id=12,
text="deal update",
is_topic_message=True,
is_forum=True,
)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill == "sales-framework"
assert event.source.chat_topic == "Sales"
def test_group_topic_no_skill_binding():
"""Group topic without a skill key should have auto_skill=None but set chat_topic."""
from gateway.platforms.base import MessageType
adapter = _make_adapter(group_topics_config=[
{
"chat_id": -1001234567890,
"topics": [
{"name": "General", "thread_id": 1},
],
}
])
msg = _make_mock_message(
chat_id=-1001234567890,
chat_type=_ChatType.SUPERGROUP,
thread_id=1,
text="hey",
is_topic_message=True,
is_forum=True,
)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill is None
assert event.source.chat_topic == "General"
def test_group_topic_unmapped_thread_id():
"""Thread ID not in config should fall through — no skill, no topic name."""
from gateway.platforms.base import MessageType
adapter = _make_adapter(group_topics_config=[
{
"chat_id": -1001234567890,
"topics": [
{"name": "Engineering", "thread_id": 5, "skill": "software-development"},
],
}
])
msg = _make_mock_message(
chat_id=-1001234567890,
chat_type=_ChatType.SUPERGROUP,
thread_id=999,
text="random",
is_topic_message=True,
is_forum=True,
)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill is None
assert event.source.chat_topic is None
def test_group_topic_unmapped_chat_id():
"""Chat ID not in group_topics config should fall through silently."""
from gateway.platforms.base import MessageType
adapter = _make_adapter(group_topics_config=[
{
"chat_id": -1001234567890,
"topics": [
{"name": "Engineering", "thread_id": 5, "skill": "software-development"},
],
}
])
msg = _make_mock_message(
chat_id=-1009999999999,
chat_type=_ChatType.SUPERGROUP,
thread_id=5,
text="wrong group",
is_topic_message=True,
is_forum=True,
)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill is None
assert event.source.chat_topic is None
def test_group_topic_no_config():
"""No group_topics config at all should be fine — no skill, no topic."""
from gateway.platforms.base import MessageType
adapter = _make_adapter() # no group_topics_config
msg = _make_mock_message(
chat_id=-1001234567890, chat_type=_ChatType.GROUP, thread_id=5, text="hi"
)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill is None
assert event.source.chat_topic is None
def test_group_topic_chat_id_int_string_coercion():
"""chat_id as string in config should match integer chat.id via str() coercion."""
from gateway.platforms.base import MessageType
adapter = _make_adapter(group_topics_config=[
{
"chat_id": "-1001234567890", # string, not int
"topics": [
{"name": "Dev", "thread_id": "7", "skill": "hermes-agent-dev"},
],
}
])
msg = _make_mock_message(
chat_id=-1001234567890,
chat_type=_ChatType.SUPERGROUP,
thread_id=7,
text="test",
is_topic_message=True,
is_forum=True,
)
event = adapter._build_message_event(msg, MessageType.TEXT)
assert event.auto_skill == "hermes-agent-dev"
assert event.source.chat_topic == "Dev"
# ── _build_message_event: from_user=None fallback in DMs ──
def test_build_message_event_dm_from_user_none_falls_back_to_chat_id():
"""When from_user is None in a DM, user_id should fall back to chat.id."""
from gateway.platforms.base import MessageType
adapter = _make_adapter()
msg = _make_mock_message(chat_id=12345, user_id=42, user_name="Alice")
# Simulate from_user being None (edge case on fresh restart / forwarded msg)
msg.from_user = None
event = adapter._build_message_event(msg, MessageType.TEXT)
# Should fall back to chat.id since chat_type is "dm"
assert event.source.user_id == "12345"
assert event.source.user_name == "Alice" # falls back to chat.full_name
def test_build_message_event_group_from_user_none_stays_none():
"""When from_user is None in a group, user_id should remain None."""
from gateway.platforms.base import MessageType
adapter = _make_adapter()
msg = _make_mock_message(
chat_id=-1001234567890, chat_type=_ChatType.SUPERGROUP,
user_id=42, user_name="Alice"
)
msg.from_user = None
event = adapter._build_message_event(msg, MessageType.TEXT)
# Groups should NOT fall back — anonymous senders stay None
assert event.source.user_id is None
assert event.source.user_name is None
def test_build_message_event_dm_from_user_present_uses_user():
"""When from_user is present in a DM, it should be used (no fallback)."""
from gateway.platforms.base import MessageType
adapter = _make_adapter()
msg = _make_mock_message(chat_id=12345, user_id=99999, user_name="Bob")
event = adapter._build_message_event(msg, MessageType.TEXT)
# Normal case — from_user is used directly
assert event.source.user_id == "99999"
assert event.source.user_name == "Bob"