feat(skills): watchers skill — poll RSS / HTTP JSON / GitHub via cron no-agent (#21881)

* feat(skills): watchers skill — poll RSS / HTTP JSON / GitHub via cron no-agent

Ships three reusable polling scripts plus a shared watermark helper as an
optional skill.  Users wire them into the existing cron (no_agent=True)
mode rather than learning a new subsystem.

Supersedes the closed PR #21497 (parallel watcher subsystem).  Same value,
zero new core surface.

## What ships

- optional-skills/devops/watchers/SKILL.md: pattern + three example cron commands
- optional-skills/devops/watchers/scripts/_watermark.py: shared helper
  (atomic state writes, bounded ID set, first-run baseline)
- optional-skills/devops/watchers/scripts/watch_rss.py: RSS 2.0 + Atom
- optional-skills/devops/watchers/scripts/watch_http_json.py: any JSON endpoint
  with configurable id_field / items_path / headers
- optional-skills/devops/watchers/scripts/watch_github.py: issues / pulls /
  releases / commits (uses GITHUB_TOKEN if present)

## Invariants enforced by the shared helper

- First run records baseline, emits nothing (never replays existing feed)
- Watermark file is <state_dir>/<name>.json, atomic replace on write
- Bounded to 500 IDs (configurable)
- Empty stdout when no new items — cron treats that as silent delivery

## Validation
- watch_rss.py against news.ycombinator.com/rss first run → empty stdout, watermark populated
- Removed one seen-id, second run → emitted exactly that item
- No DeprecationWarnings (ET element truth-value footgun dodged explicitly)

End-user pattern: 'hermes cron create my-feed --schedule "*/15 * * * *" --no-agent --script $HERMES_HOME/skills/devops/watchers/scripts/watch_rss.py --script-args "--name hn --url https://news.ycombinator.com/rss" --deliver telegram'

* docs(skills/watchers): tighten description to match peer optional skills

* docs(skills/watchers): align frontmatter + structure with peer optional skills

* docs(skills/watchers): gate to linux/macos (shell syntax in examples)
This commit is contained in:
Teknium 2026-05-08 09:27:15 -07:00 committed by GitHub
parent 839cdd1b05
commit ea8e608821
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 680 additions and 0 deletions

View file

@ -0,0 +1,148 @@
"""Shared watermark helper used by the three watcher scripts.
A watermark is just a JSON file that records the IDs we've seen on previous
runs, so the next run only emits items we haven't seen before.
Contract:
- First run: record all IDs from the fetched batch, emit nothing.
- Subsequent runs: emit items whose ID isn't in the stored set.
- Bounded: keep at most `max_seen` IDs (default 500).
- Atomic: write to a .tmp file and rename, so a crashed script can't
leave a half-written state file that permanently breaks dedup.
Import and use from any custom watcher script:
from _watermark import Watermark
wm = Watermark.load("my-feed-name")
new_items = wm.filter_new(fetched_items, id_key="id")
wm.save()
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
def _state_dir() -> Path:
"""Where watermark files live — respects WATCHER_STATE_DIR override."""
override = os.environ.get("WATCHER_STATE_DIR")
if override:
return Path(override)
# Default: $HERMES_HOME/watcher-state/, falling back to ~/.hermes/watcher-state/.
hermes_home = os.environ.get("HERMES_HOME") or str(Path.home() / ".hermes")
return Path(hermes_home) / "watcher-state"
class Watermark:
"""Per-watcher state. Persisted to <state_dir>/<name>.json."""
def __init__(self, name: str, *, max_seen: int = 500) -> None:
if not name or not name.replace("-", "").replace("_", "").isalnum():
raise ValueError(
f"watermark name must be alphanumeric + '-'/'_' (got {name!r})"
)
self.name = name
self.max_seen = max_seen
self._path = _state_dir() / f"{name}.json"
self._data: Dict[str, Any] = {"seen_ids": [], "first_run": True}
@classmethod
def load(cls, name: str, *, max_seen: int = 500) -> "Watermark":
wm = cls(name, max_seen=max_seen)
if wm._path.exists():
try:
wm._data = json.loads(wm._path.read_text(encoding="utf-8"))
wm._data.setdefault("seen_ids", [])
wm._data["first_run"] = False
except (OSError, json.JSONDecodeError):
# Corrupt state file — treat as a first run but don't crash.
wm._data = {"seen_ids": [], "first_run": True}
return wm
@property
def is_first_run(self) -> bool:
return bool(self._data.get("first_run", True))
@property
def seen(self) -> List[str]:
return list(self._data.get("seen_ids", []))
def filter_new(
self, items: Iterable[Dict[str, Any]], *, id_key: str = "id"
) -> List[Dict[str, Any]]:
"""Return items whose id isn't in the stored set.
Side effect: updates the in-memory seen set with every id in the
batch (so save() persists the full new watermark). On first run,
records every id but returns an empty list (baseline, no replay).
"""
existing = set(str(x) for x in self._data.get("seen_ids", []))
was_first_run = self.is_first_run
new_items: List[Dict[str, Any]] = []
batch_ids: List[str] = []
for item in items:
ident = item.get(id_key)
if ident is None:
continue
ident_str = str(ident)
batch_ids.append(ident_str)
if ident_str in existing:
continue
if was_first_run:
continue # record but don't emit
new_items.append(item)
combined = list(existing) + [i for i in batch_ids if i not in existing]
if len(combined) > self.max_seen:
combined = combined[-self.max_seen:]
self._data["seen_ids"] = combined
self._data["first_run"] = False
return new_items
def save(self) -> None:
self._path.parent.mkdir(parents=True, exist_ok=True)
tmp = self._path.with_suffix(".tmp")
tmp.write_text(
json.dumps(self._data, indent=2, sort_keys=True),
encoding="utf-8",
)
os.replace(tmp, self._path)
def format_items_as_markdown(
items: List[Dict[str, Any]],
*,
title_key: str = "title",
url_key: str = "url",
body_key: Optional[str] = None,
max_body_chars: int = 500,
) -> str:
"""Render a list of items as Markdown for cron delivery.
One heading per item + its URL + optional snippet of body. Output is
empty string when items is empty cron will then treat stdout as
silent and skip delivery (existing behavior).
"""
if not items:
return ""
lines: List[str] = []
for item in items:
title = (item.get(title_key) or "(no title)").strip()
url = (item.get(url_key) or "").strip()
lines.append(f"## {title}")
if url:
lines.append(url)
if body_key:
body = (item.get(body_key) or "").strip()
if body:
if len(body) > max_body_chars:
body = body[:max_body_chars].rstrip() + ""
lines.append("")
lines.append(body)
lines.append("")
return "\n".join(lines).rstrip() + "\n"

View file

@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""Watch GitHub activity — issues, pulls, releases, or commits — with dedup.
Usage (via cron with --no-agent):
hermes cron create hermes-issues \\
--schedule "*/5 * * * *" --no-agent \\
--script "$HERMES_HOME/skills/devops/watchers/scripts/watch_github.py" \\
--script-args "--name hermes-issues --repo NousResearch/hermes-agent --scope issues"
Set GITHUB_TOKEN (or GH_TOKEN) in ~/.hermes/.env to avoid the 60 req/hr
anonymous rate limit.
Scopes: issues | pulls | releases | commits. Or pass --search QUERY to
use the /search/issues endpoint instead of /repos/:owner/:repo/:scope.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from _watermark import Watermark, format_items_as_markdown # type: ignore
VALID_SCOPES = ("issues", "pulls", "releases", "commits")
def _flatten_commit(item):
"""Commit objects nest title/author/date under 'commit' — flatten for rendering."""
commit = item.get("commit") or {}
msg = (commit.get("message") or "").strip().splitlines()
title = msg[0] if msg else ""
body = "\n".join(msg[1:]).strip() if len(msg) > 1 else ""
author = (item.get("author") or {}).get("login") or (commit.get("author") or {}).get("name", "")
date = (commit.get("author") or {}).get("date", "")
return {
"id": item.get("sha", ""),
"title": f"{title} ({author})" if author else title,
"url": item.get("html_url"),
"body": body,
"created_at": date,
}
def _flatten_issue_or_release(item):
return {
"id": str(item.get("id", "")),
"title": item.get("title") or item.get("name") or "",
"url": item.get("html_url") or item.get("url"),
"body": (item.get("body") or "").strip(),
"state": item.get("state"),
"author": (item.get("user") or {}).get("login")
or (item.get("author") or {}).get("login"),
"created_at": item.get("created_at"),
}
def main() -> int:
p = argparse.ArgumentParser(description="Watch GitHub issues / pulls / releases / commits.")
p.add_argument("--name", required=True, help="Watcher name (used for state file)")
p.add_argument("--repo", default="",
help="owner/name of the repo (one of --repo or --search is required)")
p.add_argument("--scope", default="issues", choices=VALID_SCOPES,
help="What to poll (default: issues)")
p.add_argument("--search", default="",
help="GitHub issues search query (alternative to --repo/--scope)")
p.add_argument("--per-page", type=int, default=30,
help="Results per page (default: 30, max: 100)")
p.add_argument("--max", type=int, default=20,
help="Max new items to emit per tick (default: 20)")
p.add_argument("--with-body", action="store_true",
help="Include issue/commit body as a snippet under each item")
p.add_argument("--timeout", type=float, default=30.0,
help="HTTP timeout in seconds (default: 30)")
args = p.parse_args()
if not args.repo and not args.search:
print("watch_github: one of --repo or --search is required", file=sys.stderr)
return 2
if args.repo and not re.fullmatch(r"[A-Za-z0-9._-]+/[A-Za-z0-9._-]+", args.repo):
print(f"watch_github: --repo must be owner/name (got {args.repo!r})", file=sys.stderr)
return 2
# URL + flattening strategy.
if args.search:
url = (
"https://api.github.com/search/issues"
f"?q={urllib.parse.quote(args.search)}&per_page={args.per_page}"
)
flatten = _flatten_issue_or_release
items_path = "items"
elif args.scope == "commits":
url = f"https://api.github.com/repos/{args.repo}/commits?per_page={args.per_page}"
flatten = _flatten_commit
items_path = ""
else:
url = (
f"https://api.github.com/repos/{args.repo}/{args.scope}"
f"?per_page={args.per_page}&state=all"
)
flatten = _flatten_issue_or_release
items_path = ""
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "Hermes-Watcher/1.0",
}
token = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN")
if token:
headers["Authorization"] = f"Bearer {token}"
req = urllib.request.Request(url)
for k, v in headers.items():
req.add_header(k, v)
try:
with urllib.request.urlopen(req, timeout=args.timeout) as resp:
raw = resp.read()
except urllib.error.HTTPError as e:
print(f"watch_github: HTTP {e.code} from {url}", file=sys.stderr)
return 2
except (urllib.error.URLError, TimeoutError, OSError) as e:
print(f"watch_github: network error: {e}", file=sys.stderr)
return 2
try:
data = json.loads(raw.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as e:
print(f"watch_github: response is not valid JSON: {e}", file=sys.stderr)
return 2
# Drill into items_path if needed (search endpoint returns {"items":[...]}).
if items_path:
data = data.get(items_path) if isinstance(data, dict) else None
if not isinstance(data, list):
print(f"watch_github: expected a list of items; got {type(data).__name__}",
file=sys.stderr)
return 2
items = [flatten(i) for i in data if isinstance(i, dict)]
# Drop any items that flattened without an ID (defensive).
items = [i for i in items if i.get("id")]
wm = Watermark.load(args.name)
new_items = wm.filter_new(items, id_key="id")
wm.save()
if args.max > 0:
new_items = new_items[: args.max]
body_key = "body" if args.with_body else None
output = format_items_as_markdown(new_items, body_key=body_key)
if output:
sys.stdout.write(output)
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""Watch any JSON endpoint that returns a list of objects; dedup by ID field.
Usage (via cron with --no-agent):
hermes cron create api-events \\
--schedule "*/1 * * * *" --no-agent \\
--script "$HERMES_HOME/skills/devops/watchers/scripts/watch_http_json.py" \\
--script-args "--name api --url https://api.example.com/events \\
--id-field event_id --items-path data.events"
The response can be:
- a top-level JSON list (default), or
- a JSON object with a dotted ``--items-path`` pointing to the list.
Each item is deduped by ``--id-field`` (default "id").
Optional ``--header KEY:VALUE`` flags pass HTTP headers (repeatable).
"""
from __future__ import annotations
import argparse
import json
import sys
import urllib.error
import urllib.request
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from _watermark import Watermark, format_items_as_markdown # type: ignore
def _dig(obj, path: str):
"""Dotted-path lookup: _dig({'a':{'b':[1,2]}}, 'a.b') → [1,2]."""
if not path:
return obj
cur = obj
for part in path.split("."):
if isinstance(cur, dict) and part in cur:
cur = cur[part]
else:
return None
return cur
def _parse_header(s: str):
if ":" not in s:
raise argparse.ArgumentTypeError(
f"--header expects 'KEY: VALUE' (got {s!r})"
)
k, v = s.split(":", 1)
return (k.strip(), v.strip())
def main() -> int:
p = argparse.ArgumentParser(description="Poll a JSON endpoint.")
p.add_argument("--name", required=True, help="Watcher name (used for state file)")
p.add_argument("--url", required=True, help="JSON endpoint URL")
p.add_argument("--id-field", default="id",
help="Field used to dedup items (default: 'id')")
p.add_argument("--items-path", default="",
help="Dotted path to the list inside the JSON response (e.g. 'data.events')")
p.add_argument("--title-field", default="title",
help="Field used as the item title in the rendered output (default: 'title')")
p.add_argument("--url-field", default="url",
help="Field used as the item URL in the rendered output (default: 'url')")
p.add_argument("--body-field", default="",
help="Optional body field to include as a snippet under each item")
p.add_argument("--max", type=int, default=20,
help="Max new items to emit per tick (default: 20)")
p.add_argument("--header", action="append", type=_parse_header, default=[],
metavar="KEY: VALUE",
help="HTTP header (repeatable)")
p.add_argument("--timeout", type=float, default=20.0,
help="HTTP timeout in seconds (default: 20)")
args = p.parse_args()
req = urllib.request.Request(args.url, headers={"User-Agent": "Hermes-Watcher/1.0"})
for k, v in args.header:
req.add_header(k, v)
try:
with urllib.request.urlopen(req, timeout=args.timeout) as resp:
raw = resp.read()
except urllib.error.HTTPError as e:
print(f"watch_http_json: HTTP {e.code} from {args.url}", file=sys.stderr)
return 2
except (urllib.error.URLError, TimeoutError, OSError) as e:
print(f"watch_http_json: network error: {e}", file=sys.stderr)
return 2
try:
data = json.loads(raw.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as e:
print(f"watch_http_json: response is not valid JSON: {e}", file=sys.stderr)
return 2
items = _dig(data, args.items_path) if args.items_path else data
if not isinstance(items, list):
print(
f"watch_http_json: items_path={args.items_path!r} did not resolve to a list "
f"(got {type(items).__name__})",
file=sys.stderr,
)
return 2
# Keep only dicts — skip any bare strings / numbers so filter_new doesn't crash.
items = [i for i in items if isinstance(i, dict)]
wm = Watermark.load(args.name)
new_items = wm.filter_new(items, id_key=args.id_field)
wm.save()
if args.max > 0:
new_items = new_items[: args.max]
body_key = args.body_field or None
output = format_items_as_markdown(
new_items,
title_key=args.title_field,
url_key=args.url_field,
body_key=body_key,
)
if output:
sys.stdout.write(output)
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""Watch an RSS 2.0 or Atom feed; print new items to stdout, silent on empty.
Usage (via cron with --no-agent):
hermes cron create my-feed \\
--schedule "*/15 * * * *" --no-agent \\
--script "$HERMES_HOME/skills/devops/watchers/scripts/watch_rss.py" \\
--script-args "--name hn --url https://news.ycombinator.com/rss"
First run records a baseline (emits nothing). Subsequent runs emit only
items whose <guid> / <id> isn't in the watermark.
"""
from __future__ import annotations
import argparse
import sys
import urllib.error
import urllib.request
from pathlib import Path
from xml.etree import ElementTree as ET
sys.path.insert(0, str(Path(__file__).parent))
from _watermark import Watermark, format_items_as_markdown # type: ignore
def _strip_ns(tag: str) -> str:
return tag.split("}", 1)[1] if "}" in tag else tag
def _parse_feed(xml_bytes: bytes):
"""Return a list of {id, title, url, summary} dicts.
Handles both RSS 2.0 ``<item>`` and Atom ``<entry>``.
"""
try:
root = ET.fromstring(xml_bytes)
except ET.ParseError as e:
print(f"watch_rss: invalid XML: {e}", file=sys.stderr)
sys.exit(2)
entries = []
for item in root.iter():
tag = _strip_ns(item.tag)
if tag not in ("item", "entry"):
continue
# ElementTree Elements without children are *falsy* — use `is not None`.
children = {_strip_ns(c.tag): c for c in item}
guid_el = children.get("guid")
if guid_el is None:
guid_el = children.get("id")
link_el = children.get("link")
if link_el is not None:
href = link_el.attrib.get("href") or (link_el.text or "").strip()
else:
href = ""
guid = (guid_el.text or "").strip() if guid_el is not None else ""
guid = guid or href
if not guid:
continue
title_el = children.get("title")
title = (title_el.text or "").strip() if title_el is not None else ""
summ_el = children.get("description")
if summ_el is None:
summ_el = children.get("summary")
summary = (summ_el.text or "").strip() if summ_el is not None else ""
entries.append(
{"id": guid, "title": title, "url": href, "summary": summary}
)
return entries
def main() -> int:
p = argparse.ArgumentParser(description="Watch an RSS/Atom feed.")
p.add_argument("--name", required=True, help="Watcher name (used for state file)")
p.add_argument("--url", required=True, help="Feed URL")
p.add_argument("--max", type=int, default=10,
help="Max new items to emit per tick (default: 10)")
p.add_argument("--with-summary", action="store_true",
help="Include <description>/<summary> snippet under each item")
p.add_argument("--timeout", type=float, default=20.0,
help="HTTP timeout in seconds (default: 20)")
args = p.parse_args()
try:
req = urllib.request.Request(args.url, headers={"User-Agent": "Hermes-Watcher/1.0"})
with urllib.request.urlopen(req, timeout=args.timeout) as resp:
xml_bytes = resp.read()
except urllib.error.HTTPError as e:
print(f"watch_rss: HTTP {e.code} from {args.url}", file=sys.stderr)
return 2
except (urllib.error.URLError, TimeoutError, OSError) as e:
print(f"watch_rss: network error: {e}", file=sys.stderr)
return 2
entries = _parse_feed(xml_bytes)
wm = Watermark.load(args.name)
new_items = wm.filter_new(entries, id_key="id")
wm.save()
# Cap emitted items (watermark still records all seen IDs so we don't
# re-emit them next tick).
if args.max > 0:
new_items = new_items[: args.max]
body_key = "summary" if args.with_summary else None
output = format_items_as_markdown(new_items, body_key=body_key)
if output:
sys.stdout.write(output)
# Empty stdout on no-new — cron treats that as silent.
return 0
if __name__ == "__main__":
sys.exit(main())