diff --git a/optional-skills/devops/watchers/SKILL.md b/optional-skills/devops/watchers/SKILL.md new file mode 100644 index 0000000000..628f340b4c --- /dev/null +++ b/optional-skills/devops/watchers/SKILL.md @@ -0,0 +1,112 @@ +--- +name: watchers +description: Poll RSS, JSON APIs, and GitHub with watermark dedup. +version: 1.0.0 +author: Hermes Agent +license: MIT +platforms: [linux, macos] +metadata: + hermes: + tags: [cron, polling, rss, github, http, automation, monitoring] + category: devops + requires_toolsets: [terminal] + related_skills: [] +--- + +# Watchers + +Poll external sources on an interval and react only to new items. Three ready-made scripts plus a shared watermark helper; wire them into a cron job (or run them ad-hoc from the terminal). + +## When to Use + +- User wants to watch an RSS/Atom feed and be notified of new entries +- User wants to watch a GitHub repo's issues / pulls / releases / commits +- User wants to poll an arbitrary JSON endpoint and get notified on new items +- User asks for "a watcher for X" or "notify me when X changes" + +## Mental model + +A watcher is just a script that: + +1. Fetches data from the external source +2. Compares against a watermark file of previously-seen IDs +3. Writes the new watermark back +4. Prints new items to stdout (or nothing on no-change) + +The scripts below handle all four steps. The agent runs them via the terminal tool — from a cron job, a webhook, or an interactive chat — and reports what's new. + +## Ready-made scripts + +All three live in `$HERMES_HOME/skills/devops/watchers/scripts/` once the skill is installed. Each reads `WATCHER_STATE_DIR` (defaults to `$HERMES_HOME/watcher-state/`) for its state file, keyed by the `--name` argument. 
+ +| Script | What it watches | Dedup key | +|---|---|---| +| `watch_rss.py` | RSS 2.0 or Atom feed URL | `<guid>` / `<id>` | +| `watch_http_json.py` | Any JSON endpoint returning a list of objects | Configurable id field | +| `watch_github.py` | GitHub issues / pulls / releases / commits for a repo | `id` / `sha` | + +All three: + +- First run records a baseline — never replays existing feed +- Watermark is a bounded ID set (max 500) to cap memory +- Output format: `## <title>\n<url>\n\n<optional body>` per item +- Empty stdout on no-new — the caller treats that as silent +- Non-zero exit on fetch errors + +## Usage + +Run a watcher directly from the terminal tool: + +```bash +python $HERMES_HOME/skills/devops/watchers/scripts/watch_rss.py \ + --name hn --url https://news.ycombinator.com/rss --max 5 +``` + +Watch a GitHub repo (set `GITHUB_TOKEN` in `~/.hermes/.env` to avoid the 60 req/hr anonymous rate limit): + +```bash +python $HERMES_HOME/skills/devops/watchers/scripts/watch_github.py \ + --name hermes-issues --repo NousResearch/hermes-agent --scope issues +``` + +Poll an arbitrary JSON API: + +```bash +python $HERMES_HOME/skills/devops/watchers/scripts/watch_http_json.py \ + --name api --url https://api.example.com/events \ + --id-field event_id --items-path data.events +``` + +## Wiring into cron + +Ask the agent to schedule a cron job with a prompt like: + +> Every 15 minutes, run `watch_rss.py --name hn --url https://news.ycombinator.com/rss`. If it prints anything, summarize the headlines and deliver them. If it prints nothing, stay silent. + +The agent invokes the script via the terminal tool inside the cron job's agent loop; no changes to cron's built-in `--script` flag are needed. + +## State files + +Every watcher writes `$HERMES_HOME/watcher-state/<name>.json`. 
Inspect: + +```bash +cat $HERMES_HOME/watcher-state/hn.json +``` + +Force a replay (next run treated as first poll): + +```bash +rm $HERMES_HOME/watcher-state/hn.json +``` + +## Writing your own + +All three scripts use the same template: load watermark, fetch, diff, save, emit. `scripts/_watermark.py` is the shared helper; import it to get atomic writes + bounded ID set + first-run baseline for free. See any of the three reference scripts for how little boilerplate it takes. + +## Common Pitfalls + +1. **Printing a "no new items" header every tick.** Callers rely on empty stdout = silent. If you print anything on an empty delta, you spam the channel. The shipped scripts handle this; custom scripts must too. +2. **Expecting the first run to emit items.** It won't — first run records a baseline. If you need an initial digest, delete the state file after the first run or add a `--prime-with-latest N` flag in your own script. +3. **Unbounded watermark growth.** The shared helper caps at 500 IDs. Raise it for high-churn feeds; lower it on constrained filesystems. +4. **Putting the state dir where the agent's sandbox can't write.** `$HERMES_HOME/watcher-state/` is always writable. Docker/Modal backends may not see arbitrary host paths. + diff --git a/optional-skills/devops/watchers/scripts/_watermark.py b/optional-skills/devops/watchers/scripts/_watermark.py new file mode 100755 index 0000000000..719b6804eb --- /dev/null +++ b/optional-skills/devops/watchers/scripts/_watermark.py @@ -0,0 +1,148 @@ +"""Shared watermark helper used by the three watcher scripts. + +A watermark is just a JSON file that records the IDs we've seen on previous +runs, so the next run only emits items we haven't seen before. + +Contract: +- First run: record all IDs from the fetched batch, emit nothing. +- Subsequent runs: emit items whose ID isn't in the stored set. +- Bounded: keep at most `max_seen` IDs (default 500). 
+- Atomic: write to a .tmp file and rename, so a crashed script can't + leave a half-written state file that permanently breaks dedup. + +Import and use from any custom watcher script: + + from _watermark import Watermark + + wm = Watermark.load("my-feed-name") + new_items = wm.filter_new(fetched_items, id_key="id") + wm.save() +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional + + +def _state_dir() -> Path: + """Where watermark files live — respects WATCHER_STATE_DIR override.""" + override = os.environ.get("WATCHER_STATE_DIR") + if override: + return Path(override) + # Default: $HERMES_HOME/watcher-state/, falling back to ~/.hermes/watcher-state/. + hermes_home = os.environ.get("HERMES_HOME") or str(Path.home() / ".hermes") + return Path(hermes_home) / "watcher-state" + + +class Watermark: + """Per-watcher state. Persisted to <state_dir>/<name>.json.""" + + def __init__(self, name: str, *, max_seen: int = 500) -> None: + if not name or not name.replace("-", "").replace("_", "").isalnum(): + raise ValueError( + f"watermark name must be alphanumeric + '-'/'_' (got {name!r})" + ) + self.name = name + self.max_seen = max_seen + self._path = _state_dir() / f"{name}.json" + self._data: Dict[str, Any] = {"seen_ids": [], "first_run": True} + + @classmethod + def load(cls, name: str, *, max_seen: int = 500) -> "Watermark": + wm = cls(name, max_seen=max_seen) + if wm._path.exists(): + try: + wm._data = json.loads(wm._path.read_text(encoding="utf-8")) + wm._data.setdefault("seen_ids", []) + wm._data["first_run"] = False + except (OSError, json.JSONDecodeError): + # Corrupt state file — treat as a first run but don't crash. 
+ wm._data = {"seen_ids": [], "first_run": True} + return wm + + @property + def is_first_run(self) -> bool: + return bool(self._data.get("first_run", True)) + + @property + def seen(self) -> List[str]: + return list(self._data.get("seen_ids", [])) + + def filter_new( + self, items: Iterable[Dict[str, Any]], *, id_key: str = "id" + ) -> List[Dict[str, Any]]: + """Return items whose id isn't in the stored set. + + Side effect: updates the in-memory seen set with every id in the + batch (so save() persists the full new watermark). On first run, + records every id but returns an empty list (baseline, no replay). + """ + existing = set(str(x) for x in self._data.get("seen_ids", [])) + was_first_run = self.is_first_run + + new_items: List[Dict[str, Any]] = [] + batch_ids: List[str] = [] + for item in items: + ident = item.get(id_key) + if ident is None: + continue + ident_str = str(ident) + batch_ids.append(ident_str) + if ident_str in existing: + continue + if was_first_run: + continue # record but don't emit + new_items.append(item) + + combined = list(existing) + [i for i in batch_ids if i not in existing] + if len(combined) > self.max_seen: + combined = combined[-self.max_seen:] + self._data["seen_ids"] = combined + self._data["first_run"] = False + return new_items + + def save(self) -> None: + self._path.parent.mkdir(parents=True, exist_ok=True) + tmp = self._path.with_suffix(".tmp") + tmp.write_text( + json.dumps(self._data, indent=2, sort_keys=True), + encoding="utf-8", + ) + os.replace(tmp, self._path) + + +def format_items_as_markdown( + items: List[Dict[str, Any]], + *, + title_key: str = "title", + url_key: str = "url", + body_key: Optional[str] = None, + max_body_chars: int = 500, +) -> str: + """Render a list of items as Markdown for cron delivery. + + One heading per item + its URL + optional snippet of body. Output is + empty string when items is empty — cron will then treat stdout as + silent and skip delivery (existing behavior). 
+ """ + if not items: + return "" + lines: List[str] = [] + for item in items: + title = (item.get(title_key) or "(no title)").strip() + url = (item.get(url_key) or "").strip() + lines.append(f"## {title}") + if url: + lines.append(url) + if body_key: + body = (item.get(body_key) or "").strip() + if body: + if len(body) > max_body_chars: + body = body[:max_body_chars].rstrip() + "…" + lines.append("") + lines.append(body) + lines.append("") + return "\n".join(lines).rstrip() + "\n" diff --git a/optional-skills/devops/watchers/scripts/watch_github.py b/optional-skills/devops/watchers/scripts/watch_github.py new file mode 100755 index 0000000000..bb4a3ca6f3 --- /dev/null +++ b/optional-skills/devops/watchers/scripts/watch_github.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +"""Watch GitHub activity — issues, pulls, releases, or commits — with dedup. + +Usage (via cron with --no-agent): + + hermes cron create hermes-issues \\ + --schedule "*/5 * * * *" --no-agent \\ + --script "$HERMES_HOME/skills/devops/watchers/scripts/watch_github.py" \\ + --script-args "--name hermes-issues --repo NousResearch/hermes-agent --scope issues" + +Set GITHUB_TOKEN (or GH_TOKEN) in ~/.hermes/.env to avoid the 60 req/hr +anonymous rate limit. + +Scopes: issues | pulls | releases | commits. Or pass --search QUERY to +use the /search/issues endpoint instead of /repos/:owner/:repo/:scope. 
+""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import sys +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from _watermark import Watermark, format_items_as_markdown # type: ignore + + +VALID_SCOPES = ("issues", "pulls", "releases", "commits") + + +def _flatten_commit(item): + """Commit objects nest title/author/date under 'commit' — flatten for rendering.""" + commit = item.get("commit") or {} + msg = (commit.get("message") or "").strip().splitlines() + title = msg[0] if msg else "" + body = "\n".join(msg[1:]).strip() if len(msg) > 1 else "" + author = (item.get("author") or {}).get("login") or (commit.get("author") or {}).get("name", "") + date = (commit.get("author") or {}).get("date", "") + return { + "id": item.get("sha", ""), + "title": f"{title} ({author})" if author else title, + "url": item.get("html_url"), + "body": body, + "created_at": date, + } + + +def _flatten_issue_or_release(item): + return { + "id": str(item.get("id", "")), + "title": item.get("title") or item.get("name") or "", + "url": item.get("html_url") or item.get("url"), + "body": (item.get("body") or "").strip(), + "state": item.get("state"), + "author": (item.get("user") or {}).get("login") + or (item.get("author") or {}).get("login"), + "created_at": item.get("created_at"), + } + + +def main() -> int: + p = argparse.ArgumentParser(description="Watch GitHub issues / pulls / releases / commits.") + p.add_argument("--name", required=True, help="Watcher name (used for state file)") + p.add_argument("--repo", default="", + help="owner/name of the repo (one of --repo or --search is required)") + p.add_argument("--scope", default="issues", choices=VALID_SCOPES, + help="What to poll (default: issues)") + p.add_argument("--search", default="", + help="GitHub issues search query (alternative to --repo/--scope)") + p.add_argument("--per-page", type=int, 
default=30, + help="Results per page (default: 30, max: 100)") + p.add_argument("--max", type=int, default=20, + help="Max new items to emit per tick (default: 20)") + p.add_argument("--with-body", action="store_true", + help="Include issue/commit body as a snippet under each item") + p.add_argument("--timeout", type=float, default=30.0, + help="HTTP timeout in seconds (default: 30)") + args = p.parse_args() + + if not args.repo and not args.search: + print("watch_github: one of --repo or --search is required", file=sys.stderr) + return 2 + if args.repo and not re.fullmatch(r"[A-Za-z0-9._-]+/[A-Za-z0-9._-]+", args.repo): + print(f"watch_github: --repo must be owner/name (got {args.repo!r})", file=sys.stderr) + return 2 + + # URL + flattening strategy. + if args.search: + url = ( + "https://api.github.com/search/issues" + f"?q={urllib.parse.quote(args.search)}&per_page={args.per_page}" + ) + flatten = _flatten_issue_or_release + items_path = "items" + elif args.scope == "commits": + url = f"https://api.github.com/repos/{args.repo}/commits?per_page={args.per_page}" + flatten = _flatten_commit + items_path = "" + else: + url = ( + f"https://api.github.com/repos/{args.repo}/{args.scope}" + f"?per_page={args.per_page}&state=all" + ) + flatten = _flatten_issue_or_release + items_path = "" + + headers = { + "Accept": "application/vnd.github+json", + "User-Agent": "Hermes-Watcher/1.0", + } + token = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN") + if token: + headers["Authorization"] = f"Bearer {token}" + + req = urllib.request.Request(url) + for k, v in headers.items(): + req.add_header(k, v) + + try: + with urllib.request.urlopen(req, timeout=args.timeout) as resp: + raw = resp.read() + except urllib.error.HTTPError as e: + print(f"watch_github: HTTP {e.code} from {url}", file=sys.stderr) + return 2 + except (urllib.error.URLError, TimeoutError, OSError) as e: + print(f"watch_github: network error: {e}", file=sys.stderr) + return 2 + + try: + data = 
json.loads(raw.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as e: + print(f"watch_github: response is not valid JSON: {e}", file=sys.stderr) + return 2 + + # Drill into items_path if needed (search endpoint returns {"items":[...]}). + if items_path: + data = data.get(items_path) if isinstance(data, dict) else None + if not isinstance(data, list): + print(f"watch_github: expected a list of items; got {type(data).__name__}", + file=sys.stderr) + return 2 + + items = [flatten(i) for i in data if isinstance(i, dict)] + # Drop any items that flattened without an ID (defensive). + items = [i for i in items if i.get("id")] + + wm = Watermark.load(args.name) + new_items = wm.filter_new(items, id_key="id") + wm.save() + + if args.max > 0: + new_items = new_items[: args.max] + + body_key = "body" if args.with_body else None + output = format_items_as_markdown(new_items, body_key=body_key) + if output: + sys.stdout.write(output) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/optional-skills/devops/watchers/scripts/watch_http_json.py b/optional-skills/devops/watchers/scripts/watch_http_json.py new file mode 100755 index 0000000000..6d8be8c541 --- /dev/null +++ b/optional-skills/devops/watchers/scripts/watch_http_json.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +"""Watch any JSON endpoint that returns a list of objects; dedup by ID field. + +Usage (via cron with --no-agent): + + hermes cron create api-events \\ + --schedule "*/1 * * * *" --no-agent \\ + --script "$HERMES_HOME/skills/devops/watchers/scripts/watch_http_json.py" \\ + --script-args "--name api --url https://api.example.com/events \\ + --id-field event_id --items-path data.events" + +The response can be: + - a top-level JSON list (default), or + - a JSON object with a dotted ``--items-path`` pointing to the list. + +Each item is deduped by ``--id-field`` (default "id"). + +Optional ``--header KEY:VALUE`` flags pass HTTP headers (repeatable). 
+""" + +from __future__ import annotations + +import argparse +import json +import sys +import urllib.error +import urllib.request +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from _watermark import Watermark, format_items_as_markdown # type: ignore + + +def _dig(obj, path: str): + """Dotted-path lookup: _dig({'a':{'b':[1,2]}}, 'a.b') → [1,2].""" + if not path: + return obj + cur = obj + for part in path.split("."): + if isinstance(cur, dict) and part in cur: + cur = cur[part] + else: + return None + return cur + + +def _parse_header(s: str): + if ":" not in s: + raise argparse.ArgumentTypeError( + f"--header expects 'KEY: VALUE' (got {s!r})" + ) + k, v = s.split(":", 1) + return (k.strip(), v.strip()) + + +def main() -> int: + p = argparse.ArgumentParser(description="Poll a JSON endpoint.") + p.add_argument("--name", required=True, help="Watcher name (used for state file)") + p.add_argument("--url", required=True, help="JSON endpoint URL") + p.add_argument("--id-field", default="id", + help="Field used to dedup items (default: 'id')") + p.add_argument("--items-path", default="", + help="Dotted path to the list inside the JSON response (e.g. 
'data.events')") + p.add_argument("--title-field", default="title", + help="Field used as the item title in the rendered output (default: 'title')") + p.add_argument("--url-field", default="url", + help="Field used as the item URL in the rendered output (default: 'url')") + p.add_argument("--body-field", default="", + help="Optional body field to include as a snippet under each item") + p.add_argument("--max", type=int, default=20, + help="Max new items to emit per tick (default: 20)") + p.add_argument("--header", action="append", type=_parse_header, default=[], + metavar="KEY: VALUE", + help="HTTP header (repeatable)") + p.add_argument("--timeout", type=float, default=20.0, + help="HTTP timeout in seconds (default: 20)") + args = p.parse_args() + + req = urllib.request.Request(args.url, headers={"User-Agent": "Hermes-Watcher/1.0"}) + for k, v in args.header: + req.add_header(k, v) + + try: + with urllib.request.urlopen(req, timeout=args.timeout) as resp: + raw = resp.read() + except urllib.error.HTTPError as e: + print(f"watch_http_json: HTTP {e.code} from {args.url}", file=sys.stderr) + return 2 + except (urllib.error.URLError, TimeoutError, OSError) as e: + print(f"watch_http_json: network error: {e}", file=sys.stderr) + return 2 + + try: + data = json.loads(raw.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as e: + print(f"watch_http_json: response is not valid JSON: {e}", file=sys.stderr) + return 2 + + items = _dig(data, args.items_path) if args.items_path else data + if not isinstance(items, list): + print( + f"watch_http_json: items_path={args.items_path!r} did not resolve to a list " + f"(got {type(items).__name__})", + file=sys.stderr, + ) + return 2 + + # Keep only dicts — skip any bare strings / numbers so filter_new doesn't crash. 
+ items = [i for i in items if isinstance(i, dict)] + + wm = Watermark.load(args.name) + new_items = wm.filter_new(items, id_key=args.id_field) + wm.save() + + if args.max > 0: + new_items = new_items[: args.max] + + body_key = args.body_field or None + output = format_items_as_markdown( + new_items, + title_key=args.title_field, + url_key=args.url_field, + body_key=body_key, + ) + if output: + sys.stdout.write(output) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/optional-skills/devops/watchers/scripts/watch_rss.py b/optional-skills/devops/watchers/scripts/watch_rss.py new file mode 100755 index 0000000000..cc729f91b1 --- /dev/null +++ b/optional-skills/devops/watchers/scripts/watch_rss.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +"""Watch an RSS 2.0 or Atom feed; print new items to stdout, silent on empty. + +Usage (via cron with --no-agent): + + hermes cron create my-feed \\ + --schedule "*/15 * * * *" --no-agent \\ + --script "$HERMES_HOME/skills/devops/watchers/scripts/watch_rss.py" \\ + --script-args "--name hn --url https://news.ycombinator.com/rss" + +First run records a baseline (emits nothing). Subsequent runs emit only +items whose <guid> / <id> isn't in the watermark. +""" + +from __future__ import annotations + +import argparse +import sys +import urllib.error +import urllib.request +from pathlib import Path +from xml.etree import ElementTree as ET + +sys.path.insert(0, str(Path(__file__).parent)) +from _watermark import Watermark, format_items_as_markdown # type: ignore + + +def _strip_ns(tag: str) -> str: + return tag.split("}", 1)[1] if "}" in tag else tag + + +def _parse_feed(xml_bytes: bytes): + """Return a list of {id, title, url, summary} dicts. + + Handles both RSS 2.0 ``<item>`` and Atom ``<entry>``. 
+ """ + try: + root = ET.fromstring(xml_bytes) + except ET.ParseError as e: + print(f"watch_rss: invalid XML: {e}", file=sys.stderr) + sys.exit(2) + + entries = [] + for item in root.iter(): + tag = _strip_ns(item.tag) + if tag not in ("item", "entry"): + continue + # ElementTree Elements without children are *falsy* — use `is not None`. + children = {_strip_ns(c.tag): c for c in item} + + guid_el = children.get("guid") + if guid_el is None: + guid_el = children.get("id") + link_el = children.get("link") + if link_el is not None: + href = link_el.attrib.get("href") or (link_el.text or "").strip() + else: + href = "" + guid = (guid_el.text or "").strip() if guid_el is not None else "" + guid = guid or href + if not guid: + continue + + title_el = children.get("title") + title = (title_el.text or "").strip() if title_el is not None else "" + + summ_el = children.get("description") + if summ_el is None: + summ_el = children.get("summary") + summary = (summ_el.text or "").strip() if summ_el is not None else "" + + entries.append( + {"id": guid, "title": title, "url": href, "summary": summary} + ) + return entries + + +def main() -> int: + p = argparse.ArgumentParser(description="Watch an RSS/Atom feed.") + p.add_argument("--name", required=True, help="Watcher name (used for state file)") + p.add_argument("--url", required=True, help="Feed URL") + p.add_argument("--max", type=int, default=10, + help="Max new items to emit per tick (default: 10)") + p.add_argument("--with-summary", action="store_true", + help="Include <description>/<summary> snippet under each item") + p.add_argument("--timeout", type=float, default=20.0, + help="HTTP timeout in seconds (default: 20)") + args = p.parse_args() + + try: + req = urllib.request.Request(args.url, headers={"User-Agent": "Hermes-Watcher/1.0"}) + with urllib.request.urlopen(req, timeout=args.timeout) as resp: + xml_bytes = resp.read() + except urllib.error.HTTPError as e: + print(f"watch_rss: HTTP {e.code} from {args.url}", 
file=sys.stderr) + return 2 + except (urllib.error.URLError, TimeoutError, OSError) as e: + print(f"watch_rss: network error: {e}", file=sys.stderr) + return 2 + + entries = _parse_feed(xml_bytes) + + wm = Watermark.load(args.name) + new_items = wm.filter_new(entries, id_key="id") + wm.save() + + # Cap emitted items (watermark still records all seen IDs so we don't + # re-emit them next tick). + if args.max > 0: + new_items = new_items[: args.max] + + body_key = "summary" if args.with_summary else None + output = format_items_as_markdown(new_items, body_key=body_key) + if output: + sys.stdout.write(output) + # Empty stdout on no-new — cron treats that as silent. + return 0 + + +if __name__ == "__main__": + sys.exit(main())