diff --git a/gateway/run.py b/gateway/run.py index 13cae5297a8..b2dea3d3c47 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1249,6 +1249,7 @@ class GatewayRunner: # Per-session reasoning effort overrides from /reasoning. # Key: session_key, Value: parsed reasoning config dict. self._session_reasoning_overrides: Dict[str, Dict[str, Any]] = {} + self._kanban_notifier_profile = self._active_profile_name() # Teams meeting pipeline runtime (bound later when msgraph_webhook adapter exists). self._teams_pipeline_runtime = None self._teams_pipeline_runtime_error: Optional[str] = None @@ -4071,6 +4072,14 @@ class GatewayRunner: break await asyncio.sleep(1) + def _active_profile_name(self) -> str: + """Return the profile name this gateway represents.""" + try: + from hermes_cli.profiles import get_active_profile_name + return get_active_profile_name() or "default" + except Exception: + return "default" + async def _kanban_notifier_watcher(self, interval: float = 5.0) -> None: """Poll ``kanban_notify_subs`` and deliver terminal events to users. @@ -4119,6 +4128,10 @@ class GatewayRunner: self, "_kanban_sub_fail_counts", {} ) self._kanban_sub_fail_counts = sub_fail_counts + notifier_profile = getattr(self, "_kanban_notifier_profile", None) + if not notifier_profile: + notifier_profile = self._active_profile_name() + self._kanban_notifier_profile = notifier_profile # Initial delay so the gateway can finish wiring adapters. await asyncio.sleep(5) @@ -4181,6 +4194,13 @@ class GatewayRunner: if not subs: logger.debug("kanban notifier: board %s has no subscriptions", slug) for sub in subs: + owner_profile = sub.get("notifier_profile") or None + if owner_profile and owner_profile != notifier_profile: + logger.debug( + "kanban notifier: subscription for %s owned by profile %s; current profile %s skipping", + sub.get("task_id"), owner_profile, notifier_profile, + ) + continue platform = (sub.get("platform") or "").lower() if platform not in active_platforms: logger.debug( @@ -8343,6 +8363,7 @@ class GatewayRunner: platform=platform_str, chat_id=chat_id, thread_id=thread_id or None, user_id=user_id, + notifier_profile=getattr(self, "_kanban_notifier_profile", None) or self._active_profile_name(), ) finally: conn.close() diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 00a61b41d4d..21f7a7aa3dd 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -278,8 +278,8 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu "durations (90s, 30m, 2h, 1d). When exceeded, " "the dispatcher SIGTERMs (then SIGKILLs) the worker " "and re-queues the task.") - p_create.add_argument("--created-by", default="user", - help="Author name recorded on the task (default: user)") + p_create.add_argument("--created-by", default=None, + help="Author name recorded on the task (default: active profile or user)") p_create.add_argument("--skill", action="append", default=[], dest="skills", help="Skill to force-load into the worker " "(repeatable). Appended to the built-in " @@ -510,6 +510,10 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_nsub.add_argument("--chat-id", required=True) p_nsub.add_argument("--thread-id", default=None) p_nsub.add_argument("--user-id", default=None) + p_nsub.add_argument( + "--notifier-profile", default=None, + help="Profile gateway that owns/delivers this subscription (default: active profile)", + ) p_nlist = sub.add_parser( "notify-list", @@ -1921,6 +1925,7 @@ def _cmd_notify_subscribe(args: argparse.Namespace) -> int: conn, task_id=args.task_id, platform=args.platform, chat_id=args.chat_id, thread_id=args.thread_id, user_id=args.user_id, + notifier_profile=args.notifier_profile or _profile_author(), ) print(f"Subscribed {args.platform}:{args.chat_id}" + (f":{args.thread_id}" if args.thread_id else "") @@ -1939,8 +1944,9 @@ def _cmd_notify_list(args: argparse.Namespace) -> int: return 0 for s in subs: thr = f":{s['thread_id']}" if s.get("thread_id") else "" + owner = f" owner={s['notifier_profile']}" if s.get("notifier_profile") else "" print(f" {s['task_id']:10s} {s['platform']}:{s['chat_id']}{thr}" - f" (since event {s['last_event_id']})") + f" (since event {s['last_event_id']}){owner}") return 0 diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index a57b27ce5f5..f414766ef5c 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -861,6 +861,7 @@ CREATE TABLE IF NOT EXISTS kanban_notify_subs ( chat_id TEXT NOT NULL, thread_id TEXT NOT NULL DEFAULT '', user_id TEXT, + notifier_profile TEXT, created_at INTEGER NOT NULL, last_event_id INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (task_id, platform, chat_id, thread_id) @@ -1085,6 +1086,18 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: "ON task_events(run_id, id)" ) + notify_table_exists = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='kanban_notify_subs'" + ).fetchone() is not None + if notify_table_exists: + notify_cols = { + row["name"] for row in conn.execute("PRAGMA table_info(kanban_notify_subs)") + } + if "notifier_profile" not in notify_cols: + _add_column_if_missing( + conn, "kanban_notify_subs", "notifier_profile", "notifier_profile TEXT" + ) + # One-shot backfill: any task that is 'running' before runs existed # had its claim_lock / claim_expires / worker_pid on the task row. # Synthesize a matching task_runs row so subsequent end-run / heartbeat @@ -4341,6 +4354,7 @@ def add_notify_sub( chat_id: str, thread_id: Optional[str] = None, user_id: Optional[str] = None, + notifier_profile: Optional[str] = None, ) -> None: """Register a gateway source that wants terminal-state notifications for ``task_id``. Idempotent on (task, platform, chat, thread).""" @@ -4349,10 +4363,10 @@ def add_notify_sub( conn.execute( """ INSERT OR IGNORE INTO kanban_notify_subs - (task_id, platform, chat_id, thread_id, user_id, created_at) - VALUES (?, ?, ?, ?, ?, ?) + (task_id, platform, chat_id, thread_id, user_id, notifier_profile, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) """, - (task_id, platform, chat_id, thread_id or "", user_id, now), + (task_id, platform, chat_id, thread_id or "", user_id, notifier_profile, now), ) diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index d6d1bf49483..17252af827a 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -510,10 +510,12 @@ def test_notify_sub_crud(kanban_home): tid = kb.create_task(conn, title="x") kb.add_notify_sub( conn, task_id=tid, platform="telegram", chat_id="123", user_id="u1", + notifier_profile="default", ) subs = kb.list_notify_subs(conn, tid) assert len(subs) == 1 assert subs[0]["platform"] == "telegram" + assert subs[0]["notifier_profile"] == "default" # Duplicate add is a no-op. kb.add_notify_sub( conn, task_id=tid, platform="telegram", chat_id="123", diff --git a/tests/hermes_cli/test_kanban_notify.py b/tests/hermes_cli/test_kanban_notify.py index 6e59fb45aa7..e1c421594aa 100644 --- a/tests/hermes_cli/test_kanban_notify.py +++ b/tests/hermes_cli/test_kanban_notify.py @@ -319,3 +319,113 @@ def test_dispatcher_tick_does_not_call_init_db(kanban_home, monkeypatch): "_kanban_notifier_watcher must not call _kb.init_db(board=slug) — " "see issue #21378." ) + + +@pytest.mark.asyncio +async def test_notifier_skips_subscription_owned_by_other_profile(kanban_home): + """Each gateway keeps its watcher on, but only the subscribing profile claims.""" + import hermes_cli.kanban_db as kb + from gateway.run import GatewayRunner + from gateway.config import Platform + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="owned task", assignee="backend-engineer") + kb.add_notify_sub( + conn, + task_id=tid, + platform="telegram", + chat_id="chat1", + notifier_profile="default", + ) + kb.complete_task(conn, tid, result="done") + finally: + conn.close() + + runner = object.__new__(GatewayRunner) + runner._running = True + runner._kanban_sub_fail_counts = {} + runner._kanban_notifier_profile = "business-partner" + + fake_adapter = MagicMock() + fake_adapter.send = AsyncMock() + runner.adapters = {Platform.TELEGRAM: fake_adapter} + + _orig_sleep = asyncio.sleep + tick_count = 0 + + async def _fast_sleep(_): + nonlocal tick_count + await _orig_sleep(0) + tick_count += 1 + if tick_count >= 3: + runner._running = False + + with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): + await asyncio.wait_for( + runner._kanban_notifier_watcher(interval=1), + timeout=10.0, + ) + + fake_adapter.send.assert_not_called() + conn = kb.connect() + try: + subs = kb.list_notify_subs(conn, tid) + finally: + conn.close() + assert len(subs) == 1 + assert int(subs[0]["last_event_id"]) == 0, "wrong profile must not claim the event" + + +@pytest.mark.asyncio +async def test_notifier_delivers_subscription_owned_by_current_profile(kanban_home): + """The gateway for the profile that created/subscribed the task reports it.""" + import hermes_cli.kanban_db as kb + from gateway.run import GatewayRunner + from gateway.config import Platform + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="owned task", assignee="backend-engineer") + kb.add_notify_sub( + conn, + task_id=tid, + platform="telegram", + chat_id="chat1", + notifier_profile="default", + ) + kb.complete_task(conn, tid, result="done") + finally: + conn.close() + + runner = object.__new__(GatewayRunner) + runner._running = True + runner._kanban_sub_fail_counts = {} + runner._kanban_notifier_profile = "default" + + fake_adapter = MagicMock() + + async def _send_and_stop(chat_id, msg, metadata=None): + runner._running = False + + fake_adapter.send = AsyncMock(side_effect=_send_and_stop) + runner.adapters = {Platform.TELEGRAM: fake_adapter} + + _orig_sleep = asyncio.sleep + + async def _fast_sleep(_): + await _orig_sleep(0) + + with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): + await asyncio.wait_for( + runner._kanban_notifier_watcher(interval=1), + timeout=10.0, + ) + + fake_adapter.send.assert_called_once() + conn = kb.connect() + try: + subs = kb.list_notify_subs(conn, tid) + finally: + conn.close() + assert subs == []