mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
fix(tools): let session_search match session titles
This commit is contained in:
parent
2c02583c2b
commit
4efec63a34
2 changed files with 129 additions and 4 deletions
|
|
@ -189,6 +189,48 @@ class TestDiscoveryShape:
|
|||
assert result["results"] == []
|
||||
assert result["count"] == 0
|
||||
|
||||
def test_query_can_match_session_title_without_message_hit(self, db):
|
||||
db.create_session("s_fingerprint", source="cli")
|
||||
db.set_session_title("s_fingerprint", "fingerprint-login")
|
||||
db.append_message("s_fingerprint", role="user", content="Let's configure PAM for biometric auth")
|
||||
db.append_message("s_fingerprint", role="assistant", content="Checking Linux auth settings.")
|
||||
|
||||
result = json.loads(session_search(query="fingerprint-login", db=db))
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["count"] == 1
|
||||
hit = result["results"][0]
|
||||
assert hit["session_id"] == "s_fingerprint"
|
||||
assert hit["title"] == "fingerprint-login"
|
||||
assert hit["matched_role"] == "session_title"
|
||||
assert "Session title matched" in hit["snippet"]
|
||||
|
||||
def test_title_query_strips_common_model_quoting(self, db):
|
||||
db.create_session("s_fingerprint", source="cli")
|
||||
db.set_session_title("s_fingerprint", "fingerprint-login")
|
||||
db.append_message("s_fingerprint", role="user", content="PAM auth setup")
|
||||
|
||||
result = json.loads(session_search(query="`fingerprint-login`", db=db))
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["results"][0]["session_id"] == "s_fingerprint"
|
||||
assert result["results"][0]["matched_role"] == "session_title"
|
||||
|
||||
def test_title_match_respects_current_session_filter(self, db):
|
||||
db.create_session("s_current", source="cli")
|
||||
db.set_session_title("s_current", "fingerprint-login")
|
||||
db.append_message("s_current", role="user", content="PAM auth setup")
|
||||
|
||||
result = json.loads(session_search(
|
||||
query="fingerprint-login",
|
||||
current_session_id="s_current",
|
||||
db=db,
|
||||
))
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["results"] == []
|
||||
assert result["count"] == 0
|
||||
|
||||
def test_limit_clamped_to_max_10(self, db):
|
||||
_seed_modpack_sessions(db)
|
||||
# Pass huge limit; should not error and should cap
|
||||
|
|
|
|||
|
|
@ -391,6 +391,78 @@ def _scroll(
|
|||
return json.dumps(response, ensure_ascii=False)
|
||||
|
||||
|
||||
def _normalize_title_query(query: str) -> str:
|
||||
"""Strip common quoting the model may include around a remembered title."""
|
||||
return query.strip().strip("`'\"")
|
||||
|
||||
|
||||
def _title_match_result(
|
||||
db,
|
||||
query: str,
|
||||
current_lineage_root: Optional[str],
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""Return a discovery-shaped result when the query matches a session title."""
|
||||
title_query = _normalize_title_query(query)
|
||||
if not title_query:
|
||||
return None
|
||||
|
||||
try:
|
||||
session_id = db.resolve_session_by_title(title_query)
|
||||
except Exception:
|
||||
logging.debug("resolve_session_by_title failed for %r", title_query, exc_info=True)
|
||||
return None
|
||||
if not session_id:
|
||||
return None
|
||||
|
||||
lineage_root = _resolve_to_parent(db, session_id)
|
||||
if current_lineage_root and lineage_root == current_lineage_root:
|
||||
return None
|
||||
|
||||
try:
|
||||
session_meta = db.get_session(lineage_root) or db.get_session(session_id) or {}
|
||||
except Exception:
|
||||
logging.debug("get_session failed for title match %s", session_id, exc_info=True)
|
||||
session_meta = {}
|
||||
if session_meta.get("source") in _HIDDEN_SESSION_SOURCES:
|
||||
return None
|
||||
|
||||
try:
|
||||
messages = db.get_messages(session_id)
|
||||
except Exception:
|
||||
logging.debug("get_messages failed for title match %s", session_id, exc_info=True)
|
||||
messages = []
|
||||
|
||||
anchor_id = messages[0].get("id") if messages else None
|
||||
if anchor_id is not None:
|
||||
try:
|
||||
view = db.get_anchored_view(session_id, anchor_id, window=5, bookend=3)
|
||||
except Exception:
|
||||
logging.debug("get_anchored_view failed for title match %s/%s", session_id, anchor_id, exc_info=True)
|
||||
view = {}
|
||||
else:
|
||||
view = {}
|
||||
|
||||
entry = {
|
||||
"session_id": session_id,
|
||||
"when": _format_timestamp(session_meta.get("started_at")),
|
||||
"source": session_meta.get("source", "unknown"),
|
||||
"model": session_meta.get("model") or "unknown",
|
||||
"title": session_meta.get("title") or title_query,
|
||||
"matched_role": "session_title",
|
||||
"match_message_id": anchor_id,
|
||||
"snippet": f"Session title matched: {session_meta.get('title') or title_query}",
|
||||
"bookend_start": [_shape_message(m) for m in (view.get("bookend_start") or messages[:3])],
|
||||
"messages": [_shape_message(m, anchor_id=anchor_id) for m in (view.get("window") or messages[:5])],
|
||||
"bookend_end": [_shape_message(m) for m in (view.get("bookend_end") or messages[-3:])],
|
||||
"messages_before": view.get("messages_before", 0),
|
||||
"messages_after": view.get("messages_after", max(len(messages) - 5, 0)),
|
||||
"_lineage_root": lineage_root,
|
||||
}
|
||||
if lineage_root and lineage_root != session_id:
|
||||
entry["parent_session_id"] = lineage_root
|
||||
return entry
|
||||
|
||||
|
||||
def _discover(
|
||||
db,
|
||||
query: str,
|
||||
|
|
@ -401,6 +473,8 @@ def _discover(
|
|||
) -> str:
|
||||
"""Discovery shape: FTS5 + anchored window + bookends per hit. Single call."""
|
||||
role_list = role_filter if role_filter else ["user", "assistant"]
|
||||
current_lineage_root = _resolve_to_parent(db, current_session_id) if current_session_id else None
|
||||
title_result = _title_match_result(db, query, current_lineage_root)
|
||||
|
||||
try:
|
||||
raw_results = db.search_messages(
|
||||
|
|
@ -415,7 +489,7 @@ def _discover(
|
|||
logging.error("FTS5 search failed: %s", e, exc_info=True)
|
||||
return tool_error(f"Search failed: {e}", success=False)
|
||||
|
||||
if not raw_results:
|
||||
if not raw_results and not title_result:
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"mode": "discover",
|
||||
|
|
@ -425,13 +499,21 @@ def _discover(
|
|||
"message": "No matching sessions found.",
|
||||
}, ensure_ascii=False)
|
||||
|
||||
current_lineage_root = _resolve_to_parent(db, current_session_id) if current_session_id else None
|
||||
|
||||
# Dedupe by lineage. Keep the raw owning session_id on the surviving
|
||||
# row — only that pairs validly with the FTS5 match id for the anchored
|
||||
# window. parent_session_id is exposed separately when different.
|
||||
seen_sessions = {}
|
||||
results = []
|
||||
|
||||
if title_result:
|
||||
title_lineage = title_result.pop("_lineage_root", None)
|
||||
if title_lineage:
|
||||
seen_sessions[title_lineage] = {"_title_only": True}
|
||||
results.append(title_result)
|
||||
|
||||
for r in raw_results:
|
||||
if len(seen_sessions) >= limit:
|
||||
break
|
||||
raw_sid = r["session_id"]
|
||||
resolved_sid = _resolve_to_parent(db, raw_sid)
|
||||
# Skip the current session lineage
|
||||
|
|
@ -446,8 +528,9 @@ def _discover(
|
|||
if len(seen_sessions) >= limit:
|
||||
break
|
||||
|
||||
results = []
|
||||
for lineage_root, match_info in seen_sessions.items():
|
||||
if match_info.get("_title_only"):
|
||||
continue
|
||||
hit_sid = match_info.get("session_id") or lineage_root
|
||||
msg_id = match_info.get("id")
|
||||
try:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue