fix(kanban): align failure diagnostics with retry limit

This commit is contained in:
qWaitCrypto 2026-05-14 17:07:57 +08:00 committed by Teknium
parent 6e60a8a092
commit d9fef0c8ab
4 changed files with 134 additions and 15 deletions

View file

@ -1393,6 +1393,11 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
the dashboard uses, so CLI output matches what the UI shows.
"""
from hermes_cli import kanban_diagnostics as kd
from hermes_cli.config import load_config
diag_config = kd.config_from_kanban_config(
(load_config().get("kanban") or {})
)
with kb.connect() as conn:
# Either one-task mode or fleet mode.
@ -1406,6 +1411,7 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
task,
kb.list_events(conn, args.task),
kb.list_runs(conn, args.task),
config=diag_config,
)
}
else:
@ -1433,7 +1439,12 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
diags_by_task = {}
for r in rows:
tid = r["id"]
dl = kd.compute_task_diagnostics(r, ev_by.get(tid, []), run_by.get(tid, []))
dl = kd.compute_task_diagnostics(
r,
ev_by.get(tid, []),
run_by.get(tid, []),
config=diag_config,
)
if dl:
diags_by_task[tid] = dl

View file

@ -230,6 +230,14 @@ def _generic_recovery_actions(task: Any, *, running: bool) -> list[DiagnosticAct
RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]]
def _positive_int(value: Any, default: int) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
return default
return parsed if parsed >= 1 else default
def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]:
"""Blocked-hallucination gate fires: a worker called kanban_complete
with created_cards that didn't exist or weren't created by the
@ -319,18 +327,19 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
all look the same: the kernel keeps retrying and the operator
needs to intervene.
Threshold: cfg["failure_threshold"] (default 3). A threshold of 3
is one below the circuit-breaker's default (5), so the diagnostic
surfaces BEFORE the breaker trips giving operators a window to
fix the problem while the dispatcher's still retrying.
Threshold: cfg["failure_threshold"]. Runtime callers should derive
this from ``kanban.failure_limit`` unless the user explicitly set a
diagnostics threshold, so the signal does not lag behind the
dispatcher's circuit breaker.
Accepts the legacy ``spawn_failure_threshold`` config key for
back-compat.
"""
threshold = int(cfg.get(
threshold = _positive_int(cfg.get(
"failure_threshold",
cfg.get("spawn_failure_threshold", 3),
))
), 3)
failure_limit = _positive_int(cfg.get("failure_limit"), threshold)
# Read the new unified counter name, with a fallback to the legacy
# column name so this rule keeps working against old DB rows the
# caller somehow materialised without running the migration.
@ -402,10 +411,9 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
f"This task has failed {failures} times in a row "
f"(most recent: {outcome_label}). Full last error:\n\n"
f"{err_snippet}\n\n"
f"The dispatcher will keep retrying until the consecutive-"
f"failures counter trips the circuit breaker (default 5), "
f"at which point the task auto-blocks. Fix the root cause "
f"and reclaim to retry."
f"The dispatcher circuit breaker is configured for "
f"{failure_limit} consecutive non-success attempts. Fix the "
f"root cause and reclaim or unblock the task to retry."
)
else:
title = f"Agent {outcome_label} x{failures} (no error recorded)"
@ -427,6 +435,8 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
"consecutive_failures": failures,
"most_recent_outcome": most_recent_outcome,
"last_error": last_err,
"failure_threshold": threshold,
"failure_limit": failure_limit,
},
)]
@ -716,9 +726,11 @@ DIAGNOSTIC_KINDS = (
DEFAULT_CONFIG = {
"failure_threshold": 3,
# Match the dispatcher default (kanban.failure_limit) so repeated-failure
# diagnostics do not lag behind the default auto-block threshold.
"failure_threshold": 2,
# Legacy alias accepted at read time by _rule_repeated_failures.
"spawn_failure_threshold": 3,
"spawn_failure_threshold": 2,
"crash_threshold": 2,
"blocked_stale_hours": 24,
# Stranded-task threshold. 30 min by default — below that, the
@ -728,6 +740,28 @@ DEFAULT_CONFIG = {
}
def config_from_kanban_config(kanban_cfg: Optional[dict]) -> dict:
"""Build diagnostics config from the runtime ``kanban`` config section.
``kanban.diagnostics.failure_threshold`` remains an explicit override.
Otherwise, derive the repeated-failure threshold from
``kanban.failure_limit`` so CLI/dashboard diagnostics match the
dispatcher's actual circuit-breaker threshold.
"""
kanban_cfg = kanban_cfg or {}
diag_cfg = dict(kanban_cfg.get("diagnostics") or {})
diag_cfg.setdefault(
"failure_limit",
kanban_cfg.get("failure_limit", DEFAULT_CONFIG["failure_threshold"]),
)
if (
"failure_threshold" not in diag_cfg
and "spawn_failure_threshold" not in diag_cfg
):
diag_cfg["failure_threshold"] = diag_cfg["failure_limit"]
return diag_cfg
def compute_task_diagnostics(
task,
events: list,
@ -743,7 +777,17 @@ def compute_task_diagnostics(
most-recent ``last_seen_at``.
"""
now_ts = int(now if now is not None else time.time())
cfg = {**DEFAULT_CONFIG, **(config or {})}
config = config or {}
cfg = {**DEFAULT_CONFIG, **config}
if (
"failure_threshold" not in config
and "spawn_failure_threshold" not in config
and "failure_limit" in config
):
cfg["failure_threshold"] = _positive_int(
config.get("failure_limit"),
DEFAULT_CONFIG["failure_threshold"],
)
out: list[Diagnostic] = []
for rule in _RULES:
try: