feat(kanban): make worker log retention configurable

This commit is contained in:
qWaitCrypto 2026-05-14 17:46:58 +08:00 committed by Teknium
parent 8831eb5c70
commit 6e60a8a092
3 changed files with 103 additions and 10 deletions

View file

@ -1486,6 +1486,11 @@ DEFAULT_CONFIG = {
# same task/profile (spawn_failed, timed_out, or crashed). Reassignment
# resets the streak for the new profile.
"failure_limit": 2,
# Worker stdout/stderr logs rotate at spawn time. Defaults preserve
# the historical 2 MiB + one-backup behavior; long-running workers can
# raise these to keep more early failure evidence.
"worker_log_rotate_bytes": 2 * 1024 * 1024,
"worker_log_backup_count": 1,
# Profile that decomposes tasks in the Triage column. When unset,
# falls back to the default profile (the one `hermes` launches with
# no -p flag). Set this to a dedicated 'orchestrator' profile if you

View file

@ -3066,6 +3066,7 @@ DEFAULT_SPAWN_FAILURE_LIMIT = DEFAULT_FAILURE_LIMIT
# Max bytes to keep in a single worker log file. The dispatcher truncates
# and rotates on spawn if the file is larger than this at spawn time.
DEFAULT_LOG_ROTATE_BYTES = 2 * 1024 * 1024 # 2 MiB
# Number of rotated generations (``<log>.1`` .. ``<log>.N``) kept per worker
# log. A value of 0 keeps no backup: an oversized log is deleted on rotation.
DEFAULT_LOG_BACKUP_COUNT = 1
# Keep a little wall-clock budget for the worker to observe a terminal timeout
# and call kanban_block/kanban_complete before max_runtime_seconds kills it.
@ -4029,25 +4030,84 @@ def dispatch_once(
return result
def _rotate_worker_log(log_path: Path, max_bytes: int) -> None:
"""Rotate ``<log>`` to ``<log>.1`` if it exceeds ``max_bytes``.
def _positive_int(value: Any, default: int, *, minimum: int = 1) -> int:
    """Coerce ``value`` to an int, falling back to ``default`` when unusable.

    Args:
        value: Raw setting, typically read from user-edited configuration.
        default: Value returned when ``value`` cannot be converted or is
            below ``minimum``.
        minimum: Smallest acceptable result (inclusive).

    Returns:
        ``int(value)`` when the conversion succeeds and the result is at
        least ``minimum``; otherwise ``default``.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers infinite floats (e.g. a YAML ``.inf``), which
        # ``int()`` rejects with a different exception than NaN or bad text.
        return default
    return parsed if parsed >= minimum else default
Single-generation rotation — one old file kept, newer one replaces it.
Keeps disk usage bounded while still giving the user a chance to grab
the prior run's output.
def worker_log_rotation_config(kanban_cfg: Optional[dict] = None) -> tuple[int, int]:
    """Return ``(rotate_bytes, backup_count)`` for worker log rotation.

    Defaults preserve the historical behavior: rotate at 2 MiB and keep one
    backup generation (``.log.1``). Operators with long-running workers can
    raise either value from ``config.yaml`` without changing dispatcher code.

    Args:
        kanban_cfg: The ``kanban`` section of the configuration. When
            ``None``, it is read via ``hermes_cli.config.load_config``.

    Returns:
        ``(max_bytes, backup_count)``. ``max_bytes`` is at least 1 and
        ``backup_count`` at least 0; missing or invalid settings fall back
        to ``DEFAULT_LOG_ROTATE_BYTES`` / ``DEFAULT_LOG_BACKUP_COUNT``.
    """
    from collections.abc import Mapping

    if kanban_cfg is None:
        try:
            from hermes_cli.config import load_config

            kanban_cfg = load_config().get("kanban")
        except Exception:
            # Best-effort: an unreadable config must not block a worker
            # spawn, so fall through to the defaults.
            kanban_cfg = None
    if not isinstance(kanban_cfg, Mapping):
        # A malformed section (e.g. ``kanban: foo``) has no ``.get``; treat
        # it like an empty section instead of raising AttributeError.
        kanban_cfg = {}
    max_bytes = _positive_int(
        kanban_cfg.get("worker_log_rotate_bytes"),
        DEFAULT_LOG_ROTATE_BYTES,
        minimum=1,
    )
    backup_count = _positive_int(
        kanban_cfg.get("worker_log_backup_count"),
        DEFAULT_LOG_BACKUP_COUNT,
        minimum=0,
    )
    return max_bytes, backup_count
def _rotated_log_path(log_path: Path, generation: int) -> Path:
    """Return the backup path for ``log_path`` at the given ``generation``.

    The generation number is appended after the existing suffix, so
    ``task.log`` with generation 2 becomes ``task.log.2``.
    """
    numbered_suffix = f"{log_path.suffix}.{generation}"
    return log_path.with_suffix(numbered_suffix)
def _rotate_worker_log(
    log_path: Path,
    max_bytes: int,
    backup_count: int = DEFAULT_LOG_BACKUP_COUNT,
) -> None:
    """Rotate ``<log>`` when it exceeds ``max_bytes``.

    ``backup_count=1`` preserves the legacy single-generation behavior:
    ``<log>`` moves to ``<log>.1`` and any previous ``.1`` is replaced.
    Higher values shift older generations up to ``backup_count``.
    ``backup_count=0`` keeps no backup and deletes the oversized log.

    Rotation is best-effort: any ``OSError`` is swallowed so a filesystem
    problem never prevents the caller from proceeding.

    Args:
        log_path: Path of the live worker log.
        max_bytes: Size threshold; files at or below it are left untouched.
        backup_count: Number of rotated generations to keep. Invalid or
            negative values fall back to ``DEFAULT_LOG_BACKUP_COUNT``.
    """
    try:
        if not log_path.exists():
            return
        if log_path.stat().st_size <= max_bytes:
            return
        backup_count = _positive_int(
            backup_count,
            DEFAULT_LOG_BACKUP_COUNT,
            minimum=0,
        )
        if backup_count == 0:
            log_path.unlink()
            return
        # Drop the oldest generation first so the shift below has room.
        oldest = _rotated_log_path(log_path, backup_count)
        try:
            if oldest.exists():
                oldest.unlink()
        except OSError:
            pass
        # Shift from the highest surviving generation downwards so no backup
        # is overwritten before it has been moved: N-1 -> N, ..., 1 -> 2.
        for generation in range(backup_count - 1, 0, -1):
            src = _rotated_log_path(log_path, generation)
            if not src.exists():
                continue
            try:
                src.rename(_rotated_log_path(log_path, generation + 1))
            except OSError:
                pass
        # Only now move the live log into the freed ``.1`` slot.
        log_path.rename(_rotated_log_path(log_path, 1))
    except OSError:
        pass
@ -4232,7 +4292,8 @@ def _default_spawn(
log_dir = worker_logs_dir(board=board)
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / f"{task.id}.log"
_rotate_worker_log(log_path, DEFAULT_LOG_ROTATE_BYTES)
rotate_bytes, backup_count = worker_log_rotation_config()
_rotate_worker_log(log_path, rotate_bytes, backup_count)
# Use 'a' so a re-run on unblock appends rather than overwrites.
log_f = open(log_path, "ab")

View file

@ -679,6 +679,33 @@ def test_worker_log_rotation_keeps_one_generation(kanban_home, tmp_path):
assert (log_dir / "t_aaaa.log.1").exists()
def test_worker_log_rotation_keeps_configured_generations(kanban_home):
    """Rotating with ``backup_count=3`` shifts every generation up by one."""
    logs = kanban_home / "kanban" / "logs"
    logs.mkdir(parents=True, exist_ok=True)
    live = logs / "t_multi.log"

    # Seed the live log plus two existing backups.
    seeded = {
        "t_multi.log": "current",
        "t_multi.log.1": "one",
        "t_multi.log.2": "two",
    }
    for name, text in seeded.items():
        (logs / name).write_text(text)

    kb._rotate_worker_log(live, max_bytes=1, backup_count=3)

    assert not live.exists()
    expected = {
        "t_multi.log.1": "current",
        "t_multi.log.2": "one",
        "t_multi.log.3": "two",
    }
    for name, text in expected.items():
        assert (logs / name).read_text() == text
def test_worker_log_rotation_config_defaults_and_overrides():
    """An empty section yields the defaults; explicit values are honored."""
    defaults = (kb.DEFAULT_LOG_ROTATE_BYTES, kb.DEFAULT_LOG_BACKUP_COUNT)
    assert kb.worker_log_rotation_config({}) == defaults

    overrides = {
        "worker_log_rotate_bytes": 10,
        "worker_log_backup_count": 4,
    }
    assert kb.worker_log_rotation_config(overrides) == (10, 4)
def test_read_worker_log_tail(kanban_home):
log_dir = kanban_home / "kanban" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)