mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-15 09:21:36 +00:00
fix(kanban): don't permanently block tasks that hit a provider rate limit (#38223)
A kanban worker that exhausted its retries purely on a provider rate limit / quota wall (e.g. opencode-go's 5-hour window) exited with code 1. The dispatcher counted that as a crash, and with DEFAULT_FAILURE_LIMIT=2, two quota-wall hits permanently blocked the card. Fanning out many workers against one shared quota made this routine.

Now a rate-limited worker exits with EX_TEMPFAIL (75); the dispatcher classifies that as a 'rate_limited' exit, releases the task back to 'ready' WITHOUT incrementing consecutive_failures (the breaker can't trip on a transient throttle), and the respawn guard defers the next attempt on a cooldown (default 5min, HERMES_KANBAN_RATE_LIMIT_COOLDOWN_SECONDS) until the quota window clears. Genuine crashes still count and trip the breaker as before. The 120s Retry-After cap is unchanged — no worker parks for hours holding a slot.

- conversation_loop.py: surface failure_reason in the exhaustion return
- cli.py: kanban worker picks exit 75 on rate_limit/billing failure
- kanban_db.py: rate_limited exit kind, no-count requeue, cooldown guard
This commit is contained in:
parent
60b6352fe5
commit
4c544b633d
4 changed files with 408 additions and 16 deletions
|
|
@ -3378,6 +3378,12 @@ def run_conversation(
|
|||
"completed": False,
|
||||
"failed": True,
|
||||
"error": _final_summary,
|
||||
# Surface the classified reason so callers (notably the
|
||||
# kanban worker path in cli.py) can distinguish a
|
||||
# transient throttle from a real failure and choose a
|
||||
# different exit code. ``rate_limit`` / ``billing`` here
|
||||
# mean "quota wall, not a task error".
|
||||
"failure_reason": classified.reason.value,
|
||||
}
|
||||
|
||||
# For rate limits, respect the Retry-After header if present
|
||||
|
|
|
|||
30
cli.py
30
cli.py
|
|
@ -15807,9 +15807,33 @@ def main(
|
|||
|
||||
# Session ID goes to stderr so piped stdout is clean.
|
||||
print(f"\nsession_id: {cli.session_id}", file=sys.stderr)
|
||||
|
||||
# Ensure proper exit code for automation wrappers
|
||||
sys.exit(1 if isinstance(result, dict) and result.get("failed") else 0)
|
||||
|
||||
# Ensure proper exit code for automation wrappers.
|
||||
#
|
||||
# Kanban workers get a special case: when the run failed
|
||||
# purely because the provider rate-limited / exhausted
|
||||
# quota (not because the task itself is broken), exit with
|
||||
# the EX_TEMPFAIL sentinel instead of the generic 1. The
|
||||
# dispatcher's reap classifier maps that code to a
|
||||
# ``rate_limited`` exit and releases the task back to
|
||||
# ``ready`` WITHOUT incrementing the failure counter, so a
|
||||
# 5-hour quota window can't trip the circuit breaker and
|
||||
# permanently block the card. Non-kanban runs keep the
|
||||
# plain 0/1 contract automation wrappers expect.
|
||||
_exit_code = 0
|
||||
if isinstance(result, dict) and result.get("failed"):
|
||||
_exit_code = 1
|
||||
if os.environ.get("HERMES_KANBAN_TASK") and result.get(
|
||||
"failure_reason"
|
||||
) in ("rate_limit", "billing"):
|
||||
try:
|
||||
from hermes_cli.kanban_db import (
|
||||
KANBAN_RATE_LIMIT_EXIT_CODE as _RL_CODE,
|
||||
)
|
||||
_exit_code = _RL_CODE
|
||||
except Exception:
|
||||
_exit_code = 1
|
||||
sys.exit(_exit_code)
|
||||
|
||||
# Exit with error code if credentials or agent init fails
|
||||
sys.exit(1)
|
||||
|
|
|
|||
|
|
@ -153,6 +153,17 @@ def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int:
|
|||
DEFAULT_CRASH_GRACE_SECONDS = 30
|
||||
|
||||
|
||||
# Sentinel exit code a kanban worker uses to signal "I bailed because the
# provider rate-limited / exhausted quota, not because the task failed."
# The dispatcher's reap classifier (``_classify_worker_exit``) compares the
# worker's ``WEXITSTATUS`` against this value and maps a match to a
# ``rate_limited`` exit kind, so ``detect_crashed_workers`` can release the
# task back to ``ready`` WITHOUT counting a failure (the circuit breaker
# must never trip on a transient throttle).
#
# 75 == BSD ``EX_TEMPFAIL`` (sysexits.h) — the conventional "temporary
# failure, retry later" code, and well clear of the 0/1/2 codes the worker
# uses for success / generic failure / usage error.
KANBAN_RATE_LIMIT_EXIT_CODE = 75
|
||||
|
||||
|
||||
def _resolve_crash_grace_seconds() -> int:
|
||||
"""Return the crash-detection grace period in seconds.
|
||||
|
||||
|
|
@ -172,6 +183,28 @@ def _resolve_crash_grace_seconds() -> int:
|
|||
return DEFAULT_CRASH_GRACE_SECONDS
|
||||
|
||||
|
||||
def _resolve_rate_limit_cooldown_seconds() -> int:
|
||||
"""Return the rate-limit requeue cooldown in seconds.
|
||||
|
||||
Reads ``HERMES_KANBAN_RATE_LIMIT_COOLDOWN_SECONDS`` from the environment;
|
||||
falls back to ``DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS`` when absent, empty,
|
||||
non-integer, or negative. A value of 0 disables the cooldown (re-spawn on
|
||||
the next tick) — useful for tests that want to assert the task becomes
|
||||
spawnable again immediately.
|
||||
"""
|
||||
raw = os.environ.get(
|
||||
"HERMES_KANBAN_RATE_LIMIT_COOLDOWN_SECONDS", ""
|
||||
).strip()
|
||||
if raw:
|
||||
try:
|
||||
parsed = int(raw)
|
||||
except ValueError:
|
||||
parsed = -1
|
||||
if parsed >= 0:
|
||||
return parsed
|
||||
return DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS
|
||||
|
||||
|
||||
# Worker-context caps so build_worker_context() stays bounded on
|
||||
# pathological boards (retry-heavy tasks, comment storms, giant
|
||||
# summaries). Values chosen to fit a typical 100k-char LLM prompt with
|
||||
|
|
@ -4719,6 +4752,15 @@ _RESPAWN_BLOCKER_RE = re.compile(
|
|||
# Within this window a completed run counts as "recent proof"; don't re-spawn.
|
||||
_RESPAWN_GUARD_SUCCESS_WINDOW = 3600 # 1 hour
|
||||
|
||||
# Cooldown after a rate-limited (quota-wall) requeue before the dispatcher
# re-spawns the worker. Without this, a task released by the rate-limit path
# would be re-spawned on the very next tick and immediately bounce off the
# same quota wall, burning a worker slot every tick for hours. The cooldown
# spaces retries out so the board keeps cheaply probing whether quota is back
# without thrashing.
#
# This is only the fallback: ``_resolve_rate_limit_cooldown_seconds`` returns
# it when ``HERMES_KANBAN_RATE_LIMIT_COOLDOWN_SECONDS`` is absent, empty,
# non-integer, or negative. Operators can set that variable for a
# tighter/looser probe cadence (0 disables the cooldown).
DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS = 300  # 5 minutes
|
||||
|
||||
# Within this window a GitHub PR URL in a comment blocks re-spawn.
|
||||
_RESPAWN_GUARD_PR_WINDOW = 86400 # 24 hours
|
||||
|
||||
|
|
@ -4776,6 +4818,11 @@ class DispatchResult:
|
|||
Reasons: ``"blocker_auth"`` (quota/auth error — also auto-blocked),
|
||||
``"recent_success"`` (completed run within guard window),
|
||||
``"active_pr"`` (GitHub PR URL in a recent comment)."""
|
||||
rate_limited: list[str] = field(default_factory=list)
|
||||
"""Task ids whose workers bailed on a provider rate-limit / quota wall
|
||||
(EX_TEMPFAIL sentinel exit) and were released back to ``ready`` WITHOUT
|
||||
counting a failure. These never trip the circuit breaker — a long quota
|
||||
window just makes the task bounce cheaply until the window clears."""
|
||||
|
||||
|
||||
# Bounded registry of recently-reaped worker child exits, populated by the
|
||||
|
|
@ -4823,14 +4870,20 @@ def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]":
|
|||
task is still ``running`` in the DB, this is a protocol violation
|
||||
(worker exited without calling ``kanban_complete`` / ``kanban_block``)
|
||||
and should be auto-blocked immediately — retrying will just loop.
|
||||
* ``"rate_limited"`` — ``WIFEXITED`` with status
|
||||
``KANBAN_RATE_LIMIT_EXIT_CODE``. The worker bailed because the
|
||||
provider rate-limited / exhausted quota, NOT because the task failed.
|
||||
``detect_crashed_workers`` releases the task back to ``ready`` without
|
||||
counting a failure, so a long quota window can't trip the breaker.
|
||||
* ``"nonzero_exit"`` — ``WIFEXITED`` with non-zero status. Real error.
|
||||
* ``"signaled"`` — ``WIFSIGNALED`` (OOM killer, SIGKILL, etc). Real crash.
|
||||
* ``"unknown"`` — pid was not in the reap registry (either reaped by
|
||||
something else, or died between reap tick and liveness check). Fall
|
||||
back to existing crashed-counter behavior.
|
||||
|
||||
``code`` is the exit status (for ``clean_exit`` / ``nonzero_exit``) or
|
||||
the signal number (for ``signaled``), or ``None`` for ``unknown``.
|
||||
``code`` is the exit status (for ``clean_exit`` / ``rate_limited`` /
|
||||
``nonzero_exit``) or the signal number (for ``signaled``), or ``None``
|
||||
for ``unknown``.
|
||||
"""
|
||||
entry = _recent_worker_exits.get(int(pid))
|
||||
if entry is None:
|
||||
|
|
@ -4841,6 +4894,8 @@ def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]":
|
|||
code = os.WEXITSTATUS(raw)
|
||||
if code == 0:
|
||||
return ("clean_exit", 0)
|
||||
if code == KANBAN_RATE_LIMIT_EXIT_CODE:
|
||||
return ("rate_limited", code)
|
||||
return ("nonzero_exit", code)
|
||||
if os.WIFSIGNALED(raw):
|
||||
return ("signaled", os.WTERMSIG(raw))
|
||||
|
|
@ -5311,8 +5366,18 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
``kanban_complete`` / ``kanban_block``) and trip the circuit breaker
|
||||
on the first occurrence — retrying a worker whose CLI keeps
|
||||
returning 0 without a terminal transition just loops forever.
|
||||
|
||||
When the reap registry shows the worker exited with the rate-limit
|
||||
sentinel (``KANBAN_RATE_LIMIT_EXIT_CODE``), the worker bailed on a
|
||||
provider quota wall, NOT a task failure. Such tasks are released back
|
||||
to ``ready`` WITHOUT counting a failure (so a long quota window can't
|
||||
trip the breaker) and stamped with a quota-blocker error so
|
||||
``check_respawn_guard`` defers their respawn until the window clears.
|
||||
The ids are returned via the ``_last_rate_limited`` function attribute
|
||||
(the public return stays the crashed-only ``list[str]``).
|
||||
"""
|
||||
crashed: list[str] = []
|
||||
rate_limited: list[str] = []
|
||||
# Per-crash details collected inside the main txn, used after it
|
||||
# closes to run ``_record_task_failure`` (which needs its own
|
||||
# write_txn so can't nest). ``protocol_violation`` flags the
|
||||
|
|
@ -5344,6 +5409,7 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
|
||||
pid = int(row["worker_pid"])
|
||||
kind, code = _classify_worker_exit(pid)
|
||||
rate_limited_exit = False
|
||||
if kind == "clean_exit":
|
||||
# Worker subprocess returned 0 but its task is still
|
||||
# ``running`` in the DB — it exited without calling
|
||||
|
|
@ -5360,6 +5426,26 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
"claimer": row["claim_lock"],
|
||||
"exit_code": code,
|
||||
}
|
||||
elif kind == "rate_limited":
|
||||
# Worker bailed because the provider rate-limited / exhausted
|
||||
# quota (EX_TEMPFAIL sentinel). This is NOT a task failure —
|
||||
# the task is fine, the account just hit a wall. Release it
|
||||
# back to ``ready`` so the respawn guard defers it until the
|
||||
# quota window clears, and crucially do NOT count a failure
|
||||
# (skip ``_record_task_failure``) so a long quota window can't
|
||||
# trip the circuit breaker and permanently block the card.
|
||||
protocol_violation = False
|
||||
rate_limited_exit = True
|
||||
error_text = (
|
||||
f"pid {pid} exited rate-limited (quota wall) — "
|
||||
f"requeued without counting a failure"
|
||||
)
|
||||
event_kind = "rate_limited"
|
||||
event_payload = {
|
||||
"pid": pid,
|
||||
"claimer": row["claim_lock"],
|
||||
"exit_code": code,
|
||||
}
|
||||
else:
|
||||
protocol_violation = False
|
||||
if kind == "nonzero_exit":
|
||||
|
|
@ -5381,9 +5467,13 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
(row["id"],),
|
||||
)
|
||||
if cur.rowcount == 1:
|
||||
# Rate-limited requeues are a clean release, not a crash —
|
||||
# record the run outcome as ``rate_limited`` so the board
|
||||
# history doesn't show a phantom crash for a quota wall.
|
||||
_run_outcome = "rate_limited" if rate_limited_exit else "crashed"
|
||||
run_id = _end_run(
|
||||
conn, row["id"],
|
||||
outcome="crashed", status="crashed",
|
||||
outcome=_run_outcome, status=_run_outcome,
|
||||
error=error_text,
|
||||
metadata=dict(event_payload),
|
||||
)
|
||||
|
|
@ -5392,11 +5482,23 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
event_payload,
|
||||
run_id=run_id,
|
||||
)
|
||||
crashed.append(row["id"])
|
||||
crash_details.append(
|
||||
(row["id"], pid, row["claim_lock"],
|
||||
protocol_violation, error_text)
|
||||
)
|
||||
if rate_limited_exit:
|
||||
# Stamp the failure-error column so ``check_respawn_guard``
|
||||
# recognizes this as a quota blocker and defers the
|
||||
# respawn until the window clears — WITHOUT touching
|
||||
# ``consecutive_failures`` (that's the whole point: no
|
||||
# breaker trip on a throttle).
|
||||
conn.execute(
|
||||
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
|
||||
(error_text[:500], row["id"]),
|
||||
)
|
||||
rate_limited.append(row["id"])
|
||||
else:
|
||||
crashed.append(row["id"])
|
||||
crash_details.append(
|
||||
(row["id"], pid, row["claim_lock"],
|
||||
protocol_violation, error_text)
|
||||
)
|
||||
# Outside the main txn: increment the unified failure counter for
|
||||
# each crashed task. If the breaker trips, the task transitions
|
||||
# ready → blocked with a ``gave_up`` event on top of the ``crashed``
|
||||
|
|
@ -5436,6 +5538,9 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
# and tests that destructure the result; ``dispatch_once`` reads this
|
||||
# side-channel attribute to populate ``DispatchResult.auto_blocked``.
|
||||
detect_crashed_workers._last_auto_blocked = auto_blocked # type: ignore[attr-defined]
|
||||
# Same side-channel for rate-limited requeues — these did NOT count a
|
||||
# failure and are NOT crashes, so they stay out of the ``crashed`` return.
|
||||
detect_crashed_workers._last_rate_limited = rate_limited # type: ignore[attr-defined]
|
||||
return crashed
|
||||
|
||||
|
||||
|
|
@ -5663,6 +5768,18 @@ def check_respawn_guard(conn: sqlite3.Connection, task_id: str) -> Optional[str]
|
|||
|
||||
Checks in priority order:
|
||||
|
||||
``"rate_limit_cooldown"``
|
||||
The task's most recent run ended with the ``rate_limited`` outcome
|
||||
(a worker bailed on a provider quota wall via the EX_TEMPFAIL
|
||||
sentinel) within ``_resolve_rate_limit_cooldown_seconds()``. The
|
||||
quota almost certainly hasn't reset yet, so defer the respawn until
|
||||
the cooldown elapses — then allow a cheap probe. This is checked
|
||||
BEFORE ``blocker_auth`` because the rate-limit requeue stamps a
|
||||
quota-flavored ``last_failure_error`` that would otherwise match the
|
||||
auth-blocker regex and park the task forever (the rate-limit path
|
||||
never increments ``consecutive_failures``, so the breaker can't free
|
||||
it). Once the cooldown elapses the task falls through and respawns.
|
||||
|
||||
``"blocker_auth"``
|
||||
The task's last failure error matches a quota / authentication
|
||||
pattern. Retrying immediately is unlikely to help (rate limits
|
||||
|
|
@ -5695,14 +5812,50 @@ def check_respawn_guard(conn: sqlite3.Connection, task_id: str) -> Optional[str]
|
|||
if row is None:
|
||||
return None
|
||||
|
||||
# 1. Quota / auth blocker: retrying immediately will not help.
|
||||
now = int(time.time())
|
||||
|
||||
# 1. Rate-limit cooldown. The most recent run ended ``rate_limited``
|
||||
# (quota wall) — defer while inside the cooldown window, then allow a
|
||||
# cheap probe. Must run BEFORE the blocker_auth regex check, because a
|
||||
# rate-limit requeue stamps a quota-flavored last_failure_error that
|
||||
# the regex would otherwise match → defer forever (no failure counter
|
||||
# increment on this path means the breaker can never free it).
|
||||
#
|
||||
# We look at the LATEST run only (ORDER BY ended_at DESC LIMIT 1): if a
|
||||
# newer crash/completion superseded the rate-limit run, this guard
|
||||
# no longer applies and the normal paths take over.
|
||||
rl_cooldown = _resolve_rate_limit_cooldown_seconds()
|
||||
latest_run = conn.execute(
|
||||
"SELECT outcome, ended_at FROM task_runs "
|
||||
"WHERE task_id = ? AND ended_at IS NOT NULL "
|
||||
"ORDER BY ended_at DESC LIMIT 1",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
if (
|
||||
latest_run is not None
|
||||
and latest_run["outcome"] == "rate_limited"
|
||||
):
|
||||
if rl_cooldown <= 0:
|
||||
# Cooldown disabled — respawn immediately, and skip the
|
||||
# blocker_auth regex so the stamped rate-limit text doesn't
|
||||
# re-trap the task.
|
||||
return None
|
||||
ended_at = latest_run["ended_at"]
|
||||
if ended_at is not None and (now - int(ended_at)) < rl_cooldown:
|
||||
return "rate_limit_cooldown"
|
||||
# Cooldown elapsed — allow the respawn. Return early so the
|
||||
# blocker_auth check below doesn't catch the rate-limit text we
|
||||
# stamped on the task; this path intentionally retries forever
|
||||
# (cheaply, spaced by the cooldown) until quota returns or a real
|
||||
# crash/completion supersedes it.
|
||||
return None
|
||||
|
||||
# 2. Quota / auth blocker: retrying immediately will not help.
|
||||
err = row["last_failure_error"]
|
||||
if err and _RESPAWN_BLOCKER_RE.search(err):
|
||||
return "blocker_auth"
|
||||
|
||||
now = int(time.time())
|
||||
|
||||
# 2. Completed run within guard window — proof of recent success.
|
||||
# 3. Completed run within guard window — proof of recent success.
|
||||
cutoff = now - _RESPAWN_GUARD_SUCCESS_WINDOW
|
||||
if conn.execute(
|
||||
"SELECT id FROM task_runs "
|
||||
|
|
@ -5711,7 +5864,7 @@ def check_respawn_guard(conn: sqlite3.Connection, task_id: str) -> Optional[str]
|
|||
).fetchone():
|
||||
return "recent_success"
|
||||
|
||||
# 3. GitHub PR URL in a recent comment — prior worker already opened a PR.
|
||||
# 4. GitHub PR URL in a recent comment — prior worker already opened a PR.
|
||||
pr_cutoff = now - _RESPAWN_GUARD_PR_WINDOW
|
||||
for c in conn.execute(
|
||||
"SELECT body FROM task_comments WHERE task_id = ? AND created_at >= ?",
|
||||
|
|
@ -5840,6 +5993,14 @@ def dispatch_once(
|
|||
)
|
||||
if _crash_auto_blocked:
|
||||
result.auto_blocked.extend(_crash_auto_blocked)
|
||||
# Rate-limited requeues (quota wall, no failure counted) — surface for
|
||||
# telemetry / tests. These tasks went back to ``ready`` and the respawn
|
||||
# guard will defer them until the quota window clears.
|
||||
_crash_rate_limited = getattr(
|
||||
detect_crashed_workers, "_last_rate_limited", []
|
||||
)
|
||||
if _crash_rate_limited:
|
||||
result.rate_limited.extend(_crash_rate_limited)
|
||||
result.timed_out = enforce_max_runtime(conn)
|
||||
result.promoted = recompute_ready(conn, failure_limit=failure_limit)
|
||||
|
||||
|
|
|
|||
|
|
@ -679,6 +679,207 @@ def test_resolve_crash_grace_seconds_handles_bad_env(monkeypatch):
|
|||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rate-limit requeue: a worker that bails on a provider quota wall must be
|
||||
# released back to ``ready`` WITHOUT counting a failure, so a long (e.g.
|
||||
# 5-hour) quota window can't trip the circuit breaker and permanently block
|
||||
# the card. The respawn guard then defers it on a cooldown until quota
|
||||
# returns. Regression coverage for the kanban-rate-limit-failure report.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _exited_status(code: int) -> int:
|
||||
"""Raw wait-status for a WIFEXITED child with the given exit code."""
|
||||
return code << 8
|
||||
|
||||
|
||||
def test_classify_worker_exit_recognizes_rate_limit_sentinel(kanban_home):
    """The sentinel exit code classifies as ``rate_limited``; plain 1 does not."""
    import hermes_cli.kanban_db as _kb

    sentinel = _kb.KANBAN_RATE_LIMIT_EXIT_CODE
    rate_limited_pid = 31337
    crashed_pid = rate_limited_pid + 1

    # A worker that exited with the sentinel is reported as rate-limited,
    # and the classifier hands the exit code back unchanged.
    _kb._record_worker_exit(rate_limited_pid, _exited_status(sentinel))
    classification = _kb._classify_worker_exit(rate_limited_pid)
    kind, code = classification
    assert kind == "rate_limited"
    assert code == sentinel

    # Plain non-zero exit is still a normal crash, not rate-limited.
    _kb._record_worker_exit(crashed_pid, _exited_status(1))
    assert _kb._classify_worker_exit(crashed_pid) == ("nonzero_exit", 1)
|
||||
|
||||
|
||||
def test_rate_limit_exit_requeues_without_counting_failure(
    kanban_home, monkeypatch,
):
    """Sentinel exits requeue the task and never bump the failure counter.

    Each simulated quota-wall hit must leave the task ``ready`` with
    ``consecutive_failures`` still at zero, so the circuit breaker cannot
    trip on a transient throttle no matter how many hits occur.
    """
    import hermes_cli.kanban_db as _kb

    # Zero crash-grace setting plus "every pid is dead" so the crash
    # detector acts on the freshly claimed task immediately.
    monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "0")
    monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)

    sentinel_status = _exited_status(_kb.KANBAN_RATE_LIMIT_EXIT_CODE)

    with kb.connect() as conn:
        host = _kb._claimer_id().partition(":")[0]
        tid = kb.create_task(conn, title="rl", assignee="a")

        # Simulate FAR more quota-wall hits than DEFAULT_FAILURE_LIMIT (2).
        # If any of these counted as a failure the task would be blocked.
        for i in range(6):
            pid = 70000 + i

            # Claim to open a real run (so detect_crashed_workers can close
            # it with a rate_limited outcome), then point the claim at this
            # host + a dead pid so the crash path acts on it.
            kb.claim_task(conn, tid, claimer=f"{host}:w{i}")
            conn.execute(
                "UPDATE tasks SET worker_pid=?, consecutive_failures=? "
                "WHERE id=?",
                (pid, 0, tid),
            )
            conn.commit()
            _kb._record_worker_exit(pid, sentinel_status)

            crashed = kb.detect_crashed_workers(conn)
            requeued = getattr(
                _kb.detect_crashed_workers, "_last_rate_limited", []
            )

            # Rate-limited requeues are NOT crashes; they are reported
            # through the side-channel attribute instead.
            assert tid not in crashed
            assert tid in requeued

            task = kb.get_task(conn, tid)
            assert task.status == "ready", (
                f"hit {i}: should requeue ready, got {task.status}"
            )
            assert task.consecutive_failures == 0, (
                f"hit {i}: rate-limit must not count a failure, "
                f"got {task.consecutive_failures}"
            )

        # Last failure error stamped so the respawn guard recognizes the
        # quota wall.
        assert task.last_failure_error and "rate-limited" in task.last_failure_error

        # A ``rate_limited`` run outcome was recorded (not ``crashed``).
        run_rows = conn.execute(
            "SELECT outcome FROM task_runs WHERE task_id=?", (tid,),
        ).fetchall()
        outcomes = [run_row["outcome"] for run_row in run_rows]
        assert "rate_limited" in outcomes
        assert "crashed" not in outcomes
|
||||
|
||||
|
||||
def test_real_crash_still_counts_and_trips_breaker(kanban_home, monkeypatch):
    """Genuine non-zero exits keep counting toward the circuit breaker.

    Guards against the rate-limit carve-out turning into a blanket "never
    count crashes": two ordinary exit-code-1 crashes must still leave the
    task ``blocked``.
    """
    import hermes_cli.kanban_db as _kb

    monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)

    generic_failure_status = _exited_status(1)

    with kb.connect() as conn:
        host = _kb._claimer_id().partition(":")[0]
        tid = kb.create_task(conn, title="crash", assignee="a")

        # Two crashes: DEFAULT_FAILURE_LIMIT == 2
        for attempt, pid in enumerate((60000, 60001)):
            claimer = f"{host}:w{attempt}"
            conn.execute(
                "UPDATE tasks SET status='running', worker_pid=?, "
                "claim_lock=? WHERE id=?",
                (pid, claimer, tid),
            )
            conn.commit()
            _kb._record_worker_exit(pid, generic_failure_status)
            kb.detect_crashed_workers(conn)

        task = kb.get_task(conn, tid)
        assert task.status == "blocked", (
            f"genuine crashes should still trip the breaker, got {task.status}"
        )
|
||||
|
||||
|
||||
def test_respawn_guard_defers_rate_limited_within_cooldown(
    kanban_home, monkeypatch,
):
    """The guard defers inside the cooldown and releases once it elapses.

    After the cooldown the guard must return ``None`` rather than
    ``blocker_auth``, even though the stamped ``last_failure_error``
    contains quota-flavored text.
    """
    import hermes_cli.kanban_db as _kb

    monkeypatch.setenv("HERMES_KANBAN_RATE_LIMIT_COOLDOWN_SECONDS", "300")
    now = 5_000_000
    stamped_error = "pid 1 exited rate-limited (quota wall) — requeued"

    with kb.connect() as conn:
        tid = kb.create_task(conn, title="rl-guard", assignee="a")

        # Seed a rate_limited run that just ended + the stamped error.
        kb.claim_task(conn, tid)
        run_id = kb.get_task(conn, tid).current_run_id
        conn.execute(
            "UPDATE task_runs SET outcome='rate_limited', status='rate_limited', "
            "ended_at=? WHERE id=?",
            (now, run_id),
        )
        conn.execute(
            "UPDATE tasks SET status='ready', current_run_id=NULL, "
            "claim_lock=NULL, claim_expires=NULL, worker_pid=NULL, "
            "last_failure_error=? WHERE id=?",
            (stamped_error, tid),
        )
        conn.commit()

        # Inside cooldown → defer with the rate-limit-specific reason.
        monkeypatch.setattr(_kb.time, "time", lambda: now + 100)
        inside_verdict = kb.check_respawn_guard(conn, tid)
        assert inside_verdict == "rate_limit_cooldown"

        # Past cooldown → allowed (None), NOT trapped by blocker_auth even
        # though last_failure_error contains "rate-limited".
        monkeypatch.setattr(_kb.time, "time", lambda: now + 400)
        after_verdict = kb.check_respawn_guard(conn, tid)
        assert after_verdict is None
|
||||
|
||||
|
||||
def test_respawn_guard_rate_limit_cooldown_zero_allows_immediately(
    kanban_home, monkeypatch,
):
    """A cooldown of 0 makes a rate-limited task spawnable right away.

    The stamped rate-limit error text must not cause the guard to report
    ``blocker_auth`` either; the expected verdict is ``None``.
    """
    import hermes_cli.kanban_db as _kb

    monkeypatch.setenv("HERMES_KANBAN_RATE_LIMIT_COOLDOWN_SECONDS", "0")
    now = 6_000_000
    stamped_error = "pid 1 exited rate-limited (quota wall)"

    with kb.connect() as conn:
        tid = kb.create_task(conn, title="rl-zero", assignee="a")

        # Seed a just-ended rate_limited run and put the task back to ready.
        kb.claim_task(conn, tid)
        run_id = kb.get_task(conn, tid).current_run_id
        conn.execute(
            "UPDATE task_runs SET outcome='rate_limited', status='rate_limited', "
            "ended_at=? WHERE id=?",
            (now, run_id),
        )
        conn.execute(
            "UPDATE tasks SET status='ready', current_run_id=NULL, "
            "claim_lock=NULL, last_failure_error=? WHERE id=?",
            (stamped_error, tid),
        )
        conn.commit()

        # One second after the run ended the guard already lets it through.
        monkeypatch.setattr(_kb.time, "time", lambda: now + 1)
        verdict = kb.check_respawn_guard(conn, tid)
        assert verdict is None
|
||||
|
||||
|
||||
def test_resolve_rate_limit_cooldown_handles_bad_env(monkeypatch):
    """Unparseable, negative, and empty env values fall back to the default."""
    import hermes_cli.kanban_db as _kb

    env_name = "HERMES_KANBAN_RATE_LIMIT_COOLDOWN_SECONDS"

    for bad_val in ("notanumber", "-5", ""):
        monkeypatch.setenv(env_name, bad_val)
        resolved = _kb._resolve_rate_limit_cooldown_seconds()
        assert resolved == _kb.DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS
|
||||
|
||||
|
||||
def test_max_runtime_uses_current_run_start_after_retry(kanban_home, monkeypatch):
|
||||
"""A retry should get a fresh max-runtime window.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue