fix(gateway): close --replace race completely by claiming PID before adapter startup

Follow-up on top of opriz's atomic PID file fix. The prior change caught
the race AFTER runner.start(), so the loser still opened Telegram polling
and Discord gateway sockets before detecting the conflict and exiting.

Hoist the PID-claim block to BEFORE runner.start(). Now the loser of the
O_CREAT|O_EXCL race returns from start_gateway() without ever bringing up
any platform adapter — no Telegram conflict, no Discord duplicate session.

Also add regression tests:
- test_write_pid_file_is_atomic_against_concurrent_writers: a second
  write_pid_file() call raises FileExistsError rather than clobbering
  the existing PID record.
- Two existing replace-path tests now use stateful mocks, because the
  hoisted re-check exercises the real post-kill state
  (get_running_pid() returns None after remove_pid_file()).
This commit is contained in:
Teknium 2026-04-21 00:36:25 -07:00 committed by Teknium
parent 56b99e8239
commit ce9c91c8f7
3 changed files with 64 additions and 27 deletions

View file

@ -10951,6 +10951,30 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
else: else:
logger.info("Skipping signal handlers (not running in main thread).") logger.info("Skipping signal handlers (not running in main thread).")
# Claim the PID file BEFORE bringing up any platform adapters.
# This closes the --replace race window: two concurrent `gateway run
# --replace` invocations both pass the termination-wait above, but
# only the winner of the O_CREAT|O_EXCL race below will ever open
# Telegram polling, Discord gateway sockets, etc. The loser exits
# cleanly before touching any external service.
import atexit
from gateway.status import write_pid_file, remove_pid_file, get_running_pid
_current_pid = get_running_pid()
if _current_pid is not None and _current_pid != os.getpid():
logger.error(
"Another gateway instance (PID %d) started during our startup. "
"Exiting to avoid double-running.", _current_pid
)
return False
try:
write_pid_file()
except FileExistsError:
logger.error(
"PID file race lost to another gateway instance. Exiting."
)
return False
atexit.register(remove_pid_file)
# Start the gateway # Start the gateway
success = await runner.start() success = await runner.start()
if not success: if not success:
@ -10960,29 +10984,6 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
logger.error("Gateway exiting cleanly: %s", runner.exit_reason) logger.error("Gateway exiting cleanly: %s", runner.exit_reason)
return True return True
# Write PID file so CLI can detect gateway is running
import atexit
from gateway.status import write_pid_file, remove_pid_file, get_running_pid
# Defensive re-check: another --replace racer may have started
# while we were initializing. If so, yield and exit.
_current_pid = get_running_pid()
if _current_pid is not None and _current_pid != os.getpid():
logger.error(
"Another gateway instance (PID %d) started during our startup. "
"Exiting to avoid double-running.", _current_pid
)
await runner.stop()
return False
try:
write_pid_file()
except FileExistsError:
logger.error(
"PID file race lost to another gateway instance. Exiting."
)
await runner.stop()
return False
atexit.register(remove_pid_file)
# Start background cron ticker so scheduled jobs fire automatically. # Start background cron ticker so scheduled jobs fire automatically.
# Pass the event loop so cron delivery can use live adapters (E2EE support). # Pass the event loop so cron delivery can use live adapters (E2EE support).
cron_stop = threading.Event() cron_stop = threading.Event()

View file

@ -184,8 +184,15 @@ async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_p
async def stop(self): async def stop(self):
return None return None
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42) # get_running_pid returns 42 before we kill the old gateway, then None
monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None) # after remove_pid_file() clears the record (reflects real behavior).
_pid_state = {"alive": True}
def _mock_get_running_pid():
return 42 if _pid_state["alive"] else None
def _mock_remove_pid_file():
_pid_state["alive"] = False
monkeypatch.setattr("gateway.status.get_running_pid", _mock_get_running_pid)
monkeypatch.setattr("gateway.status.remove_pid_file", _mock_remove_pid_file)
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0) monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0)
monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force))) monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force)))
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100) monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)
@ -253,8 +260,13 @@ async def test_start_gateway_replace_writes_takeover_marker_before_sigterm(
async def stop(self): async def stop(self):
return None return None
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42) _pid_state = {"alive": True}
monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None) def _mock_get_running_pid():
return 42 if _pid_state["alive"] else None
def _mock_remove_pid_file():
_pid_state["alive"] = False
monkeypatch.setattr("gateway.status.get_running_pid", _mock_get_running_pid)
monkeypatch.setattr("gateway.status.remove_pid_file", _mock_remove_pid_file)
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0) monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0)
monkeypatch.setattr("gateway.status.write_takeover_marker", record_write_marker) monkeypatch.setattr("gateway.status.write_takeover_marker", record_write_marker)
monkeypatch.setattr("gateway.status.terminate_pid", record_terminate) monkeypatch.setattr("gateway.status.terminate_pid", record_terminate)

View file

@ -19,6 +19,30 @@ class TestGatewayPidState:
assert isinstance(payload["argv"], list) assert isinstance(payload["argv"], list)
assert payload["argv"] assert payload["argv"]
def test_write_pid_file_is_atomic_against_concurrent_writers(self, tmp_path, monkeypatch):
    """Regression (#11718): a second PID-file write must fail, not overwrite.

    Two concurrent --replace invocations racing through start_gateway()'s
    termination-wait must not both win. Without O_CREAT|O_EXCL they would
    both write gateway.pid, silently overwriting each other and leaving
    multiple gateway instances alive.
    """
    import pytest

    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    pid_path = tmp_path / "gateway.pid"

    # The first claim succeeds and creates the record.
    status.write_pid_file()
    assert pid_path.exists()

    # A later claim (a racing --replace that slipped past the earlier
    # guards) has to be rejected instead of replacing the record.
    with pytest.raises(FileExistsError):
        status.write_pid_file()

    # The winner's record is still the one on disk.
    record = json.loads(pid_path.read_text())
    assert record["pid"] == os.getpid()
def test_get_running_pid_rejects_live_non_gateway_pid(self, tmp_path, monkeypatch): def test_get_running_pid_rejects_live_non_gateway_pid(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path)) monkeypatch.setenv("HERMES_HOME", str(tmp_path))
pid_path = tmp_path / "gateway.pid" pid_path = tmp_path / "gateway.pid"