mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-12 08:51:53 +00:00
Hermes can propose automations and let the user accept them with one tap via /suggestions, instead of making them assemble cron jobs by hand. Every proposal — wherever it originates — flows through one surface. Sources (the 'where suggestions come from'): - catalog: curated starter automations (daily briefing, important-mail monitor, weekly review, workday-start reminder) via /suggestions catalog - recipe: installing a skill that carries a metadata.hermes.recipe block registers a suggestion instead of auto-scheduling - usage / integration: reserved for the background-review detector and account-connect triggers (sources defined; emitters land next) Pieces: - cron/suggestions.py — the store. add/list/accept/dismiss, dedup+latch by key (dismissed proposals never re-offered), pending cap so it can't become a nag wall. Accepting calls the existing cron.jobs.create_job — there is NO second job engine. Mirrors jobs.py storage (atomic writes, lock, 0600). - cron/suggestion_catalog.py — the curated set. The important-mail monitor entry is where the old proactive-monitor poll->classify->surface engine lives now (cron/scripts/classify_items.py + the 'monitor' aux task), as ONE catalog automation rather than a standalone feature. - tools/recipes.py — recipe<->job bridge; register_recipe_suggestion() makes a recipe source 'recipe' of this surface. recipe_to_job_spec() is the single translation both the direct and suggestion paths share. - hermes_cli/suggestions_cmd.py — shared /suggestions handler (CLI + gateway never drift); /suggestions [accept N|dismiss N|catalog|clear]. - Wired: CommandDef + CLI dispatch (cli.py) + gateway dispatch (gateway/run.py) + aux 'monitor' task (config.py) + recipe-install hook (skills_hub.py). Consent-first throughout: nothing auto-schedules; acceptance is always explicit; dismissals latch. Supersedes #41122 (proactive-monitor) and #41127 (recipes): both fold in here as a catalog entry and a suggestion source respectively. 
Tests: store (dedup/cap/accept/dismiss/latch), catalog seeding+idempotency, recipe->suggestion bridge, command handler, aux config. E2E: recipe SKILL.md -> parsed -> suggested -> accepted -> real cron job persisted to jobs.json.
226 lines
8.3 KiB
Python
226 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""Classify candidate items by urgency/importance and emit only the urgent ones.
|
|
|
|
The proactive-monitor pattern: a fetch step (a watcher script, an inbox dump, a
|
|
feed) produces a list of candidate items; this script scores each with a cheap
|
|
LLM and prints ONLY the items at or above a threshold. Below-threshold runs
|
|
print nothing, so a cron job wrapping this stays silent unless something
|
|
actually matters -- mirroring Poke's email monitor (fetch -> classify urgency
|
|
-> surface only what's above the bar).
|
|
|
|
Design choices:
|
|
* Uses Hermes' auxiliary client with task="monitor", so the classifier model
|
|
is configured once in config.yaml (auxiliary.monitor.{provider,model}) and
|
|
can be a cheap fast model independent of the main chat model.
|
|
* Reads items as JSON (a list of objects) from stdin or --input-file.
|
|
* One LLM call scores the whole batch (cheap, single round-trip) and returns
|
|
structured scores; we filter locally.
|
|
* Empty result -> empty stdout -> the cron job's [SILENT]/empty-stdout path
|
|
suppresses delivery. No spam on quiet intervals.
|
|
|
|
Usage (standalone):
|
|
cat items.json | python classify_items.py --threshold 7 \
|
|
--criteria "Urgent if it needs a reply today or is from my manager/family"
|
|
|
|
Usage (wired to a watcher via cron, agent mode):
|
|
Ask the agent: "Every 10 minutes, run watch_http_json.py for my inbox feed,
|
|
pipe its JSON into classify_items.py with my urgency criteria, and deliver
|
|
whatever it prints. Stay silent if it prints nothing."
|
|
|
|
Item schema (flexible): each item is an object; the classifier sees the whole
|
|
object. A "title"/"subject"/"summary"/"text" field helps it judge. An "id"
|
|
field (any of id/guid/message_id/url/link) is echoed back so duplicates can be
|
|
deduped upstream.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
def _eprint(*args: Any) -> None:
    """Print ``args`` to stderr so stdout carries only surfaced items."""
    stream = sys.stderr
    print(*args, file=stream)
|
|
|
|
|
|
def _load_items(input_file: Optional[str]) -> List[Dict[str, Any]]:
    """Read candidate items as JSON from ``input_file`` or, if it is falsy, stdin.

    Accepted top-level shapes:
      * a JSON list -> its object (dict) entries;
      * ``{"items": [...]}`` -> the object entries of that list;
      * any other JSON object -> a one-element list containing it.

    Non-object entries are dropped in BOTH list forms. Later stages call
    dict methods on every item, so letting a stray string or number through
    the ``{"items": [...]}`` form would crash them.

    Returns:
        The list of item dicts; ``[]`` when the input is empty or whitespace.

    Exits:
        Calls ``sys.exit(2)`` when the input is not valid JSON or its top
        level is neither a list nor an object.
    """
    if input_file:
        with open(input_file, encoding="utf-8") as f:
            raw = f.read()
    else:
        raw = sys.stdin.read()

    raw = raw.strip()
    if not raw:
        return []

    try:
        data = json.loads(raw)
    except json.JSONDecodeError as e:
        _eprint(f"classify_items: input is not valid JSON: {e}")
        sys.exit(2)

    if isinstance(data, dict):
        wrapped = data.get("items")
        if not isinstance(wrapped, list):
            # A plain object is treated as a single item.
            return [data]
        # Unwrap and fall through to the shared list filtering below.
        data = wrapped

    if isinstance(data, list):
        return [x for x in data if isinstance(x, dict)]

    _eprint("classify_items: expected a JSON list or {items: [...]}")
    sys.exit(2)
|
|
|
|
|
|
def _item_id(item: Dict[str, Any], index: int) -> str:
    """Return a string identifier for ``item``.

    The keys ``id``, ``guid``, ``message_id``, ``url`` and ``link`` are tried
    in that order and the first usable value is returned as ``str``. Missing,
    ``None``, empty and ``False`` values are skipped. A numeric ``0`` counts
    as a real identifier; skipping it would give the item a positional
    fallback id that is not stable across runs.

    Args:
        item: The candidate item object.
        index: Position of the item in the batch, used only for the fallback.

    Returns:
        The identifier, or ``"item-<index>"`` when no key yields one.
    """
    for key in ("id", "guid", "message_id", "url", "link"):
        val = item.get(key)
        # bool is an int subclass; False must still count as "no id".
        numeric_zero = (
            isinstance(val, (int, float))
            and not isinstance(val, bool)
            and val == 0
        )
        if val or numeric_zero:
            return str(val)
    return f"item-{index}"
|
|
|
|
|
|
# Instruction text for the classifier: 0-10 scoring scale and the exact JSON
# reply schema. One sentence per entry, joined by single spaces.
_CLASSIFY_INSTRUCTIONS = " ".join(
    (
        "You are an urgency classifier for a proactive assistant.",
        "You will be given a numbered list of items and the user's importance criteria.",
        "Score EACH item from 0 (ignore entirely) to 10 (interrupt the user now).",
        "Return ONLY a JSON array, one object per item, in the same order: "
        '[{"index": <int>, "score": <int 0-10>, "reason": "<short>"}].',
        "No prose, no markdown fences.",
        "Be conservative: most items should score low.",
        "Only score high when the item clearly meets the user's criteria.",
    )
)
|
|
|
|
|
|
def _build_prompt(items: List[Dict[str, Any]], criteria: str) -> str:
    """Render the criteria and a numbered item list as the classifier prompt.

    Each item is reduced to its salient fields (in the fixed order below).
    An item with none of those fields is shown whole. The JSON rendering of
    each item is cut to 1200 characters.

    Args:
        items: Candidate item objects, numbered from 0 in the output.
        criteria: The user's plain-language importance criteria.

    Returns:
        The prompt text: criteria header, ``ITEMS:`` list, closing request.
    """
    salient = ("title", "subject", "summary", "text", "body", "from", "sender", "url")

    rendered = []
    for position, entry in enumerate(items):
        compact = {field: entry[field] for field in salient if field in entry}
        # No salient field present -> show the entire object instead.
        payload = json.dumps(compact or entry, ensure_ascii=False)
        rendered.append(f"[{position}] {payload[:1200]}")

    header = [f"USER IMPORTANCE CRITERIA:\n{criteria}\n", "ITEMS:"]
    footer = ["\nReturn the JSON array of scores now (one object per item, same order)."]
    return "\n".join(header + rendered + footer)
|
|
|
|
|
|
def _parse_scores(content: str, n_items: int) -> Dict[int, Dict[str, Any]]:
    """Parse the classifier's reply into ``{item index: score object}``.

    The reply should be a JSON array of objects that each carry an integer
    ``"index"``. Two kinds of sloppy output are tolerated:
      * a markdown code fence around the JSON, with or without a language tag;
      * prose around the array (the span from the first ``[`` to the last
        ``]`` is tried).

    Args:
        content: Raw text returned by the classifier (``None``/empty allowed).
        n_items: Number of items that were sent; indexes outside
            ``0 <= index < n_items`` are ignored.

    Returns:
        Mapping from item index to the object the classifier returned for it.
        Empty when nothing usable could be parsed; in that case a diagnostic
        is written via ``_eprint`` if no JSON could be decoded at all.
    """
    text = (content or "").strip()

    # Tolerate accidental markdown fences.
    if text.startswith("```"):
        text = text.strip("`")
        first_line, newline, rest = text.partition("\n")
        # Drop the first line only when it is a language tag or blank. If the
        # payload starts right after the fence, that line is JSON; removing
        # it would throw away the opening bracket.
        if newline and not first_line.lstrip().startswith(("[", "{")):
            text = rest

    try:
        arr = json.loads(text)
    except json.JSONDecodeError:
        # Last-ditch: find the first [...] block.
        start = text.find("[")
        end = text.rfind("]")
        if start < 0 or end <= start:
            _eprint("classify_items: classifier returned no JSON array")
            return {}
        try:
            arr = json.loads(text[start : end + 1])
        except json.JSONDecodeError:
            _eprint("classify_items: could not parse classifier output")
            return {}

    out: Dict[int, Dict[str, Any]] = {}
    if not isinstance(arr, list):
        return out

    for obj in arr:
        if not isinstance(obj, dict):
            continue
        idx = obj.get("index")
        # bool is an int subclass; a JSON true/false is not a valid index.
        if isinstance(idx, bool) or not isinstance(idx, int):
            continue
        if 0 <= idx < n_items:
            out[idx] = obj
    return out
|
|
|
|
|
|
def _coerce_score(value: Any) -> Optional[float]:
    """Normalize a classifier-reported score to a number, or ``None`` if unusable.

    Models often return ``8.0`` or ``"8"`` instead of ``8``. Accepting those
    keeps an urgent item from being dropped over a formatting quirk.
    Booleans, non-numeric strings, NaN and infinities are rejected.
    Integral floats come back as ``int`` so they display as ``8``, not ``8.0``.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, str):
        try:
            value = float(value.strip())
        except ValueError:
            return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        if value != value or value in (float("inf"), float("-inf")):
            return None
        return int(value) if value.is_integer() else value
    return None


def main() -> int:
    """CLI entry point: classify the input items and print only the urgent ones.

    Flow: parse arguments, load items, send one request to the auxiliary
    ``monitor`` model (instructions as the system message, criteria and items
    as the user message), then keep the items whose score is at or above
    ``--threshold``. Nothing is printed when no item qualifies.

    Returns:
        0 on success, including the silent "nothing to report" cases;
        3 if the auxiliary client cannot be imported;
        4 if the classifier call fails.
        (``_load_items`` may end the process with status 2 on bad input.)
    """
    parser = argparse.ArgumentParser(description="Classify items by urgency; emit only urgent ones.")
    parser.add_argument("--criteria", required=True, help="Plain-language importance criteria.")
    parser.add_argument("--threshold", type=int, default=7, help="Minimum score (0-10) to surface. Default 7.")
    parser.add_argument("--input-file", default=None, help="Read items JSON from this file instead of stdin.")
    parser.add_argument("--format", choices=["text", "json"], default="text", help="Output format for surfaced items.")
    args = parser.parse_args()

    items = _load_items(args.input_file)
    if not items:
        # Nothing to classify -> silent. This is the common quiet-interval case.
        return 0

    # Import here so --help works without the package importable.
    try:
        from agent.auxiliary_client import call_llm
    except Exception as e:  # pragma: no cover - import guard
        _eprint(f"classify_items: cannot import auxiliary client: {e}")
        return 3

    prompt = _build_prompt(items, args.criteria)
    try:
        resp = call_llm(
            task="monitor",
            messages=[
                # The instructions carry the 0-10 scale and the reply schema
                # ("index"/"score"/"reason") that _parse_scores relies on.
                # Without them the model has no way to know the format.
                {"role": "system", "content": _CLASSIFY_INSTRUCTIONS},
                {"role": "user", "content": prompt},
            ],
            max_tokens=1024,
            temperature=0,
        )
        content = resp.choices[0].message.content
        if not isinstance(content, str):
            content = str(content) if content else ""
    except Exception as e:
        # Classification failure is NOT silent -- surface it so a broken monitor
        # doesn't quietly swallow important items. Non-zero exit -> cron alerts.
        _eprint(f"classify_items: classifier call failed: {e}")
        return 4

    scores = _parse_scores(content, len(items))
    surfaced = []
    for i, item in enumerate(items):
        s = scores.get(i)
        if not isinstance(s, dict):
            continue
        score = _coerce_score(s.get("score"))
        if score is not None and score >= args.threshold:
            surfaced.append((i, item, score, s.get("reason", "")))

    if not surfaced:
        # Below threshold -> silent. Empty stdout; cron suppresses delivery.
        return 0

    if args.format == "json":
        out = [
            {
                "id": _item_id(item, i),
                "score": score,
                "reason": reason,
                "item": item,
            }
            for (i, item, score, reason) in surfaced
        ]
        print(json.dumps(out, ensure_ascii=False, indent=2))
    else:
        blocks = []
        for (i, item, score, reason) in surfaced:
            title = (
                item.get("title")
                or item.get("subject")
                or item.get("summary")
                or _item_id(item, i)
            )
            url = item.get("url") or item.get("link") or ""
            block = f"## [{score}/10] {title}"
            if url:
                block += f"\n{url}"
            if reason:
                block += f"\n_{reason}_"
            blocks.append(block)
        print("\n\n".join(blocks))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit status.
    raise SystemExit(main())
|