From b7bcae49c6395a5bea662842928e5043bd920693 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Fri, 27 Mar 2026 05:22:57 -0700 Subject: [PATCH] fix: SQLite WAL write-lock contention causing 15-20s TUI freeze (#3385) Multiple hermes processes (gateway + CLI sessions + worktree agents) sharing one state.db caused WAL write-lock convoy effects. SQLite's built-in busy handler uses deterministic sleep intervals (up to 100ms) that synchronize competing writers, creating 15-20 second freezes during agent init. Root cause: timeout=30.0 with 7+ concurrent connections meant: - WAL never checkpointed (294MB, readers always blocked it) - Bloated WAL slowed all reads and writes - Deterministic backoff caused convoy effects under contention Fix: - Replace 30s SQLite timeout with 1s + app-level retry (15 attempts, random 20-150ms jitter between retries to break convoys) - Use BEGIN IMMEDIATE for explicit write-lock acquisition (fail fast) - Set isolation_level=None for manual transaction control - PASSIVE WAL checkpoint on close() and every 50 writes - All 12 write methods converted to _execute_write() helper Before: 15-20s frozen at create_session during agent init After: <1s to API call, WAL stays at ~4MB Tested: 4355 tests pass, 3 concurrent live sessions with simultaneous writes showed zero contention on every py-spy sample. --- hermes_state.py | 294 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 204 insertions(+), 90 deletions(-) diff --git a/hermes_state.py b/hermes_state.py index b39c9c1f7..af74ed6ff 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -15,15 +15,20 @@ Key design decisions: """ import json +import logging import os +import random import re import sqlite3 import threading import time from pathlib import Path from hermes_constants import get_hermes_home -from typing import Dict, Any, List, Optional +from typing import Any, Callable, Dict, List, Optional, TypeVar +logger = logging.getLogger(__name__) + +T = TypeVar("T") DEFAULT_DB_PATH = get_hermes_home() / "state.db" @@ -116,18 +121,38 @@ class SessionDB: single writer via WAL mode). Each method opens its own cursor. """ + # ── Write-contention tuning ── + # With multiple hermes processes (gateway + CLI sessions + worktree agents) + # all sharing one state.db, WAL write-lock contention causes visible TUI + # freezes. SQLite's built-in busy handler uses a deterministic sleep + # schedule that causes convoy effects under high concurrency. + # + # Instead, we keep the SQLite timeout short (1s) and handle retries at the + # application level with random jitter, which naturally staggers competing + # writers and avoids the convoy. + _WRITE_MAX_RETRIES = 15 + _WRITE_RETRY_MIN_S = 0.020 # 20ms + _WRITE_RETRY_MAX_S = 0.150 # 150ms + # Attempt a PASSIVE WAL checkpoint every N successful writes. + _CHECKPOINT_EVERY_N_WRITES = 50 + def __init__(self, db_path: Path = None): self.db_path = db_path or DEFAULT_DB_PATH self.db_path.parent.mkdir(parents=True, exist_ok=True) self._lock = threading.Lock() + self._write_count = 0 self._conn = sqlite3.connect( str(self.db_path), check_same_thread=False, - # 30s gives the WAL writer (CLI or gateway) time to finish a batch - # flush before the concurrent reader/writer gives up. 10s was too - # short when the CLI is doing frequent memory flushes. - timeout=30.0, + # Short timeout — application-level retry with random jitter + # handles contention instead of sitting in SQLite's internal + # busy handler for up to 30s. + timeout=1.0, + # Autocommit mode: Python's default isolation_level="" auto-starts + # transactions on DML, which conflicts with our explicit + # BEGIN IMMEDIATE. None = we manage transactions ourselves. + isolation_level=None, ) self._conn.row_factory = sqlite3.Row self._conn.execute("PRAGMA journal_mode=WAL") @@ -135,6 +160,96 @@ class SessionDB: self._init_schema() + # ── Core write helper ── + + def _execute_write(self, fn: Callable[[sqlite3.Connection], T]) -> T: + """Execute a write transaction with BEGIN IMMEDIATE and jitter retry. + + *fn* receives the connection and should perform INSERT/UPDATE/DELETE + statements. The caller must NOT call ``commit()`` — that's handled + here after *fn* returns. + + BEGIN IMMEDIATE acquires the WAL write lock at transaction start + (not at commit time), so lock contention surfaces immediately. + On ``database is locked``, we release the Python lock, sleep a + random 20-150ms, and retry — breaking the convoy pattern that + SQLite's built-in deterministic backoff creates. + + Returns whatever *fn* returns. + """ + last_err: Optional[Exception] = None + for attempt in range(self._WRITE_MAX_RETRIES): + try: + with self._lock: + self._conn.execute("BEGIN IMMEDIATE") + try: + result = fn(self._conn) + self._conn.commit() + except BaseException: + try: + self._conn.rollback() + except Exception: + pass + raise + # Success — periodic best-effort checkpoint. + self._write_count += 1 + if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0: + self._try_wal_checkpoint() + return result + except sqlite3.OperationalError as exc: + err_msg = str(exc).lower() + if "locked" in err_msg or "busy" in err_msg: + last_err = exc + if attempt < self._WRITE_MAX_RETRIES - 1: + jitter = random.uniform( + self._WRITE_RETRY_MIN_S, + self._WRITE_RETRY_MAX_S, + ) + time.sleep(jitter) + continue + # Non-lock error or retries exhausted — propagate. + raise + # Retries exhausted (shouldn't normally reach here). + raise last_err or sqlite3.OperationalError( + "database is locked after max retries" + ) + + def _try_wal_checkpoint(self) -> None: + """Best-effort PASSIVE WAL checkpoint. Never blocks, never raises. + + Flushes committed WAL frames back into the main DB file for any + frames that no other connection currently needs. Keeps the WAL + from growing unbounded when many processes hold persistent + connections. + """ + try: + with self._lock: + result = self._conn.execute( + "PRAGMA wal_checkpoint(PASSIVE)" + ).fetchone() + if result and result[1] > 0: + logger.debug( + "WAL checkpoint: %d/%d pages checkpointed", + result[2], result[1], + ) + except Exception: + pass # Best effort — never fatal. + + def close(self): + """Close the database connection. + + Attempts a PASSIVE WAL checkpoint first so that exiting processes + help keep the WAL file from growing unbounded. + """ + with self._lock: + if self._conn: + try: + self._conn.execute("PRAGMA wal_checkpoint(PASSIVE)") + except Exception: + pass + self._conn.close() + self._conn = None + def _init_schema(self): """Create tables and FTS if they don't exist, run migrations.""" cursor = self._conn.cursor() @@ -256,8 +371,8 @@ class SessionDB: parent_session_id: str = None, ) -> str: """Create a new session record. Returns the session_id.""" - with self._lock: - self._conn.execute( + def _do(conn): + conn.execute( """INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config, system_prompt, parent_session_id, started_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", @@ -272,35 +387,35 @@ class SessionDB: time.time(), ), ) - self._conn.commit() + self._execute_write(_do) return session_id def end_session(self, session_id: str, end_reason: str) -> None: """Mark a session as ended.""" - with self._lock: - self._conn.execute( + def _do(conn): + conn.execute( "UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?", (time.time(), end_reason, session_id), ) - self._conn.commit() + self._execute_write(_do) def reopen_session(self, session_id: str) -> None: """Clear ended_at/end_reason so a session can be resumed.""" - with self._lock: - self._conn.execute( + def _do(conn): + conn.execute( "UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?", (session_id,), ) - self._conn.commit() + self._execute_write(_do) def update_system_prompt(self, session_id: str, system_prompt: str) -> None: """Store the full assembled system prompt snapshot.""" - with self._lock: - self._conn.execute( + def _do(conn): + conn.execute( "UPDATE sessions SET system_prompt = ? WHERE id = ?", (system_prompt, session_id), ) - self._conn.commit() + self._execute_write(_do) def update_token_counts( self, @@ -370,29 +485,27 @@ class SessionDB: billing_mode = COALESCE(billing_mode, ?), model = COALESCE(model, ?) WHERE id = ?""" - with self._lock: - self._conn.execute( - sql, - ( - input_tokens, - output_tokens, - cache_read_tokens, - cache_write_tokens, - reasoning_tokens, - estimated_cost_usd, - actual_cost_usd, - actual_cost_usd, - cost_status, - cost_source, - pricing_version, - billing_provider, - billing_base_url, - billing_mode, - model, - session_id, - ), - ) - self._conn.commit() + params = ( + input_tokens, + output_tokens, + cache_read_tokens, + cache_write_tokens, + reasoning_tokens, + estimated_cost_usd, + actual_cost_usd, + actual_cost_usd, + cost_status, + cost_source, + pricing_version, + billing_provider, + billing_base_url, + billing_mode, + model, + session_id, + ) + def _do(conn): + conn.execute(sql, params) + self._execute_write(_do) def ensure_session( self, @@ -406,14 +519,14 @@ class SessionDB: create_session() call (e.g. transient SQLite lock at agent startup). INSERT OR IGNORE is safe to call even when the row already exists. """ - with self._lock: - self._conn.execute( + def _do(conn): + conn.execute( """INSERT OR IGNORE INTO sessions (id, source, model, started_at) VALUES (?, ?, ?, ?)""", (session_id, source, model, time.time()), ) - self._conn.commit() + self._execute_write(_do) def set_token_counts( self, @@ -439,8 +552,8 @@ class SessionDB: conversation run (e.g. the gateway, where the cached agent's session_prompt_tokens already reflects the running total). """ - with self._lock: - self._conn.execute( + def _do(conn): + conn.execute( """UPDATE sessions SET input_tokens = ?, output_tokens = ?, @@ -479,7 +592,7 @@ class SessionDB: session_id, ), ) - self._conn.commit() + self._execute_write(_do) def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Get a session by ID.""" @@ -573,10 +686,10 @@ class SessionDB: Empty/whitespace-only strings are normalized to None (clearing the title). """ title = self.sanitize_title(title) - with self._lock: + def _do(conn): if title: # Check uniqueness (allow the same session to keep its own title) - cursor = self._conn.execute( + cursor = conn.execute( "SELECT id FROM sessions WHERE title = ? AND id != ?", (title, session_id), ) @@ -585,12 +698,12 @@ class SessionDB: raise ValueError( f"Title '{title}' is already in use by session {conflict['id']}" ) - cursor = self._conn.execute( + cursor = conn.execute( "UPDATE sessions SET title = ? WHERE id = ?", (title, session_id), ) - self._conn.commit() - rowcount = cursor.rowcount + return cursor.rowcount + rowcount = self._execute_write(_do) return rowcount > 0 def get_session_title(self, session_id: str) -> Optional[str]: @@ -762,17 +875,24 @@ class SessionDB: Also increments the session's message_count (and tool_call_count if role is 'tool' or tool_calls is present). """ - with self._lock: - # Serialize structured fields to JSON for storage - reasoning_details_json = ( - json.dumps(reasoning_details) - if reasoning_details else None - ) - codex_items_json = ( - json.dumps(codex_reasoning_items) - if codex_reasoning_items else None - ) - cursor = self._conn.execute( + # Serialize structured fields to JSON before entering the write txn + reasoning_details_json = ( + json.dumps(reasoning_details) + if reasoning_details else None + ) + codex_items_json = ( + json.dumps(codex_reasoning_items) + if codex_reasoning_items else None + ) + tool_calls_json = json.dumps(tool_calls) if tool_calls else None + + # Pre-compute tool call count + num_tool_calls = 0 + if tool_calls is not None: + num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1 + + def _do(conn): + cursor = conn.execute( """INSERT INTO messages (session_id, role, content, tool_call_id, tool_calls, tool_name, timestamp, token_count, finish_reason, reasoning, reasoning_details, codex_reasoning_items) @@ -782,7 +902,7 @@ class SessionDB: role, content, tool_call_id, - json.dumps(tool_calls) if tool_calls else None, + tool_calls_json, tool_name, time.time(), token_count, @@ -795,25 +915,20 @@ class SessionDB: msg_id = cursor.lastrowid # Update counters - # Count actual tool calls from the tool_calls list (not from tool responses). - # A single assistant message can contain multiple parallel tool calls. - num_tool_calls = 0 - if tool_calls is not None: - num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1 if num_tool_calls > 0: - self._conn.execute( + conn.execute( """UPDATE sessions SET message_count = message_count + 1, tool_call_count = tool_call_count + ? WHERE id = ?""", (num_tool_calls, session_id), ) else: - self._conn.execute( + conn.execute( "UPDATE sessions SET message_count = message_count + 1 WHERE id = ?", (session_id,), ) + return msg_id - self._conn.commit() - return msg_id + return self._execute_write(_do) def get_messages(self, session_id: str) -> List[Dict[str, Any]]: """Load all messages for a session, ordered by timestamp.""" @@ -1107,54 +1222,53 @@ class SessionDB: def clear_messages(self, session_id: str) -> None: """Delete all messages for a session and reset its counters.""" - with self._lock: - self._conn.execute( + def _do(conn): + conn.execute( "DELETE FROM messages WHERE session_id = ?", (session_id,) ) - self._conn.execute( + conn.execute( "UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?", (session_id,), ) - self._conn.commit() + self._execute_write(_do) def delete_session(self, session_id: str) -> bool: """Delete a session and all its messages. Returns True if found.""" - with self._lock: - cursor = self._conn.execute( + def _do(conn): + cursor = conn.execute( "SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,) ) if cursor.fetchone()[0] == 0: return False - self._conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,)) - self._conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,)) - self._conn.commit() + conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,)) + conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,)) return True + return self._execute_write(_do) def prune_sessions(self, older_than_days: int = 90, source: str = None) -> int: """ Delete sessions older than N days. Returns count of deleted sessions. Only prunes ended sessions (not active ones). """ - import time as _time - cutoff = _time.time() - (older_than_days * 86400) + cutoff = time.time() - (older_than_days * 86400) - with self._lock: + def _do(conn): if source: - cursor = self._conn.execute( + cursor = conn.execute( """SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""", (cutoff, source), ) else: - cursor = self._conn.execute( + cursor = conn.execute( "SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL", (cutoff,), ) session_ids = [row["id"] for row in cursor.fetchall()] for sid in session_ids: - self._conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,)) - self._conn.execute("DELETE FROM sessions WHERE id = ?", (sid,)) + conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,)) + conn.execute("DELETE FROM sessions WHERE id = ?", (sid,)) + return len(session_ids) - self._conn.commit() - return len(session_ids) + return self._execute_write(_do)