mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-30 06:41:51 +00:00
fix(gateway): quiet corrupt kanban dispatcher boards
This commit is contained in:
parent
7fee1f61eb
commit
ea5b4ec2a0
3 changed files with 142 additions and 4 deletions
|
|
@ -37,6 +37,7 @@ import signal
|
||||||
import tempfile
|
import tempfile
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
import sqlite3
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from contextvars import copy_context
|
from contextvars import copy_context
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
@ -4678,6 +4679,28 @@ class GatewayRunner:
|
||||||
HEALTH_WINDOW = 6
|
HEALTH_WINDOW = 6
|
||||||
bad_ticks = 0
|
bad_ticks = 0
|
||||||
last_warn_at = 0
|
last_warn_at = 0
|
||||||
|
disabled_corrupt_boards: dict[str, tuple[str, int | None, int | None]] = {}
|
||||||
|
|
||||||
|
def _board_db_fingerprint(slug: str) -> tuple[str, int | None, int | None]:
    """Build a cheap identity for the database file behind board *slug*.

    The result is ``(location, mtime_ns, size)``:

    * ``location`` is the user-expanded, resolved path as a string, or the
      plain ``str()`` of the path when expansion/resolution raises.
    * ``mtime_ns`` and ``size`` come from ``stat()`` on the path returned by
      ``_kb.kanban_db_path``; both are ``None`` when ``stat()`` raises
      ``OSError``.
    """
    db_path = _kb.kanban_db_path(slug)

    try:
        location = str(db_path.expanduser().resolve())
    except Exception:
        # Resolution is best-effort; the raw path is still a usable key.
        location = str(db_path)

    try:
        info = db_path.stat()
    except OSError:
        mtime_ns = size = None
    else:
        mtime_ns, size = info.st_mtime_ns, info.st_size

    return (location, mtime_ns, size)
|
||||||
|
|
||||||
|
def _is_corrupt_board_db_error(exc: Exception) -> bool:
|
||||||
|
if not isinstance(exc, sqlite3.DatabaseError):
|
||||||
|
return False
|
||||||
|
msg = str(exc).lower()
|
||||||
|
return (
|
||||||
|
"file is not a database" in msg
|
||||||
|
or "database disk image is malformed" in msg
|
||||||
|
)
|
||||||
|
|
||||||
def _tick_once_for_board(slug: str) -> "Optional[object]":
|
def _tick_once_for_board(slug: str) -> "Optional[object]":
|
||||||
"""Run one dispatch_once for a specific board.
|
"""Run one dispatch_once for a specific board.
|
||||||
|
|
@ -4689,6 +4712,16 @@ class GatewayRunner:
|
||||||
connection handle or accidentally claim across each other.
|
connection handle or accidentally claim across each other.
|
||||||
"""
|
"""
|
||||||
conn = None
|
conn = None
|
||||||
|
fingerprint = _board_db_fingerprint(slug)
|
||||||
|
disabled_fingerprint = disabled_corrupt_boards.get(slug)
|
||||||
|
if disabled_fingerprint == fingerprint:
|
||||||
|
return None
|
||||||
|
if disabled_fingerprint is not None:
|
||||||
|
logger.info(
|
||||||
|
"kanban dispatcher: board %s database changed; retrying dispatch",
|
||||||
|
slug,
|
||||||
|
)
|
||||||
|
disabled_corrupt_boards.pop(slug, None)
|
||||||
try:
|
try:
|
||||||
conn = _kb.connect(board=slug)
|
conn = _kb.connect(board=slug)
|
||||||
# `connect()` runs the schema + idempotent migration on
|
# `connect()` runs the schema + idempotent migration on
|
||||||
|
|
@ -4703,6 +4736,21 @@ class GatewayRunner:
|
||||||
max_spawn=max_spawn,
|
max_spawn=max_spawn,
|
||||||
failure_limit=failure_limit,
|
failure_limit=failure_limit,
|
||||||
)
|
)
|
||||||
|
except sqlite3.DatabaseError as exc:
|
||||||
|
if _is_corrupt_board_db_error(exc):
|
||||||
|
disabled_corrupt_boards[slug] = fingerprint
|
||||||
|
logger.error(
|
||||||
|
"kanban dispatcher: board %s database %s is not a valid "
|
||||||
|
"SQLite database; disabling dispatch for this board "
|
||||||
|
"until the file changes or the gateway restarts. Move "
|
||||||
|
"or restore the file, then run `hermes kanban init` if "
|
||||||
|
"you need a fresh board.",
|
||||||
|
slug,
|
||||||
|
fingerprint[0],
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
logger.exception("kanban dispatcher: tick failed on board %s", slug)
|
||||||
|
return None
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("kanban dispatcher: tick failed on board %s", slug)
|
logger.exception("kanban dispatcher: tick failed on board %s", slug)
|
||||||
return None
|
return None
|
||||||
|
|
|
||||||
|
|
@ -3414,6 +3414,82 @@ def test_gateway_dispatcher_watcher_env_truthy_uses_config(monkeypatch):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_gateway_dispatcher_disables_corrupt_board_without_traceback(
    monkeypatch, tmp_path, caplog
):
    """A corrupt board DB produces one actionable error and no per-tick retry."""
    import asyncio
    import logging
    import sqlite3

    from gateway.run import GatewayRunner
    import hermes_cli.config as _cfg_mod
    import hermes_cli.kanban_db as _kb

    # Bare instance: skip __init__ and set only the loop flag used here.
    gateway = object.__new__(GatewayRunner)
    gateway._running = True

    bad_db = tmp_path / "kanban.db"
    bad_db.write_text("not sqlite", encoding="utf-8")

    def _fake_load_config():
        return {
            "kanban": {
                "dispatch_in_gateway": True,
                "dispatch_interval_seconds": 1,
            }
        }

    def _fake_list_boards(include_archived=False):
        return [{"slug": _kb.DEFAULT_BOARD}]

    def _fake_board_metadata(slug):
        return {"slug": slug}

    def _fake_db_path(board=None):
        return bad_db

    monkeypatch.setattr(_cfg_mod, "load_config", _fake_load_config)
    monkeypatch.setattr(_kb, "list_boards", _fake_list_boards)
    monkeypatch.setattr(_kb, "read_board_metadata", _fake_board_metadata)
    monkeypatch.setattr(_kb, "kanban_db_path", _fake_db_path)

    counts = {"connect": 0, "to_thread": 0}

    def _failing_connect(*args, **kwargs):
        counts["connect"] += 1
        raise sqlite3.DatabaseError("file is not a database")

    async def _inline_to_thread(fn, *args, **kwargs):
        # Run the callable inline and stop the watcher after the fourth call.
        counts["to_thread"] += 1
        outcome = fn(*args, **kwargs)
        if counts["to_thread"] >= 4:
            gateway._running = False
        return outcome

    async def _instant_sleep(_delay):
        return None

    monkeypatch.setattr(_kb, "connect", _failing_connect)
    monkeypatch.setattr("gateway.run.asyncio.to_thread", _inline_to_thread)
    monkeypatch.setattr("gateway.run.asyncio.sleep", _instant_sleep)

    with caplog.at_level(logging.ERROR, logger="gateway.run"):
        watcher = gateway._kanban_dispatcher_watcher()
        asyncio.run(asyncio.wait_for(watcher, timeout=3.0))

    records = list(caplog.records)
    messages = [rec.getMessage() for rec in records]

    invalid_db_reports = [m for m in messages if "not a valid SQLite database" in m]
    assert len(invalid_db_reports) == 1
    assert all("tick failed on board" not in m for m in messages)
    assert all(not rec.exc_info for rec in records)

    # First tick connect + two ready-queue probes. The second dispatch tick
    # skips connect because the corrupt board fingerprint is disabled.
    assert counts["connect"] == 3
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Hallucination gate (created_cards verify + prose scan)
|
# Hallucination gate (created_cards verify + prose scan)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -254,8 +254,12 @@ class TestDeveloperRoleSwap:
|
||||||
assert messages[0]["role"] == "system"
|
assert messages[0]["role"] == "system"
|
||||||
|
|
||||||
def test_developer_role_via_nous_portal(self, monkeypatch):
|
def test_developer_role_via_nous_portal(self, monkeypatch):
|
||||||
agent = _make_agent(monkeypatch, "nous", base_url="https://inference-api.nousresearch.com/v1")
|
agent = _make_agent(
|
||||||
agent.model = "gpt-5"
|
monkeypatch,
|
||||||
|
"nous",
|
||||||
|
base_url="https://inference-api.nousresearch.com/v1",
|
||||||
|
model="gpt-5",
|
||||||
|
)
|
||||||
messages = [
|
messages = [
|
||||||
{"role": "system", "content": "You are helpful."},
|
{"role": "system", "content": "You are helpful."},
|
||||||
{"role": "user", "content": "hi"},
|
{"role": "user", "content": "hi"},
|
||||||
|
|
@ -346,14 +350,24 @@ class TestBuildApiKwargsAIGateway:
|
||||||
class TestBuildApiKwargsNousPortal:
|
class TestBuildApiKwargsNousPortal:
|
||||||
def test_includes_nous_product_tags(self, monkeypatch):
|
def test_includes_nous_product_tags(self, monkeypatch):
|
||||||
from agent.portal_tags import nous_portal_tags
|
from agent.portal_tags import nous_portal_tags
|
||||||
agent = _make_agent(monkeypatch, "nous", base_url="https://inference-api.nousresearch.com/v1")
|
agent = _make_agent(
|
||||||
|
monkeypatch,
|
||||||
|
"nous",
|
||||||
|
base_url="https://inference-api.nousresearch.com/v1",
|
||||||
|
model="gpt-5",
|
||||||
|
)
|
||||||
messages = [{"role": "user", "content": "hi"}]
|
messages = [{"role": "user", "content": "hi"}]
|
||||||
kwargs = agent._build_api_kwargs(messages)
|
kwargs = agent._build_api_kwargs(messages)
|
||||||
extra = kwargs.get("extra_body", {})
|
extra = kwargs.get("extra_body", {})
|
||||||
assert extra.get("tags") == nous_portal_tags()
|
assert extra.get("tags") == nous_portal_tags()
|
||||||
|
|
||||||
def test_uses_chat_completions_format(self, monkeypatch):
|
def test_uses_chat_completions_format(self, monkeypatch):
|
||||||
agent = _make_agent(monkeypatch, "nous", base_url="https://inference-api.nousresearch.com/v1")
|
agent = _make_agent(
|
||||||
|
monkeypatch,
|
||||||
|
"nous",
|
||||||
|
base_url="https://inference-api.nousresearch.com/v1",
|
||||||
|
model="gpt-5",
|
||||||
|
)
|
||||||
messages = [{"role": "user", "content": "hi"}]
|
messages = [{"role": "user", "content": "hi"}]
|
||||||
kwargs = agent._build_api_kwargs(messages)
|
kwargs = agent._build_api_kwargs(messages)
|
||||||
assert "messages" in kwargs
|
assert "messages" in kwargs
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue