From 5fa3e24b762076a6e6aabf471248da8d890c6225 Mon Sep 17 00:00:00 2001 From: aydnOktay Date: Tue, 3 Mar 2026 02:44:01 +0300 Subject: [PATCH] Make process_registry checkpoint writes atomic --- tools/process_registry.py | 49 +++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/tools/process_registry.py b/tools/process_registry.py index 58bc788a3..bd8838913 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -34,6 +34,7 @@ import logging import os import signal import subprocess +import tempfile import threading import time import uuid @@ -650,7 +651,7 @@ class ProcessRegistry: # ----- Checkpoint (crash recovery) ----- def _write_checkpoint(self): - """Write running process metadata to checkpoint file.""" + """Write running process metadata to checkpoint file atomically.""" try: with self._lock: entries = [] @@ -665,12 +666,28 @@ class ProcessRegistry: "task_id": s.task_id, "session_key": s.session_key, }) + + # Atomic write: temp file + os.replace to avoid corruption on crash CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True) - CHECKPOINT_PATH.write_text( - json.dumps(entries, indent=2), encoding="utf-8" + fd, tmp_path = tempfile.mkstemp( + dir=str(CHECKPOINT_PATH.parent), + prefix='.checkpoint_', + suffix='.tmp', ) - except Exception: - pass # Best-effort + try: + with os.fdopen(fd, 'w', encoding='utf-8') as f: + json.dump(entries, f, indent=2, ensure_ascii=False) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp_path, CHECKPOINT_PATH) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + except Exception as e: + logger.debug("Failed to write checkpoint file: %s", e, exc_info=True) def recover_from_checkpoint(self) -> int: """ @@ -717,10 +734,28 @@ class ProcessRegistry: logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid) # Clear the checkpoint (will be rewritten as processes finish) + # Use atomic write to avoid corruption try: - CHECKPOINT_PATH.write_text("[]", encoding="utf-8") + CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_path = tempfile.mkstemp( + dir=str(CHECKPOINT_PATH.parent), + prefix='.checkpoint_', + suffix='.tmp', + ) + try: + with os.fdopen(fd, 'w', encoding='utf-8') as f: + f.write("[]") + f.flush() + os.fsync(f.fileno()) + os.replace(tmp_path, CHECKPOINT_PATH) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise except Exception as e: - logger.debug("Could not write checkpoint file: %s", e) + logger.debug("Could not clear checkpoint file: %s", e, exc_info=True) return recovered