diff --git a/docs/brainstorms/2026-04-25-codex-bridge-completion-notification-requirements.md b/docs/brainstorms/2026-04-25-codex-bridge-completion-notification-requirements.md new file mode 100644 index 0000000000..3cf2ea8270 --- /dev/null +++ b/docs/brainstorms/2026-04-25-codex-bridge-completion-notification-requirements.md @@ -0,0 +1,47 @@ +--- +title: Codex Bridge 异步完成通知 MVP 需求 +date: 2026-04-25 +status: accepted +scope: lightweight +--- + +# Codex Bridge 异步完成通知 MVP 需求 + +## 背景 + +Hermes 通过 `skills/codex-bridge/references/cli.py start` 启动 Codex app-server stdio 任务后,会把任务状态写入本地 `codex_bridge.db`。当前实现不是常驻订阅或完成回调模式:Codex turn 完成后不会主动通知原 Feishu/平台会话,用户必须再次说“继续”后 Hermes 才会手动查询 `status` 或 `list`。 + +这造成一个产品异味:异步任务已经启动,但完成后没有人主动查收。 + +## 范围决策 + +本次做窄范围 MVP:让 Codex Bridge 启动的异步任务在完成后能回到原会话或目标发送完成摘要。不要做多租户调度系统,不重写现有 Codex Bridge 低层协议,不引入 mailbox/outbox/inbox 作为主通信机制。 + +## 目标 + +- 启动任务时可选记录通知目标,例如 `local`、`feishu:` 或其他 `send_message` 支持的显式平台目标。 +- 默认不改变现有 API 行为;未传通知目标时仍能正常启动和查询。 +- 提供 watcher/one-shot poll 入口,发现已完成但未处理通知的任务。 +- 对有目标的任务读取 final summary,生成简洁完成摘要,并通过可注入 notifier 发送。 +- 对无目标的完成任务标记为 `no_target`,避免 watcher 重启后重复处理。 +- 通过持久化 `notification_status` / `notified_at` 防重复通知。 + +## 非目标 + +- 不实现常驻多租户调度器。 +- 不实现 pending approval / `requestUserInput` 的实时双向交互。 +- 不让测试向真实 Feishu、WeChat、Telegram 等外部平台发消息。 +- 不开放 `danger-full-access` 默认权限。 +- 不用 mailbox/outbox/inbox 作为通信机制。 + +## 验收标准 + +- `codex_bridge(action="start", notify_target=...)` 能把目标写入任务状态。 +- watcher/notify 入口只通知 terminal 状态任务一次;重启或重复运行不会重复发送。 +- terminal 任务没有 target 时会被标记为 `no_target`,不会调用 notifier。 +- CLI 暴露 `--notify-target` 和 one-shot `notify-completed` 入口,并支持 dry-run。 +- 测试通过 mock/inject notifier 覆盖通知行为。 + +## 后续扩展说明 + +pending approval 和 `requestUserInput` 后续可复用同一通知目标字段:当任务进入 `waiting_for_approval` 或 `waiting_for_user_input` 时,watcher 可以发送带 request id 的交互提示;平台侧回复再映射到 `codex_bridge respond`。本次先只处理 terminal completion,避免把交互式审批设计混入 MVP。 diff --git a/docs/plans/2026-04-25-codex-bridge-completion-notification-plan.md b/docs/plans/2026-04-25-codex-bridge-completion-notification-plan.md new file mode 100644 index 0000000000..a7d0938fcc --- /dev/null +++ b/docs/plans/2026-04-25-codex-bridge-completion-notification-plan.md @@ -0,0 +1,82 @@ +--- +title: Codex Bridge 异步完成通知 MVP 实现计划 +date: 2026-04-25 +status: active +origin: docs/brainstorms/2026-04-25-codex-bridge-completion-notification-requirements.md +--- + +# Codex Bridge 异步完成通知 MVP 实现计划 + +## 问题框架 + +Codex Bridge 已能通过 app-server stdio 启动异步 Codex turn,并把状态写入 `codex_bridge.db`。缺口在完成后的主动送达:当前没有通知目标、通知状态,也没有 watcher 入口来把 terminal 任务的摘要回发给原会话。 + +## 技术决策 + +- 在 `codex_bridge_tasks` 上新增通知元数据:`notify_target`、`notification_status`、`notified_at`、`notification_error`。 +- `start` 接受可选 `notify_target`,不传时保持旧行为。 +- 新增 one-shot `notify_completed` action:扫描 terminal 且尚未处理通知的任务,按目标发送或标记 `no_target`。 +- 默认 notifier 复用现有 `send_message` 工具;测试和 CLI dry-run 通过注入或 dry-run 避免真实外发。 +- `local` 目标作为本地消费目标:记录为已通知并返回摘要,不调用外部平台。 + +## 实现单元 + +### U1: 持久化通知目标与状态 + +修改文件: +- `tools/codex_bridge_tool.py` +- `tests/tools/test_codex_bridge_tool.py` + +做法: +- 数据库初始化时对旧库执行兼容迁移。 +- `CodexBridgeTask.snapshot()`、`list_tasks()`、`get_task_snapshot()` 暴露通知字段。 +- `start_task()` 接受 `notify_target` 并保存。 + +测试场景: +- 启动任务时传入 `notify_target`,状态快照和持久化查询都能看到该值。 + +### U2: 完成通知 one-shot watcher + +修改文件: +- `tools/codex_bridge_tool.py` +- `tests/tools/test_codex_bridge_tool.py` + +做法: +- 增加扫描 terminal 任务的方法。 +- 对无 target 的任务标记 `no_target`,不调用 notifier。 +- 对有 target 的任务构造简洁摘要,调用 notifier 后标记 `sent` 和 `notified_at`。 +- 已 `sent` 或 `no_target` 的任务不再重复处理。 +- 支持 `dry_run`,只返回会处理的任务,不写通知状态,不发送。 + +测试场景: +- completed 任务只通知一次。 +- 无 target completed 任务不发送,并标记 `no_target`。 +- dry-run 不发送且不改变通知状态。 + +### U3: 工具 schema 与 CLI 入口 + +修改文件: +- `tools/codex_bridge_tool.py` +- `skills/codex-bridge/references/cli.py` +- `skills/codex-bridge/references/validator.py` +- `tests/skills/test_codex_bridge_skill.py` + +做法: +- schema 加入 `notify_completed` action、`notify_target`、`dry_run`。 +- CLI `start`/`smoke-test` 增加 `--notify-target`。 +- CLI 增加 `notify-completed` one-shot 命令。 +- validator 校验 notify 输出的基本结构。 + +测试场景: +- CLI start 能把 `--notify-target` 传给工具。 +- CLI notify-completed dry-run 调用 bridge 且不依赖真实平台。 + +## 验证 + +- `python -m py_compile tools/codex_bridge_tool.py skills/codex-bridge/references/cli.py skills/codex-bridge/references/validator.py` +- `scripts/run_tests.sh tests/tools/test_codex_bridge_tool.py tests/skills/test_codex_bridge_skill.py` + +## 风险 + +- 默认 notifier 依赖 `send_message` 的运行环境;没有 gateway 或目标不可达时会记录 `notification_error` 并保留可重试状态。 +- 当前只处理 terminal completion,不处理实时 approval/input;后续应在同一 target 模型上扩展。 diff --git a/docs/solutions/developer-experience/codex-bridge-async-completion-notifications-2026-04-25.md b/docs/solutions/developer-experience/codex-bridge-async-completion-notifications-2026-04-25.md new file mode 100644 index 0000000000..895b9e9c5f --- /dev/null +++ b/docs/solutions/developer-experience/codex-bridge-async-completion-notifications-2026-04-25.md @@ -0,0 +1,85 @@ +--- +title: Codex Bridge 异步任务需要持久化完成通知状态 +date: 2026-04-25 +category: docs/solutions/developer-experience/ +module: Codex Bridge +problem_type: developer_experience +component: assistant +severity: medium +applies_when: + - 异步 agent 任务由本地 bridge 启动,但完成结果需要回到原平台会话 + - 任务状态已经持久化,但缺少完成后主动送达能力 + - 测试不能向真实外部平台发送消息 +tags: [codex-bridge, async-notification, app-server, send-message, watcher] +--- + +# Codex Bridge 异步任务需要持久化完成通知状态 + +## Context + +Codex Bridge 已经通过 app-server stdio 启动 Codex 任务,并把状态写入 `codex_bridge.db`。dogfood 暴露出的体验问题是:异步任务完成后没有主动通知原 Feishu/平台会话,用户必须再次触发 Hermes 查询 `status` 或 `list` 才能知道结果。 + +这类问题不需要先做多租户调度系统。MVP 的关键是让任务在启动时可选记录通知目标,并让一个 one-shot watcher 可以可靠地处理 terminal 任务。 + +## Guidance + +在已有任务表上补齐三个概念,而不是重写底层通信协议: + +- `notify_target`:启动时可选记录目标,例如 `local` 或 `feishu:`。 +- `notification_status`:记录通知生命周期,例如 `pending`、`sent`、`failed`、`no_target`。 +- `notified_at` / `notification_error`:让 watcher 重启后能防重复,并保留失败原因。 + +watcher 应该只扫描 terminal 状态任务,并做幂等处理: + +- 有目标:构造简洁完成摘要,调用可注入 notifier,成功后标记 `sent`。 +- 无目标:标记 `no_target`,不发送,避免每次扫描重复捞到同一任务。 +- dry-run:返回预览,不发送,也不写通知状态。 + +默认 notifier 可以复用现有 `send_message` 能力,但核心 manager 方法要允许注入 notifier。这样单元测试可以用 fake notifier 验证行为,避免真实平台副作用。 + +## Why This Matters + +异步 bridge 的产品承诺不是“能启动后台任务”,而是“任务结束后用户能在原上下文看到结果”。如果只有状态表但没有通知状态,系统会卡在“完成但无人查收”的灰区;如果没有持久化防重复,watcher 或 daemon 重启又可能重复推送。 + +把通知状态做成任务元数据,可以在不引入 mailbox/outbox/inbox 通信机制的情况下满足 MVP,并为后续实时 approval / `requestUserInput` 扩展留下同一套 target 语义。 + +## When to Apply + +- 异步任务生命周期已经持久化,但完成后需要跨平台送达。 +- 现有平台发送能力已经存在,新增功能只需要选择目标和调用发送。 +- 需要保证测试环境不触发真实外部消息。 +- 需要 watcher/daemon 重启后不重复通知。 + +## Examples + +启动时记录目标: + +```python +codex_bridge( + action="start", + prompt="Investigate the failing tests", + notify_target="feishu:chat-1", +) +``` + +one-shot watcher dry-run: + +```bash +python skills/codex-bridge/references/cli.py notify-completed --dry-run +``` + +测试中注入 notifier: + +```python +deliveries = [] +manager.notify_completed( + notifier=lambda target, message: deliveries.append((target, message)) or {"ok": True} +) +``` + +## Related + +- `docs/brainstorms/2026-04-25-codex-bridge-completion-notification-requirements.md` +- `docs/plans/2026-04-25-codex-bridge-completion-notification-plan.md` +- `tools/codex_bridge_tool.py` +- `skills/codex-bridge/references/cli.py` diff --git a/skills/codex-bridge/references/cli.py b/skills/codex-bridge/references/cli.py index 4810c26cda..c5b5294314 100644 --- a/skills/codex-bridge/references/cli.py +++ b/skills/codex-bridge/references/cli.py @@ -30,6 +30,8 @@ try: validate_start_input, validate_status_input, validate_steer_input, + validate_notify_completed_output, + validate_notify_target, ) except ImportError: from validator import ( # type: ignore @@ -46,6 +48,8 @@ except ImportError: validate_start_input, validate_status_input, validate_steer_input, + validate_notify_completed_output, + validate_notify_target, ) from tools.codex_bridge_tool import DEFAULT_APPROVAL_POLICY, DEFAULT_SANDBOX, codex_bridge @@ -75,6 +79,7 @@ def _prompt_from_args(args: argparse.Namespace) -> str: def cmd_start(args: argparse.Namespace) -> dict[str, Any]: prompt = _prompt_from_args(args) validate_start_input(prompt, args.cwd, args.sandbox, args.approval_policy) + notify_target = validate_notify_target(args.notify_target) return call_bridge( "start", prompt=prompt, @@ -83,6 +88,7 @@ def cmd_start(args: argparse.Namespace) -> dict[str, Any]: sandbox=args.sandbox, approval_policy=args.approval_policy, codex_home=args.codex_home, + notify_target=notify_target, ) @@ -95,6 +101,12 @@ def cmd_list(args: argparse.Namespace) -> dict[str, Any]: return call_bridge("list", limit=args.limit) +def cmd_notify_completed(args: argparse.Namespace) -> dict[str, Any]: + data = call_bridge("notify_completed", limit=args.limit, dry_run=args.dry_run) + validate_notify_completed_output(data) + return data + + def cmd_steer(args: argparse.Namespace) -> dict[str, Any]: validate_steer_input(args.task_id, args.instruction) return call_bridge("steer", task_id=args.task_id, instruction=args.instruction) @@ -126,6 +138,7 @@ def _smoke_prompt(wait_seconds: int) -> str: def cmd_smoke_test(args: argparse.Namespace) -> dict[str, Any]: validate_start_input(_smoke_prompt(args.wait), args.cwd, args.sandbox, args.approval_policy) + notify_target = validate_notify_target(args.notify_target) started = call_bridge( "start", prompt=_smoke_prompt(args.wait), @@ -134,6 +147,7 @@ def cmd_smoke_test(args: argparse.Namespace) -> dict[str, Any]: sandbox=args.sandbox, approval_policy=args.approval_policy, codex_home=args.codex_home, + notify_target=notify_target, ) task_id = started["task"]["hermes_task_id"] deadline = time.monotonic() + args.timeout @@ -166,6 +180,11 @@ def add_common_start_options(parser: argparse.ArgumentParser) -> None: parser.add_argument("--sandbox", default=DEFAULT_SANDBOX, type=validate_sandbox) parser.add_argument("--approval-policy", default=DEFAULT_APPROVAL_POLICY, type=validate_approval_policy) parser.add_argument("--codex-home", default=None, help="Optional CODEX_HOME override.") + parser.add_argument( + "--notify-target", + default=None, + help="Optional completion notification target, e.g. local or feishu:.", + ) def build_parser() -> argparse.ArgumentParser: @@ -186,6 +205,11 @@ def build_parser() -> argparse.ArgumentParser: list_parser.add_argument("--limit", type=int, default=10) list_parser.set_defaults(func=cmd_list) + notify = subparsers.add_parser("notify-completed", help="One-shot poll and notify completed tasks.") + notify.add_argument("--limit", type=int, default=10) + notify.add_argument("--dry-run", action="store_true", help="Preview notifications without sending or marking.") + notify.set_defaults(func=cmd_notify_completed) + steer = subparsers.add_parser("steer", help="Steer an active Codex turn.") steer.add_argument("task_id") steer.add_argument("--instruction", required=True) diff --git a/skills/codex-bridge/references/validator.py b/skills/codex-bridge/references/validator.py index 34ed62e3a1..a7ef48d010 100644 --- a/skills/codex-bridge/references/validator.py +++ b/skills/codex-bridge/references/validator.py @@ -11,6 +11,7 @@ ALLOWED_SANDBOXES = {"read-only", "workspace-write"} ALLOWED_APPROVAL_POLICIES = {"untrusted", "on-request"} ALLOWED_DECISIONS = {"accept", "acceptForSession", "decline", "cancel"} TERMINAL_STATUSES = {"completed", "failed", "cancelled"} +NOTIFICATION_STATUSES = {"sent", "failed", "no_target", "dry_run", "pending"} SMOKE_SENTINEL = "CODEX_ASYNC_OK" @@ -56,6 +57,15 @@ def validate_start_input(prompt: str, cwd: str, sandbox: str, approval_policy: s validate_approval_policy(approval_policy) +def validate_notify_target(target: str | None) -> str | None: + if target is None: + return None + normalized = target.strip() + if not normalized: + raise ValidationError("notify_target must be non-empty when provided.") + return normalized + + def validate_task_id(action: str, task_id: str | None) -> None: if not task_id or not str(task_id).strip(): raise ValidationError(f"{action} requires task_id.") @@ -123,10 +133,30 @@ def validate_bridge_output(action: str, data: Mapping[str, Any]) -> None: if action == "start": validate_start_output(data) return + if action == "notify_completed": + validate_notify_completed_output(data) + return if "success" in data and data.get("success") is not True: raise ValidationError(str(data.get("error") or f"{action} failed.")) +def validate_notify_completed_output(data: Mapping[str, Any]) -> None: + if data.get("success") is not True: + raise ValidationError("notify_completed output must have success=true.") + notifications = data.get("notifications") + if not isinstance(notifications, list): + raise ValidationError("notify_completed output must include notifications list.") + for item in notifications: + if not isinstance(item, Mapping): + raise ValidationError("notify_completed notifications must be objects.") + if not item.get("task_id"): + raise ValidationError("notify_completed notification missing task_id.") + status = item.get("notification_status") + if status not in NOTIFICATION_STATUSES: + allowed = ", ".join(sorted(NOTIFICATION_STATUSES)) + raise ValidationError(f"notification_status must be one of: {allowed}.") + + def contains_text(value: Any, needle: str) -> bool: if isinstance(value, str): return needle in value diff --git a/tests/skills/test_codex_bridge_skill.py b/tests/skills/test_codex_bridge_skill.py index e2f20dadb7..35a13c1329 100644 --- a/tests/skills/test_codex_bridge_skill.py +++ b/tests/skills/test_codex_bridge_skill.py @@ -123,10 +123,40 @@ def test_cli_start_validates_and_emits_bridge_json(tmp_path, monkeypatch, capsys "sandbox": "read-only", "approval_policy": "untrusted", "codex_home": None, + "notify_target": None, } ] +def test_cli_start_passes_notify_target(tmp_path, monkeypatch, capsys): + cli = load_reference_module("cli") + calls = [] + + def fake_codex_bridge(**kwargs): + calls.append(kwargs) + return json.dumps( + { + "success": True, + "protocol": {"mailbox": False, "transport": "app-server stdio"}, + "task": { + "hermes_task_id": "codex-abc", + "codex_thread_id": "thread-abc", + "codex_turn_id": "turn-abc", + "notify_target": kwargs["notify_target"], + }, + } + ) + + monkeypatch.setattr(cli, "codex_bridge", fake_codex_bridge) + + exit_code = cli.main(["start", "--cwd", str(tmp_path), "--notify-target", "local", "--prompt", "Analyze tests"]) + + assert exit_code == 0 + output = json.loads(capsys.readouterr().out) + assert output["task"]["notify_target"] == "local" + assert calls[0]["notify_target"] == "local" + + def test_cli_respond_maps_request_id_to_bridge_instruction(monkeypatch, capsys): cli = load_reference_module("cli") calls = [] @@ -218,3 +248,37 @@ def test_cli_smoke_test_polls_until_completed_with_sentinel(tmp_path, monkeypatc assert output["task_id"] == "codex-smoke" assert [call["action"] for call in calls] == ["start", "status"] assert "CODEX_ASYNC_OK" in calls[0]["prompt"] + assert calls[0]["notify_target"] is None + + +def test_cli_notify_completed_dry_run_uses_bridge_without_real_notifier(monkeypatch, capsys): + cli = load_reference_module("cli") + calls = [] + + def fake_codex_bridge(**kwargs): + calls.append(kwargs) + return json.dumps( + { + "success": True, + "dry_run": True, + "processed": 1, + "notifications": [ + { + "task_id": "codex-abc", + "target": "local", + "notification_status": "dry_run", + "sent": False, + "message": "preview", + } + ], + } + ) + + monkeypatch.setattr(cli, "codex_bridge", fake_codex_bridge) + + exit_code = cli.main(["notify-completed", "--limit", "5", "--dry-run"]) + + assert exit_code == 0 + output = json.loads(capsys.readouterr().out) + assert output["notifications"][0]["notification_status"] == "dry_run" + assert calls == [{"action": "notify_completed", "limit": 5, "dry_run": True}] diff --git a/tests/tools/test_codex_bridge_tool.py b/tests/tools/test_codex_bridge_tool.py index b6e6763951..4e65846000 100644 --- a/tests/tools/test_codex_bridge_tool.py +++ b/tests/tools/test_codex_bridge_tool.py @@ -75,6 +75,19 @@ def test_start_task_uses_app_server_thread_turn_without_mailbox(tmp_path, monkey assert "inbox" not in json.dumps(client.requests).lower() +def test_start_task_records_notify_target(tmp_path, monkeypatch): + manager = make_manager(tmp_path, monkeypatch) + + result = manager.start_task("Analyze tests", cwd=str(tmp_path), notify_target="feishu:chat-1") + task_id = result["task"]["hermes_task_id"] + + assert result["task"]["notify_target"] == "feishu:chat-1" + assert result["task"]["notification_status"] == "pending" + persisted = manager.status(task_id)["task"] + assert persisted["notify_target"] == "feishu:chat-1" + assert persisted["notification_status"] == "pending" + + def test_server_approval_request_can_be_reported_and_resolved(tmp_path, monkeypatch): manager = make_manager(tmp_path, monkeypatch) started = manager.start_task("Run a safe command", cwd=str(tmp_path)) @@ -143,8 +156,76 @@ def test_steer_and_interrupt_call_codex_turn_methods(tmp_path, monkeypatch): assert client.requests[-1][0] == "turn/interrupt" +def test_notify_completed_sends_once_for_targeted_completed_task(tmp_path, monkeypatch): + manager = make_manager(tmp_path, monkeypatch) + started = manager.start_task("Summarize a bug", cwd=str(tmp_path), notify_target="feishu:chat-1") + task_id = started["task"]["hermes_task_id"] + deliveries = [] + + manager.record_event( + task_id, + "turn/completed", + {"turn": {"id": "turn-1", "status": "completed"}, "message": "Done fixing it."}, + ) + + first = manager.notify_completed(notifier=lambda target, message: deliveries.append((target, message)) or {"ok": True}) + second = manager.notify_completed(notifier=lambda target, message: deliveries.append((target, message)) or {"ok": True}) + + assert first["processed"] == 1 + assert first["notifications"][0]["notification_status"] == "sent" + assert first["notifications"][0]["sent"] is True + assert second["processed"] == 0 + assert len(deliveries) == 1 + assert deliveries[0][0] == "feishu:chat-1" + assert task_id in deliveries[0][1] + assert manager.status(task_id)["task"]["notification_status"] == "sent" + + +def test_notify_completed_marks_no_target_without_sending(tmp_path, monkeypatch): + manager = make_manager(tmp_path, monkeypatch) + started = manager.start_task("No callback needed", cwd=str(tmp_path)) + task_id = started["task"]["hermes_task_id"] + + manager.record_event( + task_id, + "turn/completed", + {"turn": {"id": "turn-1", "status": "completed"}, "message": "Done."}, + ) + + result = manager.notify_completed(notifier=lambda _target, _message: (_ for _ in ()).throw(AssertionError("sent"))) + + assert result["processed"] == 1 + assert result["notifications"][0]["notification_status"] == "no_target" + assert result["notifications"][0]["sent"] is False + assert manager.status(task_id)["task"]["notification_status"] == "no_target" + + +def test_notify_completed_dry_run_does_not_send_or_mark(tmp_path, monkeypatch): + manager = make_manager(tmp_path, monkeypatch) + started = manager.start_task("Preview callback", cwd=str(tmp_path), notify_target="local") + task_id = started["task"]["hermes_task_id"] + + manager.record_event( + task_id, + "turn/completed", + {"turn": {"id": "turn-1", "status": "completed"}, "message": "Done."}, + ) + + result = manager.notify_completed( + dry_run=True, + notifier=lambda _target, _message: (_ for _ in ()).throw(AssertionError("sent")), + ) + + assert result["processed"] == 1 + assert result["notifications"][0]["notification_status"] == "dry_run" + assert result["notifications"][0]["sent"] is False + assert manager.status(task_id)["task"]["notification_status"] == "pending" + + def test_tool_schema_refuses_danger_full_access(): props = bridge.CODEX_BRIDGE_SCHEMA["parameters"]["properties"] assert "danger-full-access" not in props["sandbox"]["enum"] assert "never" not in props["approval_policy"]["enum"] + assert "notify_completed" in props["action"]["enum"] + assert "notify_target" in props diff --git a/tools/codex_bridge_tool.py b/tools/codex_bridge_tool.py index ee06c5f477..4471ca06e5 100644 --- a/tools/codex_bridge_tool.py +++ b/tools/codex_bridge_tool.py @@ -19,7 +19,7 @@ import time import uuid from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional from hermes_constants import get_hermes_home from tools.registry import registry, tool_error @@ -29,6 +29,7 @@ CODEX_BRIDGE_DB = "codex_bridge.db" DEFAULT_APPROVAL_POLICY = "untrusted" DEFAULT_SANDBOX = "read-only" EVENT_TAIL_LIMIT = 20 +TERMINAL_STATUSES = {"completed", "failed", "cancelled"} def check_codex_bridge_requirements() -> bool: @@ -113,6 +114,10 @@ class CodexBridgeStore: last_progress_summary TEXT, final_summary TEXT, error_summary TEXT, + notify_target TEXT, + notification_status TEXT, + notified_at REAL, + notification_error TEXT, created_at REAL NOT NULL, updated_at REAL NOT NULL, completed_at REAL @@ -143,6 +148,22 @@ class CodexBridgeStore: ); """ ) + self._ensure_task_columns(conn) + + def _ensure_task_columns(self, conn: sqlite3.Connection) -> None: + existing = { + row["name"] + for row in conn.execute("PRAGMA table_info(codex_bridge_tasks)").fetchall() + } + migrations = { + "notify_target": "ALTER TABLE codex_bridge_tasks ADD COLUMN notify_target TEXT", + "notification_status": "ALTER TABLE codex_bridge_tasks ADD COLUMN notification_status TEXT", + "notified_at": "ALTER TABLE codex_bridge_tasks ADD COLUMN notified_at REAL", + "notification_error": "ALTER TABLE codex_bridge_tasks ADD COLUMN notification_error TEXT", + } + for column, statement in migrations.items(): + if column not in existing: + conn.execute(statement) def upsert_task(self, task: "CodexBridgeTask") -> None: with self._lock, self._connect() as conn: @@ -152,8 +173,9 @@ class CodexBridgeStore: hermes_task_id, status, prompt_summary, codex_thread_id, codex_turn_id, cwd, model, sandbox, approval_policy, degraded_mode, last_progress_summary, final_summary, - error_summary, created_at, updated_at, completed_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + error_summary, notify_target, notification_status, notified_at, + notification_error, created_at, updated_at, completed_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(hermes_task_id) DO UPDATE SET status=excluded.status, codex_thread_id=excluded.codex_thread_id, @@ -161,6 +183,10 @@ class CodexBridgeStore: last_progress_summary=excluded.last_progress_summary, final_summary=excluded.final_summary, error_summary=excluded.error_summary, + notify_target=excluded.notify_target, + notification_status=excluded.notification_status, + notified_at=excluded.notified_at, + notification_error=excluded.notification_error, updated_at=excluded.updated_at, completed_at=excluded.completed_at """, @@ -178,6 +204,10 @@ class CodexBridgeStore: task.last_progress_summary, task.final_summary, task.error_summary, + task.notify_target, + task.notification_status, + task.notified_at, + task.notification_error, task.created_at, task.updated_at, task.completed_at, @@ -299,7 +329,9 @@ class CodexBridgeStore: """ SELECT hermes_task_id, status, prompt_summary, codex_thread_id, codex_turn_id, last_progress_summary, final_summary, - error_summary, created_at, updated_at, completed_at + error_summary, notify_target, notification_status, + notified_at, notification_error, created_at, updated_at, + completed_at FROM codex_bridge_tasks ORDER BY updated_at DESC LIMIT ? """, @@ -307,6 +339,44 @@ class CodexBridgeStore: ).fetchall() return [dict(r) for r in rows] + def list_completed_for_notification(self, limit: int = 10) -> List[Dict[str, Any]]: + placeholders = ",".join("?" for _ in TERMINAL_STATUSES) + with self._connect() as conn: + rows = conn.execute( + f""" + SELECT * + FROM codex_bridge_tasks + WHERE status IN ({placeholders}) + AND ( + notification_status IS NULL + OR notification_status='pending' + OR notification_status='failed' + ) + ORDER BY completed_at ASC, updated_at ASC + LIMIT ? + """, + (*sorted(TERMINAL_STATUSES), limit), + ).fetchall() + return [dict(r) for r in rows] + + def update_notification_status( + self, + task_id: str, + status: str, + *, + notified_at: Optional[float] = None, + error: Optional[str] = None, + ) -> None: + with self._lock, self._connect() as conn: + conn.execute( + """ + UPDATE codex_bridge_tasks + SET notification_status=?, notified_at=?, notification_error=?, updated_at=? + WHERE hermes_task_id=? + """, + (status, notified_at, error, _now(), task_id), + ) + @dataclass class CodexBridgeTask: @@ -323,6 +393,10 @@ class CodexBridgeTask: last_progress_summary: Optional[str] = None final_summary: Optional[str] = None error_summary: Optional[str] = None + notify_target: Optional[str] = None + notification_status: Optional[str] = None + notified_at: Optional[float] = None + notification_error: Optional[str] = None created_at: float = field(default_factory=_now) updated_at: float = field(default_factory=_now) completed_at: Optional[float] = None @@ -343,6 +417,10 @@ class CodexBridgeTask: "last_progress_summary": self.last_progress_summary, "final_summary": self.final_summary, "error_summary": self.error_summary, + "notify_target": self.notify_target, + "notification_status": self.notification_status, + "notified_at": self.notified_at, + "notification_error": self.notification_error, "created_at": self.created_at, "updated_at": self.updated_at, "completed_at": self.completed_at, @@ -489,6 +567,7 @@ class CodexBridgeManager: sandbox: str = DEFAULT_SANDBOX, approval_policy: str = DEFAULT_APPROVAL_POLICY, codex_home: Optional[str] = None, + notify_target: Optional[str] = None, ) -> Dict[str, Any]: if not prompt or not prompt.strip(): raise ValueError("codex_bridge start requires a non-empty prompt.") @@ -505,6 +584,8 @@ class CodexBridgeManager: model=model, sandbox=sandbox, approval_policy=approval_policy, + notify_target=notify_target.strip() if notify_target and notify_target.strip() else None, + notification_status="pending" if notify_target and notify_target.strip() else None, ) client = CodexJsonRpcClient(task_id, task, self) with self._lock: @@ -579,6 +660,8 @@ class CodexBridgeManager: if stored: snap["recent_events"] = stored.get("recent_events", []) snap["pending_requests"] = stored.get("pending_requests", snap["pending_requests"]) + for key in ("notification_status", "notified_at", "notification_error"): + snap[key] = stored.get(key) return {"success": True, "task": snap} stored = self.store.get_task_snapshot(task_id) if stored: @@ -589,6 +672,70 @@ class CodexBridgeManager: def list_tasks(self, limit: int = 10) -> Dict[str, Any]: return {"success": True, "tasks": self.store.list_tasks(limit=limit)} + def notify_completed( + self, + *, + limit: int = 10, + dry_run: bool = False, + notifier: Optional[Callable[[str, str], Any]] = None, + ) -> Dict[str, Any]: + notifier = notifier or _default_completion_notifier + candidates = self.store.list_completed_for_notification(limit=limit) + results: List[Dict[str, Any]] = [] + for task in candidates: + task_id = str(task["hermes_task_id"]) + target = str(task.get("notify_target") or "").strip() + message = _completion_notification_message(task) + if not target: + result = { + "task_id": task_id, + "status": task.get("status"), + "notification_status": "no_target", + "sent": False, + "message": message, + } + if not dry_run: + self.store.update_notification_status(task_id, "no_target") + results.append(result) + continue + + result = { + "task_id": task_id, + "status": task.get("status"), + "target": target, + "notification_status": "dry_run" if dry_run else "pending", + "sent": False, + "message": message, + } + if dry_run: + results.append(result) + continue + + try: + delivery = notifier(target, message) + except Exception as exc: + error = str(exc) + self.store.update_notification_status(task_id, "failed", error=error) + result["notification_status"] = "failed" + result["error"] = error + results.append(result) + continue + + notified_at = _now() + self.store.update_notification_status(task_id, "sent", notified_at=notified_at) + result["notification_status"] = "sent" + result["sent"] = True + result["notified_at"] = notified_at + result["delivery"] = delivery + results.append(result) + + return { + "success": True, + "dry_run": dry_run, + "processed": len(results), + "notifications": results, + } + def steer(self, task_id: str, instruction: str) -> Dict[str, Any]: task, client = self._active(task_id) if not task.codex_thread_id or not task.codex_turn_id: @@ -726,6 +873,35 @@ def _get_manager() -> CodexBridgeManager: return _MANAGER +def _completion_notification_message(task: Dict[str, Any]) -> str: + task_id = task.get("hermes_task_id") or "unknown" + status = task.get("status") or "unknown" + prompt = task.get("prompt_summary") or "(no prompt summary)" + summary = task.get("final_summary") or task.get("error_summary") or task.get("last_progress_summary") or "" + lines = [ + f"Codex Bridge task {task_id} finished with status: {status}.", + f"Prompt: {prompt}", + ] + if summary: + lines.append(f"Summary: {_summarize_payload(summary, max_chars=700)}") + return "\n".join(lines) + + +def _default_completion_notifier(target: str, message: str) -> Dict[str, Any]: + if target == "local": + return {"success": True, "target": target, "local": True} + from tools.send_message_tool import send_message_tool + + raw = send_message_tool({"action": "send", "target": target, "message": message}) + try: + result = json.loads(raw) + except Exception: + result = {"raw": raw} + if isinstance(result, dict) and result.get("error"): + raise RuntimeError(str(result["error"])) + return result if isinstance(result, dict) else {"result": result} + + def codex_bridge( action: str, prompt: Optional[str] = None, @@ -738,7 +914,9 @@ def codex_bridge( sandbox: str = DEFAULT_SANDBOX, approval_policy: str = DEFAULT_APPROVAL_POLICY, codex_home: Optional[str] = None, + notify_target: Optional[str] = None, limit: int = 10, + dry_run: bool = False, ) -> str: try: action = (action or "").strip().lower() @@ -750,6 +928,7 @@ def codex_bridge( sandbox=sandbox or DEFAULT_SANDBOX, approval_policy=approval_policy or DEFAULT_APPROVAL_POLICY, codex_home=codex_home, + notify_target=notify_target, ) elif action == "status": if not task_id: @@ -757,6 +936,8 @@ def codex_bridge( result = _get_manager().status(task_id) elif action == "list": result = _get_manager().list_tasks(limit=limit) + elif action == "notify_completed": + result = _get_manager().notify_completed(limit=limit, dry_run=dry_run) elif action == "steer": if not task_id or not instruction: raise ValueError("codex_bridge steer requires task_id and instruction.") @@ -770,7 +951,7 @@ def codex_bridge( raise ValueError("codex_bridge respond requires task_id and instruction=request_id.") result = _get_manager().respond(task_id, instruction, decision=decision, answers=answers) else: - raise ValueError("action must be one of start, status, list, steer, interrupt, respond.") + raise ValueError("action must be one of start, status, list, notify_completed, steer, interrupt, respond.") return _json_dumps(result) except Exception as exc: return tool_error(str(exc)) @@ -782,14 +963,14 @@ CODEX_BRIDGE_SCHEMA = { "Start and control local Codex tasks through Codex app-server JSON-RPC. " "Uses stdio/WebSocket-capable app-server protocol semantics and never " "uses mailbox, inbox, or outbox files as the communication path. " - "Actions: start, status, list, steer, interrupt, respond." + "Actions: start, status, list, notify_completed, steer, interrupt, respond." ), "parameters": { "type": "object", "properties": { "action": { "type": "string", - "enum": ["start", "status", "list", "steer", "interrupt", "cancel", "respond"], + "enum": ["start", "status", "list", "notify_completed", "steer", "interrupt", "cancel", "respond"], "description": "Bridge operation to perform.", }, "prompt": {"type": "string", "description": "Task prompt for action=start."}, @@ -823,7 +1004,18 @@ CODEX_BRIDGE_SCHEMA = { "type": "string", "description": "Optional CODEX_HOME override for testing or isolated runs.", }, + "notify_target": { + "type": "string", + "description": ( + "Optional completion notification target for action=start, such as " + "'local', 'feishu:', or any send_message target." + ), + }, "limit": {"type": "integer", "description": "List limit for action=list."}, + "dry_run": { + "type": "boolean", + "description": "For action=notify_completed, preview notifications without sending or marking tasks.", + }, }, "required": ["action"], }, @@ -846,7 +1038,9 @@ registry.register( sandbox=args.get("sandbox", DEFAULT_SANDBOX), approval_policy=args.get("approval_policy", DEFAULT_APPROVAL_POLICY), codex_home=args.get("codex_home"), + notify_target=args.get("notify_target"), limit=args.get("limit", 10), + dry_run=bool(args.get("dry_run", False)), ), check_fn=check_codex_bridge_requirements, emoji="C",