diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index eee068d1209..e1cf73d2583 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -16,6 +16,7 @@ import json import logging import os import secrets +import stat import subprocess import sys import threading @@ -1686,7 +1687,25 @@ def _save_anthropic_oauth_creds(access_token: str, refresh_token: str, expires_a "expiresAt": expires_at_ms, } _HERMES_OAUTH_FILE.parent.mkdir(parents=True, exist_ok=True) - _HERMES_OAUTH_FILE.write_text(json.dumps(payload, indent=2), encoding="utf-8") + tmp_path = _HERMES_OAUTH_FILE.with_name( + f"{_HERMES_OAUTH_FILE.name}.tmp.{os.getpid()}.{secrets.token_hex(8)}" + ) + try: + with tmp_path.open("w", encoding="utf-8") as handle: + handle.write(json.dumps(payload, indent=2)) + handle.flush() + os.fsync(handle.fileno()) + os.replace(tmp_path, _HERMES_OAUTH_FILE) + try: + _HERMES_OAUTH_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR) + except OSError: + pass + finally: + try: + if tmp_path.exists(): + tmp_path.unlink() + except OSError: + pass # Best-effort credential-pool insert. Failure here doesn't invalidate # the file write — pool registration only matters for the rotation # strategy, not for runtime credential resolution. diff --git a/tests/hermes_cli/test_web_server_oauth_write.py b/tests/hermes_cli/test_web_server_oauth_write.py new file mode 100644 index 00000000000..0ef49fb2bc4 --- /dev/null +++ b/tests/hermes_cli/test_web_server_oauth_write.py @@ -0,0 +1,53 @@ +import os + +import pytest + +from hermes_cli.web_server import _save_anthropic_oauth_creds + + +class _DummyPool: + def entries(self): + return [] + + def remove_entry(self, _id): + return None + + def add_entry(self, _entry): + return None + + +@pytest.fixture +def oauth_file(monkeypatch, tmp_path): + target = tmp_path / '.anthropic_oauth.json' + monkeypatch.setattr('agent.anthropic_adapter._HERMES_OAUTH_FILE', target) + monkeypatch.setattr('agent.credential_pool.load_pool', lambda _provider: _DummyPool()) + return target + + +def test_dashboard_oauth_write_uses_owner_only_permissions(oauth_file): + old_umask = os.umask(0o022) + try: + _save_anthropic_oauth_creds('access-token', 'refresh-token', 123456) + finally: + os.umask(old_umask) + + assert oauth_file.exists() + mode = oauth_file.stat().st_mode & 0o777 + assert mode == 0o600 + + +def test_dashboard_oauth_write_uses_atomic_replace_and_cleans_temp_files(oauth_file, monkeypatch): + replace_calls = [] + + def flaky_replace(src, dst): + replace_calls.append((src, dst)) + raise OSError('simulated replace failure') + + monkeypatch.setattr('hermes_cli.web_server.os.replace', flaky_replace) + + with pytest.raises(OSError, match='simulated replace failure'): + _save_anthropic_oauth_creds('access-token', 'refresh-token', 123456) + + assert replace_calls, 'helper should attempt atomic os.replace()' + assert not oauth_file.exists() + assert not list(oauth_file.parent.glob(f'{oauth_file.name}.tmp*'))