diff --git a/gateway/run.py b/gateway/run.py index a80f42650e..7714ca99d8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -239,7 +239,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) # Resolve Hermes home directory (respects HERMES_HOME override) from hermes_constants import get_hermes_home -from utils import atomic_yaml_write, base_url_host_matches, is_truthy_value +from utils import atomic_json_write, atomic_yaml_write, base_url_host_matches, is_truthy_value _hermes_home = get_hermes_home() # Load environment variables from ~/.hermes/.env first. @@ -2245,7 +2245,7 @@ class GatewayRunner: # (they might become active again next restart) try: - path.write_text(json.dumps(new_counts)) + atomic_json_write(path, new_counts, indent=None) except Exception: pass @@ -2313,7 +2313,7 @@ class GatewayRunner: if session_key in counts: del counts[session_key] if counts: - path.write_text(json.dumps(counts)) + atomic_json_write(path, counts, indent=None) else: path.unlink(missing_ok=True) except Exception: @@ -6734,8 +6734,10 @@ class GatewayRunner: } if event.source.thread_id: notify_data["thread_id"] = event.source.thread_id - (_hermes_home / ".restart_notify.json").write_text( - json.dumps(notify_data) + atomic_json_write( + _hermes_home / ".restart_notify.json", + notify_data, + indent=None, ) except Exception as e: logger.debug("Failed to write restart notify file: %s", e) @@ -6752,8 +6754,10 @@ class GatewayRunner: } if event.platform_update_id is not None: dedup_data["update_id"] = event.platform_update_id - (_hermes_home / ".restart_last_processed.json").write_text( - json.dumps(dedup_data) + atomic_json_write( + _hermes_home / ".restart_last_processed.json", + dedup_data, + indent=None, ) except Exception as e: logger.debug("Failed to write restart dedup marker: %s", e) diff --git a/gateway/status.py b/gateway/status.py index 7f7df182f5..f329b25f08 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -21,6 +21,7 @@ from datetime import datetime, timezone from pathlib import Path from hermes_constants import get_hermes_home from typing import Any, Optional +from utils import atomic_json_write if sys.platform == "win32": import msvcrt @@ -34,6 +35,10 @@ _IS_WINDOWS = sys.platform == "win32" _UNSET = object() _GATEWAY_LOCK_FILENAME = "gateway.lock" _gateway_lock_handle = None +# Windows byte-range locks are mandatory for other readers. Lock a byte well +# past the JSON payload so runtime status / PID readers can still read the file +# while another process holds the mutual-exclusion lock. +_WINDOWS_LOCK_OFFSET = 1024 * 1024 def _get_pid_path() -> Path: @@ -205,8 +210,7 @@ def _read_json_file(path: Path) -> Optional[dict[str, Any]]: def _write_json_file(path: Path, payload: dict[str, Any]) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(json.dumps(payload)) + atomic_json_write(path, payload, indent=None, separators=(",", ":")) def _read_pid_record(pid_path: Optional[Path] = None) -> Optional[dict]: @@ -286,7 +290,7 @@ def _try_acquire_file_lock(handle) -> bool: if handle.tell() == 0: handle.write("\n") handle.flush() - handle.seek(0) + handle.seek(_WINDOWS_LOCK_OFFSET) msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1) else: fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) @@ -298,7 +302,7 @@ def _try_acquire_file_lock(handle) -> bool: def _release_file_lock(handle) -> None: try: if _IS_WINDOWS: - handle.seek(0) + handle.seek(_WINDOWS_LOCK_OFFSET) msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1) else: fcntl.flock(handle.fileno(), fcntl.LOCK_UN) diff --git a/tests/gateway/test_restart_notification.py b/tests/gateway/test_restart_notification.py index c926596492..8297dfc32f 100644 --- a/tests/gateway/test_restart_notification.py +++ b/tests/gateway/test_restart_notification.py @@ -113,6 +113,36 @@ async def test_restart_command_preserves_thread_id(tmp_path, monkeypatch): assert data["thread_id"] == "topic_7" +@pytest.mark.asyncio +async def test_restart_command_uses_atomic_json_writes_for_marker_files(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + calls = [] + + def _fake_atomic_json_write(path, payload, **kwargs): + calls.append((Path(path).name, payload, kwargs)) + + monkeypatch.setattr(gateway_run, "atomic_json_write", _fake_atomic_json_write) + + runner, _adapter = make_restart_runner() + runner.request_restart = MagicMock(return_value=True) + + source = make_restart_source(chat_id="42") + event = MessageEvent( + text="/restart", + message_type=MessageType.TEXT, + source=source, + message_id="m1", + ) + + await runner._handle_restart_command(event) + + names = [name for name, _payload, _kwargs in calls] + assert names == [".restart_notify.json", ".restart_last_processed.json"] + assert calls[0][1]["chat_id"] == "42" + assert calls[1][1]["platform"] == "telegram" + + # ── _send_restart_notification ─────────────────────────────────────────── diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index b8937cd4df..77c639d05f 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -999,3 +999,65 @@ class TestStuckLoopEscalation: assert store._entries[entry.session_key].resume_pending is False assert not counts_file.exists() + + def test_increment_restart_failure_counts_uses_atomic_json_write( + self, tmp_path, monkeypatch + ): + from gateway.run import GatewayRunner + + source = _make_source() + session_key = _make_store(tmp_path).get_or_create_session(source).session_key + + monkeypatch.setattr("gateway.run._hermes_home", tmp_path) + calls = [] + + def _fake_atomic_json_write(path, payload, **kwargs): + calls.append((path, payload, kwargs)) + + monkeypatch.setattr("gateway.run.atomic_json_write", _fake_atomic_json_write) + + runner = object.__new__(GatewayRunner) + runner._increment_restart_failure_counts({session_key}) + + assert calls == [ + ( + tmp_path / ".restart_failure_counts", + {session_key: 1}, + {"indent": None}, + ) + ] + + def test_clear_restart_failure_count_uses_atomic_json_write_when_entries_remain( + self, tmp_path, monkeypatch + ): + import json + + from gateway.run import GatewayRunner + + source = _make_source() + session_key = _make_store(tmp_path).get_or_create_session(source).session_key + other_key = "agent:main:telegram:dm:other" + counts_file = tmp_path / ".restart_failure_counts" + counts_file.write_text( + json.dumps({session_key: 2, other_key: 1}), + encoding="utf-8", + ) + + monkeypatch.setattr("gateway.run._hermes_home", tmp_path) + calls = [] + + def _fake_atomic_json_write(path, payload, **kwargs): + calls.append((path, payload, kwargs)) + + monkeypatch.setattr("gateway.run.atomic_json_write", _fake_atomic_json_write) + + runner = object.__new__(GatewayRunner) + runner._clear_restart_failure_count(session_key) + + assert calls == [ + ( + tmp_path / ".restart_failure_counts", + {other_key: 1}, + {"indent": None}, + ) + ] diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index e56b2107e5..7138b6514e 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -2,6 +2,7 @@ import json import os +from pathlib import Path from types import SimpleNamespace from gateway import status @@ -245,6 +246,27 @@ class TestGatewayPidState: class TestGatewayRuntimeStatus: + def test_write_json_file_uses_atomic_json_write(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + calls = [] + + def _fake_atomic_json_write(path, payload, **kwargs): + calls.append((Path(path), payload, kwargs)) + + monkeypatch.setattr(status, "atomic_json_write", _fake_atomic_json_write) + + payload = {"gateway_state": "running"} + target = tmp_path / "gateway_state.json" + status._write_json_file(target, payload) + + assert calls == [ + ( + target, + payload, + {"indent": None, "separators": (",", ":")}, + ) + ] + def test_write_runtime_status_overwrites_stale_pid_on_restart(self, tmp_path, monkeypatch): """Regression: setdefault() preserved stale PID from previous process (#1631).""" monkeypatch.setenv("HERMES_HOME", str(tmp_path)) @@ -349,6 +371,35 @@ class TestTerminatePid: class TestScopedLocks: + def test_windows_file_lock_uses_high_offset(self, tmp_path, monkeypatch): + lock_path = tmp_path / "gateway.lock" + handle = open(lock_path, "a+", encoding="utf-8") + fd = handle.fileno() + calls = [] + + def fake_locking(fd, mode, size): + calls.append((fd, mode, size, handle.tell())) + + monkeypatch.setattr(status, "_IS_WINDOWS", True) + monkeypatch.setattr( + status, + "msvcrt", + SimpleNamespace(LK_NBLCK=1, LK_UNLCK=2, locking=fake_locking), + raising=False, + ) + + try: + assert status._try_acquire_file_lock(handle) is True + status._release_file_lock(handle) + finally: + handle.close() + + assert calls == [ + (fd, 1, 1, status._WINDOWS_LOCK_OFFSET), + (fd, 2, 1, status._WINDOWS_LOCK_OFFSET), + ] + assert lock_path.read_text(encoding="utf-8") == "\n" + def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch): monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks")) lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"