hermes-agent/tests/tools/test_session_search.py
helix4u 6ab78401c9 fix(aux): add session_search extra_body and concurrency controls
Adds auxiliary.<task>.extra_body config passthrough so reasoning-heavy
OpenAI-compatible providers can receive provider-specific request fields
(e.g. enable_thinking: false on GLM) on auxiliary calls, and bounds
session_search summary fan-out with auxiliary.session_search.max_concurrency
(default 3, clamped 1-5) to avoid 429 bursts on small providers.

- agent/auxiliary_client.py: extract _get_auxiliary_task_config helper,
  add _get_task_extra_body, merge config+explicit extra_body with explicit winning
- hermes_cli/config.py: extra_body defaults on all aux tasks +
  session_search.max_concurrency; _config_version 19 -> 20
- tools/session_search_tool.py: semaphore around _summarize_all gather
- tests: coverage in test_auxiliary_client, test_session_search, test_aux_config
- docs: user-guide/configuration.md + fallback-providers.md

Co-authored-by: Teknium <teknium@nousresearch.com>
2026-04-20 00:47:39 -07:00

436 lines
16 KiB
Python

"""Tests for tools/session_search_tool.py — helper functions and search dispatcher."""
import asyncio
import json
import time
import pytest
from tools.session_search_tool import (
_format_timestamp,
_format_conversation,
_truncate_around_matches,
_get_session_search_max_concurrency,
_HIDDEN_SESSION_SOURCES,
MAX_SESSION_CHARS,
SESSION_SEARCH_SCHEMA,
)
# =========================================================================
# Tool schema guidance
# =========================================================================
class TestHiddenSessionSources:
"""Verify the _HIDDEN_SESSION_SOURCES constant used for third-party isolation."""
def test_tool_source_is_hidden(self):
assert "tool" in _HIDDEN_SESSION_SOURCES
def test_standard_sources_not_hidden(self):
for src in ("cli", "telegram", "discord", "slack", "cron"):
assert src not in _HIDDEN_SESSION_SOURCES
class TestSessionSearchSchema:
def test_keeps_cross_session_recall_guidance_without_current_session_nudge(self):
description = SESSION_SEARCH_SCHEMA["description"]
assert "past conversations" in description
assert "recent turns of the current session" not in description
# =========================================================================
# _format_timestamp
# =========================================================================
class TestFormatTimestamp:
def test_unix_float(self):
ts = 1700000000.0 # Nov 14, 2023
result = _format_timestamp(ts)
assert "2023" in result or "November" in result
def test_unix_int(self):
result = _format_timestamp(1700000000)
assert isinstance(result, str)
assert len(result) > 5
def test_iso_string(self):
result = _format_timestamp("2024-01-15T10:30:00")
assert isinstance(result, str)
def test_none_returns_unknown(self):
assert _format_timestamp(None) == "unknown"
def test_numeric_string(self):
result = _format_timestamp("1700000000.0")
assert isinstance(result, str)
assert "unknown" not in result.lower()
# =========================================================================
# _format_conversation
# =========================================================================
class TestFormatConversation:
def test_basic_messages(self):
msgs = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
]
result = _format_conversation(msgs)
assert "[USER]: Hello" in result
assert "[ASSISTANT]: Hi there!" in result
def test_tool_message(self):
msgs = [
{"role": "tool", "content": "search results", "tool_name": "web_search"},
]
result = _format_conversation(msgs)
assert "[TOOL:web_search]" in result
def test_long_tool_output_truncated(self):
msgs = [
{"role": "tool", "content": "x" * 1000, "tool_name": "terminal"},
]
result = _format_conversation(msgs)
assert "[truncated]" in result
def test_assistant_with_tool_calls(self):
msgs = [
{
"role": "assistant",
"content": "",
"tool_calls": [
{"function": {"name": "web_search"}},
{"function": {"name": "terminal"}},
],
},
]
result = _format_conversation(msgs)
assert "web_search" in result
assert "terminal" in result
def test_empty_messages(self):
result = _format_conversation([])
assert result == ""
# =========================================================================
# _truncate_around_matches
# =========================================================================
class TestTruncateAroundMatches:
def test_short_text_unchanged(self):
text = "Short text about docker"
result = _truncate_around_matches(text, "docker")
assert result == text
def test_long_text_truncated(self):
# Create text longer than MAX_SESSION_CHARS with query term in middle
padding = "x" * (MAX_SESSION_CHARS + 5000)
text = padding + " KEYWORD_HERE " + padding
result = _truncate_around_matches(text, "KEYWORD_HERE")
assert len(result) <= MAX_SESSION_CHARS + 100 # +100 for prefix/suffix markers
assert "KEYWORD_HERE" in result
def test_truncation_adds_markers(self):
text = "a" * 50000 + " target " + "b" * (MAX_SESSION_CHARS + 5000)
result = _truncate_around_matches(text, "target")
assert "truncated" in result.lower()
def test_no_match_takes_from_start(self):
text = "x" * (MAX_SESSION_CHARS + 5000)
result = _truncate_around_matches(text, "nonexistent")
# Should take from the beginning
assert result.startswith("x")
def test_match_at_beginning(self):
text = "KEYWORD " + "x" * (MAX_SESSION_CHARS + 5000)
result = _truncate_around_matches(text, "KEYWORD")
assert "KEYWORD" in result
def test_multiword_phrase_match_beats_individual_term(self):
"""Full phrase deep in text should be found even when a single term
appears much earlier in boilerplate."""
boilerplate = "The project setup is complex. " * 500 # ~15K, has 'project' early
filler = "x" * (MAX_SESSION_CHARS + 20000)
target = "We reviewed the keystone project roadmap in detail."
text = boilerplate + filler + target + filler
result = _truncate_around_matches(text, "keystone project")
assert "keystone project" in result.lower()
def test_multiword_proximity_cooccurrence(self):
"""When exact phrase is absent, terms co-occurring within proximity
should be preferred over a lone early term."""
early = "project " + "a" * (MAX_SESSION_CHARS + 20000)
# Place 'keystone' and 'project' near each other (but not as exact phrase)
cooccur = "this keystone initiative for the project was pivotal"
tail = "b" * (MAX_SESSION_CHARS + 20000)
text = early + cooccur + tail
result = _truncate_around_matches(text, "keystone project")
assert "keystone" in result.lower()
assert "project" in result.lower()
def test_multiword_window_maximises_coverage(self):
"""Sliding window should capture as many match clusters as possible."""
# Place two phrase matches: one at ~50K, one at ~60K, both should fit
pre = "z" * 50000
match1 = " alpha beta "
gap = "z" * 10000
match2 = " alpha beta "
post = "z" * (MAX_SESSION_CHARS + 40000)
text = pre + match1 + gap + match2 + post
result = _truncate_around_matches(text, "alpha beta")
assert result.lower().count("alpha beta") == 2
class TestSessionSearchConcurrency:
def test_defaults_to_three(self):
assert _get_session_search_max_concurrency() == 3
def test_reads_and_clamps_configured_value(self, monkeypatch):
monkeypatch.setattr(
"hermes_cli.config.load_config",
lambda: {"auxiliary": {"session_search": {"max_concurrency": 9}}},
)
assert _get_session_search_max_concurrency() == 5
def test_session_search_respects_configured_concurrency_limit(self, monkeypatch):
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
monkeypatch.setattr(
"hermes_cli.config.load_config",
lambda: {"auxiliary": {"session_search": {"max_concurrency": 1}}},
)
max_seen = {"value": 0}
active = {"value": 0}
async def fake_summarize(_text, _query, _meta):
active["value"] += 1
max_seen["value"] = max(max_seen["value"], active["value"])
await asyncio.sleep(0.01)
active["value"] -= 1
return "summary"
monkeypatch.setattr("tools.session_search_tool._summarize_session", fake_summarize)
monkeypatch.setattr("model_tools._run_async", lambda coro: asyncio.run(coro))
mock_db = MagicMock()
mock_db.search_messages.return_value = [
{"session_id": "s1", "source": "cli", "session_started": 1709500000, "model": "test"},
{"session_id": "s2", "source": "cli", "session_started": 1709500001, "model": "test"},
{"session_id": "s3", "source": "cli", "session_started": 1709500002, "model": "test"},
]
mock_db.get_session.side_effect = lambda sid: {
"id": sid,
"parent_session_id": None,
"source": "cli",
"started_at": 1709500000,
}
mock_db.get_messages_as_conversation.side_effect = lambda sid: [
{"role": "user", "content": f"message from {sid}"},
{"role": "assistant", "content": "response"},
]
result = json.loads(session_search(query="message", db=mock_db, limit=3))
assert result["success"] is True
assert result["count"] == 3
assert max_seen["value"] == 1
# =========================================================================
# session_search (dispatcher)
# =========================================================================
class TestSessionSearch:
def test_no_db_returns_error(self):
from tools.session_search_tool import session_search
result = json.loads(session_search(query="test"))
assert result["success"] is False
assert "not available" in result["error"].lower()
def test_empty_query_returns_error(self):
from tools.session_search_tool import session_search
mock_db = object()
result = json.loads(session_search(query="", db=mock_db))
assert result["success"] is False
def test_whitespace_query_returns_error(self):
from tools.session_search_tool import session_search
mock_db = object()
result = json.loads(session_search(query=" ", db=mock_db))
assert result["success"] is False
def test_current_session_excluded(self):
"""session_search should never return the current session."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
current_sid = "20260304_120000_abc123"
# Simulate FTS5 returning matches only from the current session
mock_db.search_messages.return_value = [
{"session_id": current_sid, "content": "test match", "source": "cli",
"session_started": 1709500000, "model": "test"},
]
mock_db.get_session.return_value = {"parent_session_id": None}
result = json.loads(session_search(
query="test", db=mock_db, current_session_id=current_sid,
))
assert result["success"] is True
assert result["count"] == 0
assert result["results"] == []
def test_current_session_excluded_keeps_others(self):
"""Other sessions should still be returned when current is excluded."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
current_sid = "20260304_120000_abc123"
other_sid = "20260303_100000_def456"
mock_db.search_messages.return_value = [
{"session_id": current_sid, "content": "match 1", "source": "cli",
"session_started": 1709500000, "model": "test"},
{"session_id": other_sid, "content": "match 2", "source": "telegram",
"session_started": 1709400000, "model": "test"},
]
mock_db.get_session.return_value = {"parent_session_id": None}
mock_db.get_messages_as_conversation.return_value = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
]
# Mock async_call_llm to raise RuntimeError → summarizer returns None
from unittest.mock import AsyncMock, patch as _patch
with _patch("tools.session_search_tool.async_call_llm",
new_callable=AsyncMock,
side_effect=RuntimeError("no provider")):
result = json.loads(session_search(
query="test", db=mock_db, current_session_id=current_sid,
))
assert result["success"] is True
# Current session should be skipped, only other_sid should appear
assert result["sessions_searched"] == 1
assert current_sid not in [r.get("session_id") for r in result.get("results", [])]
def test_current_child_session_excludes_parent_lineage(self):
"""Compression/delegation parents should be excluded for the active child session."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = [
{"session_id": "parent_sid", "content": "match", "source": "cli",
"session_started": 1709500000, "model": "test"},
]
def _get_session(session_id):
if session_id == "child_sid":
return {"parent_session_id": "parent_sid"}
if session_id == "parent_sid":
return {"parent_session_id": None}
return None
mock_db.get_session.side_effect = _get_session
result = json.loads(session_search(
query="test", db=mock_db, current_session_id="child_sid",
))
assert result["success"] is True
assert result["count"] == 0
assert result["results"] == []
assert result["sessions_searched"] == 0
def test_limit_none_coerced_to_default(self):
"""Model sends limit=null → should fall back to 3, not TypeError."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = []
result = json.loads(session_search(
query="test", db=mock_db, limit=None,
))
assert result["success"] is True
def test_limit_type_object_coerced_to_default(self):
"""Model sends limit as a type object → should fall back to 3, not TypeError."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = []
result = json.loads(session_search(
query="test", db=mock_db, limit=int,
))
assert result["success"] is True
def test_limit_string_coerced(self):
"""Model sends limit as string '2' → should coerce to int."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = []
result = json.loads(session_search(
query="test", db=mock_db, limit="2",
))
assert result["success"] is True
def test_limit_clamped_to_range(self):
"""Negative or zero limit should be clamped to 1."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = []
result = json.loads(session_search(
query="test", db=mock_db, limit=-5,
))
assert result["success"] is True
result = json.loads(session_search(
query="test", db=mock_db, limit=0,
))
assert result["success"] is True
def test_current_root_session_excludes_child_lineage(self):
"""Delegation child hits should be excluded when they resolve to the current root session."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = [
{"session_id": "child_sid", "content": "match", "source": "cli",
"session_started": 1709500000, "model": "test"},
]
def _get_session(session_id):
if session_id == "root_sid":
return {"parent_session_id": None}
if session_id == "child_sid":
return {"parent_session_id": "root_sid"}
return None
mock_db.get_session.side_effect = _get_session
result = json.loads(session_search(
query="test", db=mock_db, current_session_id="root_sid",
))
assert result["success"] is True
assert result["count"] == 0
assert result["results"] == []
assert result["sessions_searched"] == 0