Merge pull request #552 from NousResearch/feat/insights

feat: /insights command — usage analytics, cost estimation & activity patterns
This commit is contained in:
Teknium 2026-03-06 16:00:28 -08:00 committed by GitHub
commit 6d3804770c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1657 additions and 1 deletions

804
agent/insights.py Normal file
View file

@ -0,0 +1,804 @@
"""
Session Insights Engine for Hermes Agent.
Analyzes historical session data from the SQLite state database to produce
comprehensive usage insights: token consumption, cost estimates, tool usage
patterns, activity trends, model/platform breakdowns, and session metrics.
Inspired by Claude Code's /insights command, adapted for Hermes Agent's
multi-platform architecture with additional cost estimation and platform
breakdown capabilities.
Usage:
from agent.insights import InsightsEngine
engine = InsightsEngine(db)
report = engine.generate(days=30)
print(engine.format_terminal(report))
"""
import json
import time
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional
# =========================================================================
# Model pricing (USD per million tokens) — approximate as of early 2026
# =========================================================================
MODEL_PRICING = {
# OpenAI
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4.1": {"input": 2.00, "output": 8.00},
"gpt-4.1-mini": {"input": 0.40, "output": 1.60},
"gpt-4.1-nano": {"input": 0.10, "output": 0.40},
"gpt-4.5-preview": {"input": 75.00, "output": 150.00},
"gpt-5": {"input": 10.00, "output": 30.00},
"gpt-5.4": {"input": 10.00, "output": 30.00},
"o3": {"input": 10.00, "output": 40.00},
"o3-mini": {"input": 1.10, "output": 4.40},
"o4-mini": {"input": 1.10, "output": 4.40},
# Anthropic
"claude-opus-4-20250514": {"input": 15.00, "output": 75.00},
"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
"claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
"claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.00},
"claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
"claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
# DeepSeek
"deepseek-chat": {"input": 0.14, "output": 0.28},
"deepseek-reasoner": {"input": 0.55, "output": 2.19},
# Google
"gemini-2.5-pro": {"input": 1.25, "output": 10.00},
"gemini-2.5-flash": {"input": 0.15, "output": 0.60},
"gemini-2.0-flash": {"input": 0.10, "output": 0.40},
# Meta (via providers)
"llama-4-maverick": {"input": 0.50, "output": 0.70},
"llama-4-scout": {"input": 0.20, "output": 0.30},
}
# Fallback: unknown/custom models get zero cost (we can't assume pricing
# for self-hosted models, custom OAI endpoints, local inference, etc.)
_DEFAULT_PRICING = {"input": 0.0, "output": 0.0}
def _has_known_pricing(model_name: str) -> bool:
"""Check if a model has known pricing (vs unknown/custom endpoint)."""
return _get_pricing(model_name) is not _DEFAULT_PRICING
def _get_pricing(model_name: str) -> Dict[str, float]:
"""Look up pricing for a model. Uses fuzzy matching on model name.
Returns _DEFAULT_PRICING (zero cost) for unknown/custom models
we can't assume costs for self-hosted endpoints, local inference, etc.
"""
if not model_name:
return _DEFAULT_PRICING
# Strip provider prefix (e.g., "anthropic/claude-..." -> "claude-...")
bare = model_name.split("/")[-1].lower()
# Exact match first
if bare in MODEL_PRICING:
return MODEL_PRICING[bare]
# Fuzzy prefix match — prefer the LONGEST matching key to avoid
# e.g. "gpt-4o" matching before "gpt-4o-mini" for "gpt-4o-mini-2024-07-18"
best_match = None
best_len = 0
for key, price in MODEL_PRICING.items():
if bare.startswith(key) and len(key) > best_len:
best_match = price
best_len = len(key)
if best_match:
return best_match
# Keyword heuristics (checked in most-specific-first order)
if "opus" in bare:
return {"input": 15.00, "output": 75.00}
if "sonnet" in bare:
return {"input": 3.00, "output": 15.00}
if "haiku" in bare:
return {"input": 0.80, "output": 4.00}
if "gpt-4o-mini" in bare:
return {"input": 0.15, "output": 0.60}
if "gpt-4o" in bare:
return {"input": 2.50, "output": 10.00}
if "gpt-5" in bare:
return {"input": 10.00, "output": 30.00}
if "deepseek" in bare:
return {"input": 0.14, "output": 0.28}
if "gemini" in bare:
return {"input": 0.15, "output": 0.60}
return _DEFAULT_PRICING
def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Estimate the USD cost for a given model and token counts."""
pricing = _get_pricing(model)
return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
def _format_duration(seconds: float) -> str:
"""Format seconds into a human-readable duration string."""
if seconds < 60:
return f"{seconds:.0f}s"
minutes = seconds / 60
if minutes < 60:
return f"{minutes:.0f}m"
hours = minutes / 60
if hours < 24:
remaining_min = int(minutes % 60)
return f"{int(hours)}h {remaining_min}m" if remaining_min else f"{int(hours)}h"
days = hours / 24
return f"{days:.1f}d"
def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
"""Create simple horizontal bar chart strings from values."""
peak = max(values) if values else 1
if peak == 0:
return ["" for _ in values]
return ["" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]
class InsightsEngine:
    """Produce usage analytics from recorded session history.

    Operates directly on a SessionDB instance (or its underlying sqlite3
    connection) to query session and message rows.
    """

    def __init__(self, db):
        """Wrap a SessionDB instance (from hermes_state.py) for analysis.

        Args:
            db: A SessionDB instance exposing a `_conn` sqlite3 connection.
        """
        self.db = db
        # Keep a direct handle on the underlying connection; all analytics
        # queries go through it rather than SessionDB's higher-level API.
        self._conn = db._conn
def generate(self, days: int = 30, source: str = None) -> Dict[str, Any]:
"""
Generate a complete insights report.
Args:
days: Number of days to look back (default: 30)
source: Optional filter by source platform
Returns:
Dict with all computed insights
"""
cutoff = time.time() - (days * 86400)
# Gather raw data
sessions = self._get_sessions(cutoff, source)
tool_usage = self._get_tool_usage(cutoff, source)
message_stats = self._get_message_stats(cutoff, source)
if not sessions:
return {
"days": days,
"source_filter": source,
"empty": True,
"overview": {},
"models": [],
"platforms": [],
"tools": [],
"activity": {},
"top_sessions": [],
}
# Compute insights
overview = self._compute_overview(sessions, message_stats)
models = self._compute_model_breakdown(sessions)
platforms = self._compute_platform_breakdown(sessions)
tools = self._compute_tool_breakdown(tool_usage)
activity = self._compute_activity_patterns(sessions)
top_sessions = self._compute_top_sessions(sessions)
return {
"days": days,
"source_filter": source,
"empty": False,
"generated_at": time.time(),
"overview": overview,
"models": models,
"platforms": platforms,
"tools": tools,
"activity": activity,
"top_sessions": top_sessions,
}
# =========================================================================
# Data gathering (SQL queries)
# =========================================================================
# Columns we actually need (skip system_prompt, model_config blobs).
# NOTE: this constant is interpolated directly into SELECT statements
# below via f-strings — safe only because it is a fixed class-level
# string, never user input.
_SESSION_COLS = ("id, source, model, started_at, ended_at, "
"message_count, tool_call_count, input_tokens, output_tokens")
def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
"""Fetch sessions within the time window."""
if source:
cursor = self._conn.execute(
f"""SELECT {self._SESSION_COLS} FROM sessions
WHERE started_at >= ? AND source = ?
ORDER BY started_at DESC""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
f"""SELECT {self._SESSION_COLS} FROM sessions
WHERE started_at >= ?
ORDER BY started_at DESC""",
(cutoff,),
)
return [dict(row) for row in cursor.fetchall()]
def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]:
"""Get tool call counts from messages.
Uses two sources:
1. tool_name column on 'tool' role messages (set by gateway)
2. tool_calls JSON on 'assistant' role messages (covers CLI where
tool_name is not populated on tool responses)
"""
tool_counts = Counter()
# Source 1: explicit tool_name on tool response messages
if source:
cursor = self._conn.execute(
"""SELECT m.tool_name, COUNT(*) as count
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ? AND s.source = ?
AND m.role = 'tool' AND m.tool_name IS NOT NULL
GROUP BY m.tool_name
ORDER BY count DESC""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
"""SELECT m.tool_name, COUNT(*) as count
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ?
AND m.role = 'tool' AND m.tool_name IS NOT NULL
GROUP BY m.tool_name
ORDER BY count DESC""",
(cutoff,),
)
for row in cursor.fetchall():
tool_counts[row["tool_name"]] += row["count"]
# Source 2: extract from tool_calls JSON on assistant messages
# (covers CLI sessions where tool_name is NULL on tool responses)
if source:
cursor2 = self._conn.execute(
"""SELECT m.tool_calls
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ? AND s.source = ?
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
(cutoff, source),
)
else:
cursor2 = self._conn.execute(
"""SELECT m.tool_calls
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ?
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
(cutoff,),
)
tool_calls_counts = Counter()
for row in cursor2.fetchall():
try:
calls = row["tool_calls"]
if isinstance(calls, str):
calls = json.loads(calls)
if isinstance(calls, list):
for call in calls:
func = call.get("function", {}) if isinstance(call, dict) else {}
name = func.get("name")
if name:
tool_calls_counts[name] += 1
except (json.JSONDecodeError, TypeError, AttributeError):
continue
# Merge: prefer tool_name source, supplement with tool_calls source
# for tools not already counted
if not tool_counts and tool_calls_counts:
# No tool_name data at all — use tool_calls exclusively
tool_counts = tool_calls_counts
elif tool_counts and tool_calls_counts:
# Both sources have data — use whichever has the higher count per tool
# (they may overlap, so take the max to avoid double-counting)
all_tools = set(tool_counts) | set(tool_calls_counts)
merged = Counter()
for tool in all_tools:
merged[tool] = max(tool_counts.get(tool, 0), tool_calls_counts.get(tool, 0))
tool_counts = merged
# Convert to the expected format
return [
{"tool_name": name, "count": count}
for name, count in tool_counts.most_common()
]
def _get_message_stats(self, cutoff: float, source: str = None) -> Dict:
"""Get aggregate message statistics."""
if source:
cursor = self._conn.execute(
"""SELECT
COUNT(*) as total_messages,
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ? AND s.source = ?""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
"""SELECT
COUNT(*) as total_messages,
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ?""",
(cutoff,),
)
row = cursor.fetchone()
return dict(row) if row else {
"total_messages": 0, "user_messages": 0,
"assistant_messages": 0, "tool_messages": 0,
}
# =========================================================================
# Computation
# =========================================================================
def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
"""Compute high-level overview statistics."""
total_input = sum(s.get("input_tokens") or 0 for s in sessions)
total_output = sum(s.get("output_tokens") or 0 for s in sessions)
total_tokens = total_input + total_output
total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
total_messages = sum(s.get("message_count") or 0 for s in sessions)
# Cost estimation (weighted by model)
total_cost = 0.0
models_with_pricing = set()
models_without_pricing = set()
for s in sessions:
model = s.get("model") or ""
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
total_cost += _estimate_cost(model, inp, out)
display = model.split("/")[-1] if "/" in model else (model or "unknown")
if _has_known_pricing(model):
models_with_pricing.add(display)
else:
models_without_pricing.add(display)
# Session duration stats (guard against negative durations from clock drift)
durations = []
for s in sessions:
start = s.get("started_at")
end = s.get("ended_at")
if start and end and end > start:
durations.append(end - start)
total_hours = sum(durations) / 3600 if durations else 0
avg_duration = sum(durations) / len(durations) if durations else 0
# Earliest and latest session
started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")]
date_range_start = min(started_timestamps) if started_timestamps else None
date_range_end = max(started_timestamps) if started_timestamps else None
return {
"total_sessions": len(sessions),
"total_messages": total_messages,
"total_tool_calls": total_tool_calls,
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"total_tokens": total_tokens,
"estimated_cost": total_cost,
"total_hours": total_hours,
"avg_session_duration": avg_duration,
"avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
"avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0,
"user_messages": message_stats.get("user_messages") or 0,
"assistant_messages": message_stats.get("assistant_messages") or 0,
"tool_messages": message_stats.get("tool_messages") or 0,
"date_range_start": date_range_start,
"date_range_end": date_range_end,
"models_with_pricing": sorted(models_with_pricing),
"models_without_pricing": sorted(models_without_pricing),
}
def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
"""Break down usage by model."""
model_data = defaultdict(lambda: {
"sessions": 0, "input_tokens": 0, "output_tokens": 0,
"total_tokens": 0, "tool_calls": 0, "cost": 0.0,
})
for s in sessions:
model = s.get("model") or "unknown"
# Normalize: strip provider prefix for display
display_model = model.split("/")[-1] if "/" in model else model
d = model_data[display_model]
d["sessions"] += 1
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
d["input_tokens"] += inp
d["output_tokens"] += out
d["total_tokens"] += inp + out
d["tool_calls"] += s.get("tool_call_count") or 0
d["cost"] += _estimate_cost(model, inp, out)
d["has_pricing"] = _has_known_pricing(model)
result = [
{"model": model, **data}
for model, data in model_data.items()
]
# Sort by tokens first, fall back to session count when tokens are 0
result.sort(key=lambda x: (x["total_tokens"], x["sessions"]), reverse=True)
return result
def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
"""Break down usage by platform/source."""
platform_data = defaultdict(lambda: {
"sessions": 0, "messages": 0, "input_tokens": 0,
"output_tokens": 0, "total_tokens": 0, "tool_calls": 0,
})
for s in sessions:
source = s.get("source") or "unknown"
d = platform_data[source]
d["sessions"] += 1
d["messages"] += s.get("message_count") or 0
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
d["input_tokens"] += inp
d["output_tokens"] += out
d["total_tokens"] += inp + out
d["tool_calls"] += s.get("tool_call_count") or 0
result = [
{"platform": platform, **data}
for platform, data in platform_data.items()
]
result.sort(key=lambda x: x["sessions"], reverse=True)
return result
def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
"""Process tool usage data into a ranked list with percentages."""
total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
result = []
for t in tool_usage:
pct = (t["count"] / total_calls * 100) if total_calls else 0
result.append({
"tool": t["tool_name"],
"count": t["count"],
"percentage": pct,
})
return result
def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
"""Analyze activity patterns by day of week and hour."""
day_counts = Counter() # 0=Monday ... 6=Sunday
hour_counts = Counter()
daily_counts = Counter() # date string -> count
for s in sessions:
ts = s.get("started_at")
if not ts:
continue
dt = datetime.fromtimestamp(ts)
day_counts[dt.weekday()] += 1
hour_counts[dt.hour] += 1
daily_counts[dt.strftime("%Y-%m-%d")] += 1
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
day_breakdown = [
{"day": day_names[i], "count": day_counts.get(i, 0)}
for i in range(7)
]
hour_breakdown = [
{"hour": i, "count": hour_counts.get(i, 0)}
for i in range(24)
]
# Busiest day and hour
busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None
# Active days (days with at least one session)
active_days = len(daily_counts)
# Streak calculation
if daily_counts:
all_dates = sorted(daily_counts.keys())
current_streak = 1
max_streak = 1
for i in range(1, len(all_dates)):
d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d")
d2 = datetime.strptime(all_dates[i], "%Y-%m-%d")
if (d2 - d1).days == 1:
current_streak += 1
max_streak = max(max_streak, current_streak)
else:
current_streak = 1
else:
max_streak = 0
return {
"by_day": day_breakdown,
"by_hour": hour_breakdown,
"busiest_day": busiest_day,
"busiest_hour": busiest_hour,
"active_days": active_days,
"max_streak": max_streak,
}
def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
"""Find notable sessions (longest, most messages, most tokens)."""
top = []
# Longest by duration
sessions_with_duration = [
s for s in sessions
if s.get("started_at") and s.get("ended_at")
]
if sessions_with_duration:
longest = max(
sessions_with_duration,
key=lambda s: (s["ended_at"] - s["started_at"]),
)
dur = longest["ended_at"] - longest["started_at"]
top.append({
"label": "Longest session",
"session_id": longest["id"][:16],
"value": _format_duration(dur),
"date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
})
# Most messages
most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
if (most_msgs.get("message_count") or 0) > 0:
top.append({
"label": "Most messages",
"session_id": most_msgs["id"][:16],
"value": f"{most_msgs['message_count']} msgs",
"date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
})
# Most tokens
most_tokens = max(
sessions,
key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0),
)
token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
if token_total > 0:
top.append({
"label": "Most tokens",
"session_id": most_tokens["id"][:16],
"value": f"{token_total:,} tokens",
"date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
})
# Most tool calls
most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
if (most_tools.get("tool_call_count") or 0) > 0:
top.append({
"label": "Most tool calls",
"session_id": most_tools["id"][:16],
"value": f"{most_tools['tool_call_count']} calls",
"date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
})
return top
# =========================================================================
# Formatting
# =========================================================================
def format_terminal(self, report: Dict) -> str:
"""Format the insights report for terminal display (CLI)."""
if report.get("empty"):
days = report.get("days", 30)
src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
return f" No sessions found in the last {days} days{src}."
lines = []
o = report["overview"]
days = report["days"]
src_filter = report.get("source_filter")
# Header
lines.append("")
lines.append(" ╔══════════════════════════════════════════════════════════╗")
lines.append(" ║ 📊 Hermes Insights ║")
period_label = f"Last {days} days"
if src_filter:
period_label += f" ({src_filter})"
padding = 58 - len(period_label) - 2
left_pad = padding // 2
right_pad = padding - left_pad
lines.append(f"{' ' * left_pad} {period_label} {' ' * right_pad}")
lines.append(" ╚══════════════════════════════════════════════════════════╝")
lines.append("")
# Date range
if o.get("date_range_start") and o.get("date_range_end"):
start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
lines.append(f" Period: {start_str}{end_str}")
lines.append("")
# Overview
lines.append(" 📋 Overview")
lines.append(" " + "" * 56)
lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
cost_str = f"${o['estimated_cost']:.2f}"
if o.get("models_without_pricing"):
cost_str += " *"
lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: {cost_str}")
if o["total_hours"] > 0:
lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}")
lines.append("")
# Model breakdown
if report["models"]:
lines.append(" 🤖 Models Used")
lines.append(" " + "" * 56)
lines.append(f" {'Model':<30} {'Sessions':>8} {'Tokens':>12} {'Cost':>8}")
for m in report["models"]:
model_name = m["model"][:28]
if m.get("has_pricing"):
cost_cell = f"${m['cost']:>6.2f}"
else:
cost_cell = " N/A"
lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} {cost_cell}")
if o.get("models_without_pricing"):
lines.append(f" * Cost N/A for custom/self-hosted models")
lines.append("")
# Platform breakdown
if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
lines.append(" 📱 Platforms")
lines.append(" " + "" * 56)
lines.append(f" {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
for p in report["platforms"]:
lines.append(f" {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
lines.append("")
# Tool usage
if report["tools"]:
lines.append(" 🔧 Top Tools")
lines.append(" " + "" * 56)
lines.append(f" {'Tool':<28} {'Calls':>8} {'%':>8}")
for t in report["tools"][:15]: # Top 15
lines.append(f" {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
if len(report["tools"]) > 15:
lines.append(f" ... and {len(report['tools']) - 15} more tools")
lines.append("")
# Activity patterns
act = report.get("activity", {})
if act.get("by_day"):
lines.append(" 📅 Activity Patterns")
lines.append(" " + "" * 56)
# Day of week chart
day_values = [d["count"] for d in act["by_day"]]
bars = _bar_chart(day_values, max_width=15)
for i, d in enumerate(act["by_day"]):
bar = bars[i]
lines.append(f" {d['day']} {bar:<15} {d['count']}")
lines.append("")
# Peak hours (show top 5 busiest hours)
busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
if busy_hours:
hour_strs = []
for h in busy_hours:
hr = h["hour"]
ampm = "AM" if hr < 12 else "PM"
display_hr = hr % 12 or 12
hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
lines.append(f" Peak hours: {', '.join(hour_strs)}")
if act.get("active_days"):
lines.append(f" Active days: {act['active_days']}")
if act.get("max_streak") and act["max_streak"] > 1:
lines.append(f" Best streak: {act['max_streak']} consecutive days")
lines.append("")
# Notable sessions
if report.get("top_sessions"):
lines.append(" 🏆 Notable Sessions")
lines.append(" " + "" * 56)
for ts in report["top_sessions"]:
lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
lines.append("")
return "\n".join(lines)
def format_gateway(self, report: Dict) -> str:
"""Format the insights report for gateway/messaging (shorter)."""
if report.get("empty"):
days = report.get("days", 30)
return f"No sessions found in the last {days} days."
lines = []
o = report["overview"]
days = report["days"]
lines.append(f"📊 **Hermes Insights** — Last {days} days\n")
# Overview
lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
cost_note = ""
if o.get("models_without_pricing"):
cost_note = " _(excludes custom/self-hosted models)_"
lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}{cost_note}")
if o["total_hours"] > 0:
lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
lines.append("")
# Models (top 5)
if report["models"]:
lines.append("**🤖 Models:**")
for m in report["models"][:5]:
cost_str = f"${m['cost']:.2f}" if m.get("has_pricing") else "N/A"
lines.append(f" {m['model'][:25]}{m['sessions']} sessions, {m['total_tokens']:,} tokens, {cost_str}")
lines.append("")
# Platforms (if multi-platform)
if len(report["platforms"]) > 1:
lines.append("**📱 Platforms:**")
for p in report["platforms"]:
lines.append(f" {p['platform']}{p['sessions']} sessions, {p['messages']:,} msgs")
lines.append("")
# Tools (top 8)
if report["tools"]:
lines.append("**🔧 Top Tools:**")
for t in report["tools"][:8]:
lines.append(f" {t['tool']}{t['count']:,} calls ({t['percentage']:.1f}%)")
lines.append("")
# Activity summary
act = report.get("activity", {})
if act.get("busiest_day") and act.get("busiest_hour"):
hr = act["busiest_hour"]["hour"]
ampm = "AM" if hr < 12 else "PM"
display_hr = hr % 12 or 12
lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
if act.get("active_days"):
lines.append(f"**Active days:** {act['active_days']}", )
if act.get("max_streak", 0) > 1:
lines.append(f"**Best streak:** {act['max_streak']} consecutive days")
return "\n".join(lines)

35
cli.py
View file

@ -1858,6 +1858,8 @@ class HermesCLI:
self._manual_compress()
elif cmd_lower == "/usage":
self._show_usage()
elif cmd_lower.startswith("/insights"):
self._show_insights(cmd_original)
elif cmd_lower == "/paste":
self._handle_paste_command()
elif cmd_lower == "/reload-mcp":
@ -1983,6 +1985,39 @@ class HermesCLI:
for quiet_logger in ('tools', 'minisweagent', 'run_agent', 'trajectory_compressor', 'cron', 'hermes_cli'):
logging.getLogger(quiet_logger).setLevel(logging.ERROR)
def _show_insights(self, command: str = "/insights"):
"""Show usage insights and analytics from session history."""
# Parse optional --days flag
parts = command.split()
days = 30
source = None
i = 1
while i < len(parts):
if parts[i] == "--days" and i + 1 < len(parts):
try:
days = int(parts[i + 1])
except ValueError:
print(f" Invalid --days value: {parts[i + 1]}")
return
i += 2
elif parts[i] == "--source" and i + 1 < len(parts):
source = parts[i + 1]
i += 2
else:
i += 1
try:
from hermes_state import SessionDB
from agent.insights import InsightsEngine
db = SessionDB()
engine = InsightsEngine(db)
report = engine.generate(days=days, source=source)
print(engine.format_terminal(report))
db.close()
except Exception as e:
print(f" Error generating insights: {e}")
def _reload_mcp(self):
"""Reload MCP servers: disconnect all, re-read config.yaml, reconnect.

View file

@ -659,7 +659,7 @@ class GatewayRunner:
# Emit command:* hook for any recognized slash command
_known_commands = {"new", "reset", "help", "status", "stop", "model",
"personality", "retry", "undo", "sethome", "set-home",
"compress", "usage", "reload-mcp", "update"}
"compress", "usage", "insights", "reload-mcp", "update"}
if command and command in _known_commands:
await self.hooks.emit(f"command:{command}", {
"platform": source.platform.value if source.platform else "",
@ -701,6 +701,9 @@ class GatewayRunner:
if command == "usage":
return await self._handle_usage_command(event)
if command == "insights":
return await self._handle_insights_command(event)
if command == "reload-mcp":
return await self._handle_reload_mcp_command(event)
@ -1104,6 +1107,7 @@ class GatewayRunner:
"`/sethome` — Set this chat as the home channel",
"`/compress` — Compress conversation context",
"`/usage` — Show token usage for this session",
"`/insights [days]` — Show usage insights and analytics",
"`/reload-mcp` — Reload MCP servers from config",
"`/update` — Update Hermes Agent to the latest version",
"`/help` — Show this message",
@ -1397,6 +1401,53 @@ class GatewayRunner:
)
return "No usage data available for this session."
async def _handle_insights_command(self, event: MessageEvent) -> str:
"""Handle /insights command -- show usage insights and analytics."""
import asyncio as _asyncio
args = event.get_command_args().strip()
days = 30
source = None
# Parse simple args: /insights 7 or /insights --days 7
if args:
parts = args.split()
i = 0
while i < len(parts):
if parts[i] == "--days" and i + 1 < len(parts):
try:
days = int(parts[i + 1])
except ValueError:
return f"Invalid --days value: {parts[i + 1]}"
i += 2
elif parts[i] == "--source" and i + 1 < len(parts):
source = parts[i + 1]
i += 2
elif parts[i].isdigit():
days = int(parts[i])
i += 1
else:
i += 1
try:
from hermes_state import SessionDB
from agent.insights import InsightsEngine
loop = _asyncio.get_event_loop()
def _run_insights():
db = SessionDB()
engine = InsightsEngine(db)
report = engine.generate(days=days, source=source)
result = engine.format_gateway(report)
db.close()
return result
return await loop.run_in_executor(None, _run_insights)
except Exception as e:
logger.error("Insights command error: %s", e, exc_info=True)
return f"Error generating insights: {e}"
async def _handle_reload_mcp_command(self, event: MessageEvent) -> str:
"""Handle /reload-mcp command -- disconnect and reconnect all MCP servers."""
loop = asyncio.get_event_loop()

View file

@ -28,6 +28,7 @@ COMMANDS = {
"/verbose": "Cycle tool progress display: off → new → all → verbose",
"/compress": "Manually compress conversation context (flush memories + summarize)",
"/usage": "Show token usage for the current session",
"/insights": "Show usage insights and analytics (last 30 days)",
"/quit": "Exit the CLI (also: /exit, /q)",
}

View file

@ -1625,6 +1625,32 @@ For more help on a command:
sessions_parser.set_defaults(func=cmd_sessions)
# =========================================================================
# insights command
# =========================================================================
insights_parser = subparsers.add_parser(
"insights",
help="Show usage insights and analytics",
description="Analyze session history to show token usage, costs, tool patterns, and activity trends"
)
insights_parser.add_argument("--days", type=int, default=30, help="Number of days to analyze (default: 30)")
insights_parser.add_argument("--source", help="Filter by platform (cli, telegram, discord, etc.)")
def cmd_insights(args):
try:
from hermes_state import SessionDB
from agent.insights import InsightsEngine
db = SessionDB()
engine = InsightsEngine(db)
report = engine.generate(days=args.days, source=args.source)
print(engine.format_terminal(report))
db.close()
except Exception as e:
print(f"Error generating insights: {e}")
insights_parser.set_defaults(func=cmd_insights)
# =========================================================================
# version command
# =========================================================================

739
tests/test_insights.py Normal file
View file

@ -0,0 +1,739 @@
"""Tests for agent/insights.py — InsightsEngine analytics and reporting."""
import time
import pytest
from pathlib import Path
from hermes_state import SessionDB
from agent.insights import (
InsightsEngine,
_get_pricing,
_estimate_cost,
_format_duration,
_bar_chart,
_has_known_pricing,
_DEFAULT_PRICING,
)
@pytest.fixture()
def db(tmp_path):
    """Provide a SessionDB backed by a per-test temporary file; closed on teardown."""
    store = SessionDB(db_path=tmp_path / "test_insights.db")
    yield store
    store.close()
@pytest.fixture()
def populated_db(db):
    """Create a DB with realistic session data for insights testing.

    Five sessions spanning platforms, models and ages:
      s1    cli / claude-sonnet,     2 days old, 1h,  3 tool calls
      s2    telegram / gpt-4o,       5 days old, 30m, 1 tool call
      s3    cli / deepseek-chat,    10 days old, 2h,  3 tool calls (terminal x2)
      s4    discord / claude-sonnet, 1 day old, 15m, no tools
      s_old cli / gpt-4o-mini,      45 days old (outside the 30-day window)

    started_at/ended_at are backdated with raw SQL after each create/end call
    because the public API always stamps the current time.
    """
    now = time.time()
    day = 86400
    # Session 1: CLI, claude-sonnet, ended, 2 days ago
    db.create_session(
        session_id="s1", source="cli",
        model="anthropic/claude-sonnet-4-20250514", user_id="user1",
    )
    # Backdate the started_at
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's1'", (now - 2 * day,))
    db.end_session("s1", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's1'", (now - 2 * day + 3600,))
    db.update_token_counts("s1", input_tokens=50000, output_tokens=15000)
    db.append_message("s1", role="user", content="Hello, help me fix a bug")
    db.append_message("s1", role="assistant", content="Sure, let me look into that.")
    db.append_message("s1", role="assistant", content="Let me search the files.",
                      tool_calls=[{"function": {"name": "search_files"}}])
    db.append_message("s1", role="tool", content="Found 3 matches", tool_name="search_files")
    db.append_message("s1", role="assistant", content="Let me read the file.",
                      tool_calls=[{"function": {"name": "read_file"}}])
    db.append_message("s1", role="tool", content="file contents...", tool_name="read_file")
    db.append_message("s1", role="assistant", content="I found the bug. Let me fix it.",
                      tool_calls=[{"function": {"name": "patch"}}])
    db.append_message("s1", role="tool", content="patched successfully", tool_name="patch")
    db.append_message("s1", role="user", content="Thanks!")
    db.append_message("s1", role="assistant", content="You're welcome!")
    # Session 2: Telegram, gpt-4o, ended, 5 days ago
    db.create_session(
        session_id="s2", source="telegram",
        model="gpt-4o", user_id="user1",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's2'", (now - 5 * day,))
    db.end_session("s2", end_reason="timeout")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's2'", (now - 5 * day + 1800,))
    db.update_token_counts("s2", input_tokens=20000, output_tokens=8000)
    db.append_message("s2", role="user", content="Search the web for something")
    db.append_message("s2", role="assistant", content="Searching...",
                      tool_calls=[{"function": {"name": "web_search"}}])
    db.append_message("s2", role="tool", content="results...", tool_name="web_search")
    db.append_message("s2", role="assistant", content="Here's what I found")
    # Session 3: CLI, deepseek-chat, ended, 10 days ago
    db.create_session(
        session_id="s3", source="cli",
        model="deepseek-chat", user_id="user1",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's3'", (now - 10 * day,))
    db.end_session("s3", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's3'", (now - 10 * day + 7200,))
    db.update_token_counts("s3", input_tokens=100000, output_tokens=40000)
    db.append_message("s3", role="user", content="Run this terminal command")
    db.append_message("s3", role="assistant", content="Running...",
                      tool_calls=[{"function": {"name": "terminal"}}])
    db.append_message("s3", role="tool", content="output...", tool_name="terminal")
    db.append_message("s3", role="assistant", content="Let me run another",
                      tool_calls=[{"function": {"name": "terminal"}}])
    db.append_message("s3", role="tool", content="more output...", tool_name="terminal")
    db.append_message("s3", role="assistant", content="And search files",
                      tool_calls=[{"function": {"name": "search_files"}}])
    db.append_message("s3", role="tool", content="found stuff", tool_name="search_files")
    # Session 4: Discord, same model as s1, ended, 1 day ago
    db.create_session(
        session_id="s4", source="discord",
        model="anthropic/claude-sonnet-4-20250514", user_id="user2",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's4'", (now - 1 * day,))
    db.end_session("s4", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's4'", (now - 1 * day + 900,))
    db.update_token_counts("s4", input_tokens=10000, output_tokens=5000)
    db.append_message("s4", role="user", content="Quick question")
    db.append_message("s4", role="assistant", content="Sure, go ahead")
    # Session 5: Old session, 45 days ago (should be excluded from 30-day window)
    db.create_session(
        session_id="s_old", source="cli",
        model="gpt-4o-mini", user_id="user1",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's_old'", (now - 45 * day,))
    db.end_session("s_old", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's_old'", (now - 45 * day + 600,))
    db.update_token_counts("s_old", input_tokens=5000, output_tokens=2000)
    db.append_message("s_old", role="user", content="old message")
    db.append_message("s_old", role="assistant", content="old reply")
    db._conn.commit()
    return db
# =========================================================================
# Pricing helpers
# =========================================================================
class TestPricing:
    """_get_pricing lookup chain: exact match, provider prefix, keyword heuristics, fallback."""

    def test_exact_match(self):
        """A model listed verbatim in the table returns its listed rates."""
        rates = _get_pricing("gpt-4o")
        assert (rates["input"], rates["output"]) == (2.50, 10.00)

    def test_provider_prefix_stripped(self):
        """A 'provider/model' id resolves by the bare model name."""
        rates = _get_pricing("anthropic/claude-sonnet-4-20250514")
        assert (rates["input"], rates["output"]) == (3.00, 15.00)

    def test_prefix_match(self):
        """An id that extends a known name still matches it."""
        assert _get_pricing("claude-3-5-sonnet-20241022")["input"] == 3.00

    def test_keyword_heuristic_opus(self):
        """'opus' anywhere in the id triggers the opus rate card."""
        rates = _get_pricing("some-new-opus-model")
        assert (rates["input"], rates["output"]) == (15.00, 75.00)

    def test_keyword_heuristic_haiku(self):
        assert _get_pricing("anthropic/claude-haiku-future")["input"] == 0.80

    def test_unknown_model_returns_zero_cost(self):
        """Unknown/custom models should NOT have fabricated costs."""
        rates = _get_pricing("totally-unknown-model-xyz")
        assert rates == _DEFAULT_PRICING
        assert (rates["input"], rates["output"]) == (0.0, 0.0)

    def test_custom_endpoint_model_zero_cost(self):
        """Self-hosted models should return zero cost."""
        for model in ("FP16_Hermes_4.5", "Hermes_4.5_1T_epoch2", "my-local-llama"):
            rates = _get_pricing(model)
            assert rates["input"] == 0.0, f"{model} should have zero cost"
            assert rates["output"] == 0.0, f"{model} should have zero cost"

    def test_none_model(self):
        assert _get_pricing(None) == _DEFAULT_PRICING

    def test_empty_model(self):
        assert _get_pricing("") == _DEFAULT_PRICING

    def test_deepseek_heuristic(self):
        assert _get_pricing("deepseek-v3")["input"] == 0.14

    def test_gemini_heuristic(self):
        assert _get_pricing("gemini-3.0-ultra")["input"] == 0.15

    def test_dated_model_gpt4o_mini(self):
        """gpt-4o-mini-2024-07-18 should match gpt-4o-mini, NOT gpt-4o."""
        assert _get_pricing("gpt-4o-mini-2024-07-18")["input"] == 0.15  # not gpt-4o's 2.50

    def test_dated_model_o3_mini(self):
        """o3-mini-2025-01-31 should match o3-mini, NOT o3."""
        assert _get_pricing("o3-mini-2025-01-31")["input"] == 1.10  # not o3's 10.00

    def test_dated_model_gpt41_mini(self):
        """gpt-4.1-mini-2025-04-14 should match gpt-4.1-mini, NOT gpt-4.1."""
        assert _get_pricing("gpt-4.1-mini-2025-04-14")["input"] == 0.40  # not gpt-4.1's 2.00

    def test_dated_model_gpt41_nano(self):
        """gpt-4.1-nano-2025-04-14 should match gpt-4.1-nano, NOT gpt-4.1."""
        assert _get_pricing("gpt-4.1-nano-2025-04-14")["input"] == 0.10  # not gpt-4.1's 2.00
class TestHasKnownPricing:
    """_has_known_pricing separates commercial models from custom/self-hosted ones."""

    def test_known_commercial_model(self):
        for model in ("gpt-4o", "anthropic/claude-sonnet-4-20250514", "deepseek-chat"):
            assert _has_known_pricing(model) is True

    def test_unknown_custom_model(self):
        for model in ("FP16_Hermes_4.5", "my-custom-model", "", None):
            assert _has_known_pricing(model) is False

    def test_heuristic_matched_models(self):
        """Models matched by keyword heuristics should be considered known."""
        assert _has_known_pricing("some-opus-model") is True
        assert _has_known_pricing("future-sonnet-v2") is True
class TestEstimateCost:
    """_estimate_cost arithmetic against the gpt-4o rate card."""

    def test_basic_cost(self):
        # gpt-4o: $2.50/M input + $10.00/M output -> $12.50 for 1M of each.
        total = _estimate_cost("gpt-4o", 1_000_000, 1_000_000)
        assert total == pytest.approx(12.50, abs=0.01)

    def test_zero_tokens(self):
        assert _estimate_cost("gpt-4o", 0, 0) == 0.0

    def test_small_usage(self):
        # 1000 * 2.50/1M + 500 * 10.00/1M = 0.0025 + 0.005 = 0.0075
        assert _estimate_cost("gpt-4o", 1000, 500) == pytest.approx(0.0075, abs=0.0001)
# =========================================================================
# Format helpers
# =========================================================================
class TestFormatDuration:
    """_format_duration rendering at each magnitude (s, m, h, d)."""

    def test_seconds(self):
        assert _format_duration(45) == "45s"

    def test_minutes(self):
        assert _format_duration(300) == "5m"

    def test_hours_with_minutes(self):
        # 5400 seconds is an hour and a half.
        assert _format_duration(5400) == "1h 30m"

    def test_exact_hours(self):
        assert _format_duration(7200) == "2h"

    def test_days(self):
        # 172800 seconds is exactly two days.
        assert _format_duration(172800) == "2.0d"
class TestBarChart:
    """_bar_chart width scaling and degenerate inputs."""

    def test_basic_bars(self):
        bars = _bar_chart([10, 5, 0, 20], max_width=10)
        widths = [len(bar) for bar in bars]
        assert len(bars) == 4
        assert widths[3] == 10  # max value fills the full width
        assert widths[0] == 5   # half of max -> half width
        assert bars[2] == ""    # zero renders as nothing

    def test_empty_values(self):
        assert _bar_chart([], max_width=10) == []

    def test_all_zeros(self):
        assert all(bar == "" for bar in _bar_chart([0, 0, 0], max_width=10))

    def test_single_value(self):
        bars = _bar_chart([5], max_width=10)
        assert len(bars) == 1
        assert len(bars[0]) == 10
# =========================================================================
# InsightsEngine — empty DB
# =========================================================================
class TestInsightsEmpty:
    """Reports generated from a database containing no sessions."""

    def test_empty_db_returns_empty_report(self, db):
        report = InsightsEngine(db).generate(days=30)
        assert report["empty"] is True
        assert report["overview"] == {}

    def test_empty_db_terminal_format(self, db):
        engine = InsightsEngine(db)
        rendered = engine.format_terminal(engine.generate(days=30))
        assert "No sessions found" in rendered

    def test_empty_db_gateway_format(self, db):
        engine = InsightsEngine(db)
        rendered = engine.format_gateway(engine.generate(days=30))
        assert "No sessions found" in rendered
# =========================================================================
# InsightsEngine — populated DB
# =========================================================================
class TestInsightsPopulated:
    """Report content computed from the populated_db fixture (4 sessions in-window)."""

    def test_generate_returns_all_sections(self, populated_db):
        """A non-empty report carries every expected top-level section."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        assert report["empty"] is False
        assert "overview" in report
        assert "models" in report
        assert "platforms" in report
        assert "tools" in report
        assert "activity" in report
        assert "top_sessions" in report

    def test_overview_session_count(self, populated_db):
        """Only sessions inside the window are counted."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        overview = report["overview"]
        # s1, s2, s3, s4 are within 30 days; s_old is 45 days ago
        assert overview["total_sessions"] == 4

    def test_overview_token_totals(self, populated_db):
        """Input/output/total tokens are the sums over in-window sessions."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        overview = report["overview"]
        expected_input = 50000 + 20000 + 100000 + 10000
        expected_output = 15000 + 8000 + 40000 + 5000
        assert overview["total_input_tokens"] == expected_input
        assert overview["total_output_tokens"] == expected_output
        assert overview["total_tokens"] == expected_input + expected_output

    def test_overview_cost_positive(self, populated_db):
        """All fixture models have known pricing, so the estimate is > 0."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        assert report["overview"]["estimated_cost"] > 0

    def test_overview_duration_stats(self, populated_db):
        """Ended sessions contribute to total and average duration."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        overview = report["overview"]
        # All 4 sessions have durations
        assert overview["total_hours"] > 0
        assert overview["avg_session_duration"] > 0

    def test_model_breakdown(self, populated_db):
        """Per-model rows are keyed by bare model name with session counts."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        models = report["models"]
        # Should have 3 distinct models (claude-sonnet x2, gpt-4o, deepseek-chat)
        model_names = [m["model"] for m in models]
        assert "claude-sonnet-4-20250514" in model_names
        assert "gpt-4o" in model_names
        assert "deepseek-chat" in model_names
        # Claude-sonnet has 2 sessions (s1 + s4)
        claude = next(m for m in models if "claude-sonnet" in m["model"])
        assert claude["sessions"] == 2

    def test_platform_breakdown(self, populated_db):
        """Per-platform rows reflect the session sources."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        platforms = report["platforms"]
        platform_names = [p["platform"] for p in platforms]
        assert "cli" in platform_names
        assert "telegram" in platform_names
        assert "discord" in platform_names
        cli = next(p for p in platforms if p["platform"] == "cli")
        assert cli["sessions"] == 2  # s1 + s3

    def test_tool_breakdown(self, populated_db):
        """Tool rows count invocations and carry percentages summing to 100."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        tools = report["tools"]
        tool_names = [t["tool"] for t in tools]
        assert "terminal" in tool_names
        assert "search_files" in tool_names
        assert "read_file" in tool_names
        assert "patch" in tool_names
        assert "web_search" in tool_names
        # terminal was used 2x in s3
        terminal = next(t for t in tools if t["tool"] == "terminal")
        assert terminal["count"] == 2
        # Percentages should sum to ~100%
        total_pct = sum(t["percentage"] for t in tools)
        assert total_pct == pytest.approx(100.0, abs=0.1)

    def test_activity_patterns(self, populated_db):
        """Activity section has full day/hour histograms and busiest markers."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        activity = report["activity"]
        assert len(activity["by_day"]) == 7
        assert len(activity["by_hour"]) == 24
        assert activity["active_days"] >= 1
        assert activity["busiest_day"] is not None
        assert activity["busiest_hour"] is not None

    def test_top_sessions(self, populated_db):
        """Notable-session labels are all present."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        top = report["top_sessions"]
        labels = [t["label"] for t in top]
        assert "Longest session" in labels
        assert "Most messages" in labels
        assert "Most tokens" in labels
        assert "Most tool calls" in labels

    def test_source_filter_cli(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30, source="cli")
        assert report["overview"]["total_sessions"] == 2  # s1, s3

    def test_source_filter_telegram(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30, source="telegram")
        assert report["overview"]["total_sessions"] == 1  # s2

    def test_source_filter_nonexistent(self, populated_db):
        """Filtering on a platform with no sessions yields an empty report."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30, source="slack")
        assert report["empty"] is True

    def test_days_filter_short(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=3)
        # Only s1 (2 days ago) and s4 (1 day ago) should be included
        assert report["overview"]["total_sessions"] == 2

    def test_days_filter_long(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=60)
        # All 5 sessions should be included
        assert report["overview"]["total_sessions"] == 5
# =========================================================================
# Formatting
# =========================================================================
class TestTerminalFormatting:
    """format_terminal output: section headers, token/cost lines, charts, N/A handling."""

    def test_terminal_format_has_sections(self, populated_db):
        """All major report sections appear as headers."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        text = engine.format_terminal(report)
        assert "Hermes Insights" in text
        assert "Overview" in text
        assert "Models Used" in text
        assert "Top Tools" in text
        assert "Activity Patterns" in text
        assert "Notable Sessions" in text

    def test_terminal_format_shows_tokens(self, populated_db):
        """Token counts and the dollar cost estimate are rendered."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        text = engine.format_terminal(report)
        assert "Input tokens" in text
        assert "Output tokens" in text
        assert "Est. cost" in text
        assert "$" in text

    def test_terminal_format_shows_platforms(self, populated_db):
        """Multi-platform usage produces a Platforms section."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        text = engine.format_terminal(report)
        # Multi-platform, so Platforms section should show
        assert "Platforms" in text
        assert "cli" in text
        assert "telegram" in text

    def test_terminal_format_shows_bar_chart(self, populated_db):
        """Activity charts render actual block-drawing glyphs."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        text = engine.format_terminal(report)
        # BUG FIX: the original assertion was `assert "" in text`, which is
        # vacuously true for any string (the bar-glyph literal was lost).
        # Accept any Unicode block-drawing character as evidence of a chart.
        assert any(ch in text for ch in "█▉▊▋▌▍▎▏▇▆▅▄▃▂▁░▒▓")

    def test_terminal_format_shows_na_for_custom_models(self, db):
        """Custom models should show N/A instead of fake cost."""
        db.create_session(session_id="s1", source="cli", model="my-custom-model")
        db.update_token_counts("s1", input_tokens=1000, output_tokens=500)
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        text = engine.format_terminal(report)
        assert "N/A" in text
        assert "custom/self-hosted" in text
class TestGatewayFormatting:
    """format_gateway output: compact Markdown for chat platforms."""

    def test_gateway_format_is_shorter(self, populated_db):
        """Gateway rendering is more compact than the terminal rendering."""
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        assert len(engine.format_gateway(report)) < len(engine.format_terminal(report))

    def test_gateway_format_has_bold(self, populated_db):
        engine = InsightsEngine(populated_db)
        rendered = engine.format_gateway(engine.generate(days=30))
        assert "**" in rendered  # Markdown bold

    def test_gateway_format_shows_cost(self, populated_db):
        engine = InsightsEngine(populated_db)
        rendered = engine.format_gateway(engine.generate(days=30))
        assert "$" in rendered
        assert "Est. cost" in rendered

    def test_gateway_format_shows_models(self, populated_db):
        engine = InsightsEngine(populated_db)
        rendered = engine.format_gateway(engine.generate(days=30))
        assert "Models" in rendered
        assert "sessions" in rendered
# =========================================================================
# Edge cases
# =========================================================================
class TestEdgeCases:
    """Degenerate inputs: missing tokens/models/end-times, custom models, odd windows."""

    def test_session_with_no_tokens(self, db):
        """Sessions with zero tokens should not crash."""
        db.create_session(session_id="s1", source="cli", model="test-model")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["empty"] is False
        assert report["overview"]["total_tokens"] == 0
        assert report["overview"]["estimated_cost"] == 0.0

    def test_session_with_no_end_time(self, db):
        """Active (non-ended) sessions should be included but duration = 0."""
        db.create_session(session_id="s1", source="cli", model="test-model")
        db.update_token_counts("s1", input_tokens=1000, output_tokens=500)
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        # Session included
        assert report["overview"]["total_sessions"] == 1
        assert report["overview"]["total_tokens"] == 1500
        # But no duration stats (session not ended)
        assert report["overview"]["total_hours"] == 0

    def test_session_with_no_model(self, db):
        """Sessions with NULL model should not crash."""
        db.create_session(session_id="s1", source="cli")
        db.update_token_counts("s1", input_tokens=1000, output_tokens=500)
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["empty"] is False
        models = report["models"]
        assert len(models) == 1
        assert models[0]["model"] == "unknown"
        assert models[0]["has_pricing"] is False

    def test_custom_model_shows_zero_cost(self, db):
        """Custom/self-hosted models should show $0 cost, not fake estimates."""
        db.create_session(session_id="s1", source="cli", model="FP16_Hermes_4.5")
        db.update_token_counts("s1", input_tokens=100000, output_tokens=50000)
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["overview"]["estimated_cost"] == 0.0
        assert "FP16_Hermes_4.5" in report["overview"]["models_without_pricing"]
        models = report["models"]
        custom = next(m for m in models if m["model"] == "FP16_Hermes_4.5")
        assert custom["cost"] == 0.0
        assert custom["has_pricing"] is False

    def test_tool_usage_from_tool_calls_json(self, db):
        """Tool usage should be extracted from tool_calls JSON when tool_name is NULL."""
        # NOTE: an unused `import json as _json` was removed from this test.
        db.create_session(session_id="s1", source="cli", model="test")
        # Assistant message with tool_calls (this is what CLI produces)
        db.append_message("s1", role="assistant", content="Let me search",
                          tool_calls=[{"id": "call_1", "type": "function",
                                       "function": {"name": "search_files", "arguments": "{}"}}])
        # Tool response WITHOUT tool_name (this is the CLI bug)
        db.append_message("s1", role="tool", content="found results",
                          tool_call_id="call_1")
        db.append_message("s1", role="assistant", content="Now reading",
                          tool_calls=[{"id": "call_2", "type": "function",
                                       "function": {"name": "read_file", "arguments": "{}"}}])
        db.append_message("s1", role="tool", content="file content",
                          tool_call_id="call_2")
        db.append_message("s1", role="assistant", content="And searching again",
                          tool_calls=[{"id": "call_3", "type": "function",
                                       "function": {"name": "search_files", "arguments": "{}"}}])
        db.append_message("s1", role="tool", content="more results",
                          tool_call_id="call_3")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        tools = report["tools"]
        # Should find tools from tool_calls JSON even though tool_name is NULL
        tool_names = [t["tool"] for t in tools]
        assert "search_files" in tool_names
        assert "read_file" in tool_names
        # search_files was called twice
        sf = next(t for t in tools if t["tool"] == "search_files")
        assert sf["count"] == 2

    def test_overview_pricing_sets_are_lists(self, db):
        """models_with/without_pricing should be JSON-serializable lists."""
        import json as _json
        db.create_session(session_id="s1", source="cli", model="gpt-4o")
        db.create_session(session_id="s2", source="cli", model="my-custom")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        overview = report["overview"]
        assert isinstance(overview["models_with_pricing"], list)
        assert isinstance(overview["models_without_pricing"], list)
        # Should be JSON-serializable
        _json.dumps(report["overview"])  # would raise if sets present

    def test_mixed_commercial_and_custom_models(self, db):
        """Mix of commercial and custom models: only commercial ones get costs."""
        db.create_session(session_id="s1", source="cli", model="gpt-4o")
        db.update_token_counts("s1", input_tokens=10000, output_tokens=5000)
        db.create_session(session_id="s2", source="cli", model="my-local-llama")
        db.update_token_counts("s2", input_tokens=10000, output_tokens=5000)
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        # Cost should only come from gpt-4o, not from the custom model
        overview = report["overview"]
        assert overview["estimated_cost"] > 0
        assert "gpt-4o" in overview["models_with_pricing"]  # list now, not set
        assert "my-local-llama" in overview["models_without_pricing"]
        # Verify individual model entries
        gpt = next(m for m in report["models"] if m["model"] == "gpt-4o")
        assert gpt["has_pricing"] is True
        assert gpt["cost"] > 0
        llama = next(m for m in report["models"] if m["model"] == "my-local-llama")
        assert llama["has_pricing"] is False
        assert llama["cost"] == 0.0

    def test_single_session_streak(self, db):
        """Single session should have streak of 0 or 1."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["activity"]["max_streak"] <= 1

    def test_no_tool_calls(self, db):
        """Sessions with no tool calls should produce empty tools list."""
        db.create_session(session_id="s1", source="cli", model="test")
        db.append_message("s1", role="user", content="hello")
        db.append_message("s1", role="assistant", content="hi there")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["tools"] == []

    def test_only_one_platform(self, db):
        """Single-platform usage should still work."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert len(report["platforms"]) == 1
        assert report["platforms"][0]["platform"] == "cli"
        # Terminal format should NOT show platform section for single platform
        text = engine.format_terminal(report)
        # (it still shows platforms section if there's only cli and nothing else)
        # Actually the condition is > 1 platforms OR non-cli, so single cli won't show

    def test_large_days_value(self, db):
        """Very large days value should not crash."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=365)
        assert report["empty"] is False

    def test_zero_days(self, db):
        """Zero days should return empty (nothing is in the future)."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()
        engine = InsightsEngine(db)
        report = engine.generate(days=0)
        # Depending on timing, might catch the session if created <1s ago
        # Just verify it doesn't crash
        assert "empty" in report