diff --git a/gateway/kanban_watchers.py b/gateway/kanban_watchers.py index 21753054f01..5bcf70c8d21 100644 --- a/gateway/kanban_watchers.py +++ b/gateway/kanban_watchers.py @@ -16,13 +16,45 @@ import os import sqlite3 import time from pathlib import Path -from typing import Any, Optional +from typing import Any, Callable, Optional # Match the logger run.py uses (logging.getLogger(__name__) where __name__ == # "gateway.run") so extracted log records keep their original logger name. logger = logging.getLogger("gateway.run") +def _resolve_auto_decompose_settings( + load_config: Callable[[], Any], +) -> "tuple[bool, int]": + """Resolve the live (enabled, per_tick) auto-decompose settings. + + Read fresh from config on every dispatcher tick (#49638) so that flipping + ``kanban.auto_decompose: false`` to STOP runaway fan-out takes effect on the + next tick instead of requiring a gateway restart. Auto-decompose is a + safety toggle — a user who sees it create and launch tasks they didn't + intend reaches for this flag to halt it, and a stale boot-captured value + silently ignoring that change is the bug reported in #49638. + + Fails **safe**: if the config read raises, return ``(False, 3)`` — a + transient read error must never re-enable a feature the user turned off, + nor fall back to the burst-prone default-on behaviour. ``per_tick`` is + clamped to ``>= 1``. + """ + try: + cfg = load_config() + except Exception: + return False, 3 + kcfg = cfg.get("kanban", {}) if isinstance(cfg, dict) else {} + enabled = bool(kcfg.get("auto_decompose", True)) + try: + per_tick = int(kcfg.get("auto_decompose_per_tick", 3) or 3) + except (TypeError, ValueError): + per_tick = 3 + if per_tick < 1: + per_tick = 1 + return enabled, per_tick + + def _acquire_singleton_lock(lock_path) -> "tuple[Optional[object], str]": """Take an exclusive, non-blocking advisory lock for the sole dispatcher. 
@@ -985,17 +1017,20 @@ class GatewayKanbanWatchersMixin: # ``kanban.auto_decompose_per_tick`` (default 3) so a bulk-load # of triage tasks doesn't burst-spend the aux LLM in one tick; # remainder defers to subsequent ticks. - auto_decompose_enabled = bool(kanban_cfg.get("auto_decompose", True)) - try: - auto_decompose_per_tick = int( - kanban_cfg.get("auto_decompose_per_tick", 3) or 3 - ) - except (TypeError, ValueError): - auto_decompose_per_tick = 3 - if auto_decompose_per_tick < 1: - auto_decompose_per_tick = 1 + # + # The flag is re-read from config EVERY tick (#49638) rather than + # captured once at boot. Auto-decompose is a safety toggle: a user who + # sees it fan out and run tasks they didn't intend reaches for + # ``kanban.auto_decompose: false`` to STOP it — and that must take + # effect on the next tick, not require a gateway restart. (Reported: + # auto-decompose created and launched destructive tasks while the user + # was still typing the task description, and the flag "couldn't be + # disabled" because the gateway had captured its boot-time value.) + def _read_auto_decompose_settings() -> tuple[bool, int]: + """Re-resolve (enabled, per_tick) from current config each tick.""" + return _resolve_auto_decompose_settings(_load_config) - def _auto_decompose_tick() -> int: + def _auto_decompose_tick(auto_decompose_per_tick: int) -> int: """Run the auto-decomposer for up to N triage tasks across all boards. Returns the number of triage tasks that were successfully decomposed or specified this tick. @@ -1090,8 +1125,12 @@ class GatewayKanbanWatchersMixin: logger.exception("kanban dispatcher: zombie reaper failed") try: - if auto_decompose_enabled: - await asyncio.to_thread(_auto_decompose_tick) + # Re-read the auto-decompose toggle live each tick so a user + # flipping kanban.auto_decompose=false to STOP runaway fan-out + # takes effect on the next tick, not on gateway restart (#49638). 
"""Tests for live auto-decompose settings resolution (issue #49638).

``kanban.auto_decompose`` used to be captured once when the gateway booted,
so setting it to ``false`` to stop runaway auto-decompose had no effect until
a full restart. ``_resolve_auto_decompose_settings`` is now called on every
dispatcher tick and reads whatever the config loader returns at that moment.
"""

from __future__ import annotations

import pytest

from gateway.kanban_watchers import _resolve_auto_decompose_settings


def _resolve(config):
    """Resolve settings through a loader that always returns *config*."""
    return _resolve_auto_decompose_settings(lambda: config)


def test_enabled_by_default_when_key_absent():
    """An empty ``kanban`` section yields the defaults: enabled, cap of 3."""
    settings = _resolve({"kanban": {}})
    assert settings[0] is True
    assert settings[1] == 3


def test_disabled_when_flag_false():
    """An explicit ``auto_decompose: False`` resolves to disabled."""
    enabled, _ = _resolve({"kanban": {"auto_decompose": False}})
    assert enabled is False


def test_per_tick_respected_and_clamped():
    """The per-tick cap is honoured, defaulted when 0, clamped when negative."""
    explicit = _resolve(
        {"kanban": {"auto_decompose": True, "auto_decompose_per_tick": 7}}
    )
    assert explicit == (True, 7)

    # The resolver's `or 3` fallback reads 0 as "unset", so the default cap of
    # 3 applies (a cap of 0 would stop all progress).
    zero_cap = _resolve({"kanban": {"auto_decompose_per_tick": 0}})[1]
    assert zero_cap == 3

    # A real negative value is raised to the minimum of 1.
    negative_cap = _resolve({"kanban": {"auto_decompose_per_tick": -5}})[1]
    assert negative_cap == 1


def test_malformed_per_tick_falls_back_to_default():
    """A per-tick value that cannot be converted to int yields the default."""
    resolved = _resolve({"kanban": {"auto_decompose_per_tick": "lots"}})
    assert resolved[1] == 3


def test_config_read_error_fails_safe_disabled():
    """A failing config read must DISABLE auto-decompose rather than fall
    back to the default-on behaviour the user may have turned off."""

    def _failing_loader():
        raise RuntimeError("config read failed")

    enabled, per_tick = _resolve_auto_decompose_settings(_failing_loader)
    assert enabled is False
    assert per_tick == 3


def test_non_dict_config_fails_safe():
    """A config that is not a dict is read as "no kanban key": default-on.

    This is not the error path — only a loader that raises disables the
    feature.
    """
    from_none, _ = _resolve(None)
    assert from_none is True
    from_list, _ = _resolve(["not", "a", "dict"])
    assert from_list is True


def test_live_toggle_takes_effect_between_calls():
    """A flag flipped between two resolutions is seen by the second one,
    with no restart in between."""
    live_config = {"kanban": {"auto_decompose": True}}
    before = _resolve(live_config)[0]
    assert before is True

    # Simulates the user editing config.yaml while the dispatcher is running.
    live_config["kanban"]["auto_decompose"] = False
    after = _resolve(live_config)[0]
    assert after is False