codebase: add encoding='utf-8' to all bare open() calls (PLW1514)

Closes the last Python-on-Windows UTF-8 exposure by making every
text-mode open() call explicit about its encoding.

Before: on Windows, bare open(path, 'r') defaults to the system
locale encoding (cp1252 on US-locale installs).  That means reading
any config/yaml/markdown/json file with non-ASCII content either
crashes with UnicodeDecodeError or silently mis-decodes bytes.

After: all 89 affected call sites in production code now pass
encoding='utf-8' explicitly.  Works identically on every platform
and every locale, no surprise behavior.

Mechanical sweep via:
  ruff check --preview --extend-select PLW1514 --unsafe-fixes --fix \
    --exclude 'tests,venv,.venv,node_modules,website,optional-skills,skills,tinker-atropos,plugins' .

All 89 fixes have the same shape: open(x) or open(x, mode) became
open(x, encoding='utf-8') or open(x, mode, encoding='utf-8').  Nothing
else changed.  Every modified file still parses and the Windows/sandbox
test suite is still green (85 passed, 14 skipped, 0 failed across
tests/tools/test_code_execution_windows_env.py +
tests/tools/test_code_execution_modes.py + tests/tools/test_env_passthrough.py +
tests/test_hermes_bootstrap.py).

Scope notes:
  - tests/ excluded: test fixtures can use locale encoding intentionally
    (exercising edge cases).  If we want to tighten tests later that's
    a separate PR.
  - plugins/ excluded: plugin-specific conventions may differ; plugin
    authors own their code.
  - optional-skills/ and skills/ excluded: skill scripts are user-authored
    and we don't want to mass-edit them.
  - website/ and tinker-atropos/ excluded: vendored / generated content.

46 files touched, 89 +/- lines (symmetric replacement).  No behavior
change on POSIX or on Windows when the file is ASCII; bug fix on
Windows when the file contains non-ASCII.
This commit is contained in:
Teknium 2026-05-07 19:24:45 -07:00
parent d94fb47717
commit cbce5e93fc
46 changed files with 89 additions and 89 deletions

View file

@ -1607,7 +1607,7 @@ def _run_llm_review(prompt: str) -> Dict[str, Any]:
# terminal. The background-thread runner also hides it; this
# belt-and-suspenders path matters when a caller invokes
# run_curator_review(synchronous=True) from the CLI.
with open(os.devnull, "w") as _devnull, \
with open(os.devnull, "w", encoding="utf-8") as _devnull, \
contextlib.redirect_stdout(_devnull), \
contextlib.redirect_stderr(_devnull):
conv_result = review_agent.run_conversation(user_message=prompt)

View file

@ -754,7 +754,7 @@ def _load_context_cache() -> Dict[str, int]:
if not path.exists():
return {}
try:
with open(path) as f:
with open(path, encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
return data.get("context_lengths", {})
except Exception as e:
@ -776,7 +776,7 @@ def save_context_length(model: str, base_url: str, length: int) -> None:
path = _get_context_cache_path()
try:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
with open(path, "w", encoding="utf-8") as f:
yaml.dump({"context_lengths": cache}, f, default_flow_style=False)
logger.info("Cached context length %s -> %s tokens", key, f"{length:,}")
except Exception as e:
@ -800,7 +800,7 @@ def _invalidate_cached_context_length(model: str, base_url: str) -> None:
path = _get_context_cache_path()
try:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
with open(path, "w", encoding="utf-8") as f:
yaml.dump({"context_lengths": cache}, f, default_flow_style=False)
except Exception as e:
logger.debug("Failed to invalidate context length cache entry %s: %s", key, e)

View file

@ -144,7 +144,7 @@ def nous_rate_limit_remaining() -> Optional[float]:
"""
path = _state_path()
try:
with open(path) as f:
with open(path, encoding="utf-8") as f:
state = json.load(f)
reset_at = state.get("reset_at", 0)
remaining = reset_at - time.time()

View file

@ -617,7 +617,7 @@ def _locked_update_approvals() -> Iterator[Dict[str, Any]]:
save_allowlist(data)
return
with open(lock_path, "a+") as lock_fh:
with open(lock_path, "a+", encoding="utf-8") as lock_fh:
fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX)
try:
data = load_allowlist()

8
cli.py
View file

@ -819,7 +819,7 @@ def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
try:
existing = gitignore.read_text() if gitignore.exists() else ""
if _ignore_entry not in existing.splitlines():
with open(gitignore, "a") as f:
with open(gitignore, "a", encoding="utf-8") as f:
if existing and not existing.endswith("\n"):
f.write("\n")
f.write(f"{_ignore_entry}\n")
@ -2147,7 +2147,7 @@ def save_config_value(key_path: str, value: any) -> bool:
# Load existing config
if config_path.exists():
with open(config_path, 'r') as f:
with open(config_path, 'r', encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
else:
config = {}
@ -9843,7 +9843,7 @@ class HermesCLI:
# Debug: log to file (stdout may be devnull from redirect_stdout)
try:
_dbg = _hermes_home / "interrupt_debug.log"
with open(_dbg, "a") as _f:
with open(_dbg, "a", encoding="utf-8") as _f:
_f.write(f"{time.strftime('%H:%M:%S')} interrupt fired: msg={str(interrupt_msg)[:60]!r}, "
f"children={len(self.agent._active_children)}, "
f"parent._interrupt={self.agent._interrupt_requested}\n")
@ -10696,7 +10696,7 @@ class HermesCLI:
# Debug: log to file when message enters interrupt queue
try:
_dbg = _hermes_home / "interrupt_debug.log"
with open(_dbg, "a") as _f:
with open(_dbg, "a", encoding="utf-8") as _f:
_f.write(f"{time.strftime('%H:%M:%S')} ENTER: queued interrupt msg={str(payload)[:60]!r}, "
f"agent_running={self._agent_running}\n")
except Exception:

View file

@ -1268,7 +1268,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
import yaml
_cfg_path = str(_get_hermes_home() / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
with open(_cfg_path, encoding="utf-8") as _f:
_cfg = yaml.safe_load(_f) or {}
_cfg = _expand_env_vars(_cfg)
_model_cfg = _cfg.get("model", {})
@ -1651,7 +1651,7 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
# Cross-platform file locking: fcntl on Unix, msvcrt on Windows
lock_fd = None
try:
lock_fd = open(lock_file, "w")
lock_fd = open(lock_file, "w", encoding="utf-8")
if fcntl:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
elif msvcrt:

View file

@ -365,7 +365,7 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
os.makedirs(log_dir, exist_ok=True)
run_ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self._streaming_path = os.path.join(log_dir, f"samples_{run_ts}.jsonl")
self._streaming_file = open(self._streaming_path, "w")
self._streaming_file = open(self._streaming_path, "w", encoding="utf-8")
self._streaming_lock = __import__("threading").Lock()
print(f" Streaming results to: {self._streaming_path}")

View file

@ -422,7 +422,7 @@ class YCBenchEvalEnv(HermesAgentBaseEnv):
os.makedirs(log_dir, exist_ok=True)
run_ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self._streaming_path = os.path.join(log_dir, f"samples_{run_ts}.jsonl")
self._streaming_file = open(self._streaming_path, "w")
self._streaming_file = open(self._streaming_path, "w", encoding="utf-8")
self._streaming_lock = threading.Lock()
print(f"\nYC-Bench eval matrix: {len(self.all_eval_items)} runs")

View file

@ -744,7 +744,7 @@ class TelegramAdapter(BasePlatformAdapter):
return
import yaml as _yaml
with open(config_path, "r") as f:
with open(config_path, "r", encoding="utf-8") as f:
config = _yaml.safe_load(f) or {}
# Navigate to platforms.telegram.extra.dm_topics
@ -3516,7 +3516,7 @@ class TelegramAdapter(BasePlatformAdapter):
return
import yaml as _yaml
with open(config_path, "r") as f:
with open(config_path, "r", encoding="utf-8") as f:
config = _yaml.safe_load(f) or {}
dm_topics = (

View file

@ -526,7 +526,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
# messages are preserved for troubleshooting.
whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat")
self._bridge_log = self._session_path.parent / "bridge.log"
bridge_log_fh = open(self._bridge_log, "a")
bridge_log_fh = open(self._bridge_log, "a", encoding="utf-8")
self._bridge_log_fh = bridge_log_fh
# Build bridge subprocess environment.
@ -1170,7 +1170,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
if file_size > MAX_TEXT_INJECT_BYTES:
print(f"[{self.name}] Skipping text injection for {doc_path} ({file_size} bytes > {MAX_TEXT_INJECT_BYTES})", flush=True)
continue
content = Path(doc_path).read_text(errors="replace")
content = Path(doc_path).read_text(encoding="utf-8", errors="replace")
fname = Path(doc_path).name
# Remove the doc_<hex>_ prefix for display
display_name = fname

View file

@ -113,7 +113,7 @@ def _get_process_start_time(pid: int) -> Optional[int]:
stat_path = Path(f"/proc/{pid}/stat")
try:
# Field 22 in /proc/<pid>/stat is process start time (clock ticks).
return int(stat_path.read_text().split()[21])
return int(stat_path.read_text(encoding="utf-8").split()[21])
except (FileNotFoundError, IndexError, PermissionError, ValueError, OSError):
return None
@ -197,7 +197,7 @@ def _read_json_file(path: Path) -> Optional[dict[str, Any]]:
if not path.exists():
return None
try:
raw = path.read_text().strip()
raw = path.read_text(encoding="utf-8").strip()
except OSError:
return None
if not raw:
@ -523,7 +523,7 @@ def acquire_scoped_lock(scope: str, identity: str, metadata: Optional[dict[str,
try:
_proc_status = Path(f"/proc/{existing_pid}/status")
if _proc_status.exists():
for _line in _proc_status.read_text().splitlines():
for _line in _proc_status.read_text(encoding="utf-8").splitlines():
if _line.startswith("State:"):
_state = _line.split()[1]
if _state in ("T", "t"): # stopped or tracing stop

View file

@ -573,7 +573,7 @@ def create_quick_snapshot(
"total_size": sum(manifest.values()),
"files": manifest,
}
with open(snap_dir / "manifest.json", "w") as f:
with open(snap_dir / "manifest.json", "w", encoding="utf-8") as f:
json.dump(meta, f, indent=2)
# Auto-prune
@ -599,7 +599,7 @@ def list_quick_snapshots(
manifest_path = d / "manifest.json"
if manifest_path.exists():
try:
with open(manifest_path) as f:
with open(manifest_path, encoding="utf-8") as f:
results.append(json.load(f))
except (json.JSONDecodeError, OSError):
results.append({"id": d.name, "file_count": 0, "total_size": 0})
@ -629,7 +629,7 @@ def restore_quick_snapshot(
if not manifest_path.exists():
return False
with open(manifest_path) as f:
with open(manifest_path, encoding="utf-8") as f:
meta = json.load(f)
restored = 0

View file

@ -221,7 +221,7 @@ def get_container_exec_info() -> Optional[dict]:
try:
info = {}
with open(container_mode_file, "r") as f:
with open(container_mode_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if "=" in line and not line.startswith("#"):
@ -306,7 +306,7 @@ def _is_container() -> bool:
return True
# LXC / cgroup-based detection
try:
with open("/proc/1/cgroup", "r") as f:
with open("/proc/1/cgroup", "r", encoding="utf-8") as f:
cgroup_content = f.read()
if "docker" in cgroup_content or "lxc" in cgroup_content or "kubepods" in cgroup_content:
return True
@ -3461,7 +3461,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
if not manifest_file.exists():
continue
try:
with open(manifest_file) as _mf:
with open(manifest_file, encoding="utf-8") as _mf:
manifest = yaml.safe_load(_mf) or {}
except Exception:
manifest = {}

View file

@ -598,7 +598,7 @@ def run_doctor(args):
# Detect stale root-level model keys (known bug source — PR #4329)
try:
import yaml
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
raw_config = yaml.safe_load(f) or {}
stale_root_keys = [k for k in ("provider", "base_url") if k in raw_config and isinstance(raw_config[k], str)]
if stale_root_keys:
@ -1406,7 +1406,7 @@ def run_doctor(args):
import yaml as _yaml
_mem_cfg_path = HERMES_HOME / "config.yaml"
if _mem_cfg_path.exists():
with open(_mem_cfg_path) as _f:
with open(_mem_cfg_path, encoding="utf-8") as _f:
_raw_cfg = _yaml.safe_load(_f) or {}
_active_memory_provider = (_raw_cfg.get("memory") or {}).get("provider", "")
except Exception:

View file

@ -205,7 +205,7 @@ def _cmd_test(args) -> None:
if getattr(args, "payload_file", None):
try:
custom = json.loads(Path(args.payload_file).read_text())
custom = json.loads(Path(args.payload_file).read_text(encoding="utf-8"))
if isinstance(custom, dict):
payload.update(custom)
else:

View file

@ -2835,7 +2835,7 @@ def _pid_alive(pid: Optional[int]) -> bool:
# where we have a cheap, deterministic process-state probe.
if sys.platform == "linux":
try:
with open(f"/proc/{int(pid)}/status", "r") as f:
with open(f"/proc/{int(pid)}/status", "r", encoding="utf-8") as f:
for line in f:
if line.startswith("State:"):
# "State:\tZ (zombie)" → dead

View file

@ -69,7 +69,7 @@ def _install_dependencies(provider_name: str) -> None:
try:
import yaml
with open(yaml_path) as f:
with open(yaml_path, encoding="utf-8") as f:
meta = yaml.safe_load(f) or {}
except Exception:
return
@ -377,7 +377,7 @@ def _write_env_vars(env_path: Path, env_writes: dict) -> None:
if key not in updated_keys:
new_lines.append(f"{key}={val}")
env_path.write_text("\n".join(new_lines) + "\n")
env_path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
# ---------------------------------------------------------------------------

View file

@ -173,7 +173,7 @@ def _read_disk_cache() -> tuple[dict[str, Any] | None, float]:
except (OSError, FileNotFoundError):
return (None, 0.0)
try:
with open(path) as fh:
with open(path, encoding="utf-8") as fh:
data = json.load(fh)
except (OSError, json.JSONDecodeError):
return (None, 0.0)
@ -187,7 +187,7 @@ def _write_disk_cache(data: dict[str, Any]) -> None:
try:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
with open(tmp, "w") as fh:
with open(tmp, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2)
fh.write("\n")
atomic_replace(tmp, path)

View file

@ -174,7 +174,7 @@ def run_oneshot(
# Redirect stderr AND stdout to devnull for the entire call tree.
# We'll print the final response to the real stdout at the end.
real_stdout = sys.stdout
devnull = open(os.devnull, "w")
devnull = open(os.devnull, "w", encoding="utf-8")
try:
with redirect_stdout(devnull), redirect_stderr(devnull):

View file

@ -870,7 +870,7 @@ class PluginManager:
if yaml is None:
logger.warning("PyYAML not installed cannot load %s", manifest_file)
return None
data = yaml.safe_load(manifest_file.read_text()) or {}
data = yaml.safe_load(manifest_file.read_text(encoding="utf-8")) or {}
name = data.get("name", plugin_dir.name)
key = f"{prefix}/{plugin_dir.name}" if prefix else name

View file

@ -127,7 +127,7 @@ def _read_manifest(plugin_dir: Path) -> dict:
try:
import yaml
with open(manifest_file) as f:
with open(manifest_file, encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.warning("Failed to read plugin.yaml in %s: %s", plugin_dir, e)
@ -703,7 +703,7 @@ def _discover_all_plugins() -> list:
description = ""
if yaml:
try:
with open(manifest_file) as f:
with open(manifest_file, encoding="utf-8") as f:
manifest = yaml.safe_load(f) or {}
name = manifest.get("name", d.name)
version = manifest.get("version", "")

View file

@ -395,7 +395,7 @@ def _read_config_model(profile_dir: Path) -> tuple:
return None, None
try:
import yaml
with open(config_path, "r") as f:
with open(config_path, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, str):

View file

@ -1257,7 +1257,7 @@ def do_snapshot_export(output_path: str, console: Optional[Console] = None) -> N
sys.stdout.write(payload)
else:
out = Path(output_path)
out.write_text(payload)
out.write_text(payload, encoding="utf-8")
c.print(f"[bold green]Snapshot exported:[/] {out}")
c.print(f"[dim]{len(installed)} skill(s), {len(tap_list)} tap(s)[/]\n")
@ -1274,7 +1274,7 @@ def do_snapshot_import(input_path: str, force: bool = False,
return
try:
snapshot = json.loads(inp.read_text())
snapshot = json.loads(inp.read_text(encoding="utf-8"))
except json.JSONDecodeError:
c.print(f"[bold red]Error:[/] Invalid JSON in {inp}\n")
return

View file

@ -692,7 +692,7 @@ def _tail_lines(path: Path, n: int) -> List[str]:
if not path.exists():
return []
try:
text = path.read_text(errors="replace")
text = path.read_text(encoding="utf-8", errors="replace")
except OSError:
return []
lines = text.splitlines()

View file

@ -233,7 +233,7 @@ def is_wsl() -> bool:
if _wsl_detected is not None:
return _wsl_detected
try:
with open("/proc/version", "r") as f:
with open("/proc/version", "r", encoding="utf-8") as f:
_wsl_detected = "microsoft" in f.read().lower()
except Exception:
_wsl_detected = False
@ -260,7 +260,7 @@ def is_container() -> bool:
_container_detected = True
return True
try:
with open("/proc/1/cgroup", "r") as f:
with open("/proc/1/cgroup", "r", encoding="utf-8") as f:
cgroup = f.read()
if "docker" in cgroup or "podman" in cgroup or "/lxc/" in cgroup:
_container_detected = True

View file

@ -50,7 +50,7 @@ def _resolve_timezone_name() -> str:
import yaml
config_path = get_config_path()
if config_path.exists():
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
tz_cfg = cfg.get("timezone", "")
if isinstance(tz_cfg, str) and tz_cfg.strip():

View file

@ -82,7 +82,7 @@ def load_hermes_config() -> dict:
if config_path.exists():
try:
with open(config_path, "r") as f:
with open(config_path, "r", encoding='utf-8') as f:
file_config = yaml.safe_load(f) or {}
# Get model from config

View file

@ -3824,7 +3824,7 @@ class AIAgent:
pass
review_agent = None
try:
with open(os.devnull, "w") as _devnull, \
with open(os.devnull, "w", encoding="utf-8") as _devnull, \
contextlib.redirect_stdout(_devnull), \
contextlib.redirect_stderr(_devnull):
# Inherit the parent agent's live runtime (provider, model,

View file

@ -81,7 +81,7 @@ def build_catalog() -> dict:
def main() -> int:
catalog = build_catalog()
os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
with open(OUTPUT_PATH, "w") as fh:
with open(OUTPUT_PATH, "w", encoding="utf-8") as fh:
json.dump(catalog, fh, indent=2)
fh.write("\n")

View file

@ -304,7 +304,7 @@ def main():
}
os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
with open(OUTPUT_PATH, "w") as f:
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
json.dump(index, f, separators=(",", ":"), ensure_ascii=False)
elapsed = time.time() - overall_start

View file

@ -291,7 +291,7 @@ def check_release_file(release_file, all_contributors):
missing: set of handles NOT found in the file
"""
try:
content = Path(release_file).read_text()
content = Path(release_file).read_text(encoding="utf-8")
except FileNotFoundError:
print(f" [error] Release file not found: {release_file}", file=sys.stderr)
return set(), set(all_contributors)

View file

@ -242,7 +242,7 @@ def check_config(groq_key, eleven_key):
if config_path.exists():
try:
import yaml
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
stt_provider = cfg.get("stt", {}).get("provider", "local")

View file

@ -111,7 +111,7 @@ def summarize(log: Path, since_ts_ms: int) -> dict[str, Any]:
frame_events: list[dict[str, Any]] = []
if not log.exists():
return {"error": f"no log at {log}", "react": [], "frame": []}
for line in log.read_text().splitlines():
for line in log.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
@ -505,7 +505,7 @@ def main() -> int:
if args.save:
path = Path(f"/tmp/perf-{args.save}.json")
path.write_text(json.dumps(metrics, indent=2))
path.write_text(json.dumps(metrics, indent=2), encoding="utf-8")
print(f"\n• saved: {path}")
if args.compare:

View file

@ -1365,7 +1365,7 @@ def main():
)
if args.output:
Path(args.output).write_text(changelog)
Path(args.output).write_text(changelog, encoding="utf-8")
print(f"Changelog written to {args.output}")
else:
print(changelog)

View file

@ -751,7 +751,7 @@ def _run_chrome_fallback_command(
proc.wait()
return {"success": False, "error": f"Chrome fallback '{cmd}' timed out"}
try:
with open(stdout_path, "r") as f:
with open(stdout_path, "r", encoding="utf-8") as f:
stdout = f.read().strip()
if stdout:
return json.loads(stdout.split("\n")[-1])
@ -1110,7 +1110,7 @@ def _write_owner_pid(socket_dir: str, session_name: str) -> None:
"""
try:
path = os.path.join(socket_dir, f"{session_name}.owner_pid")
with open(path, "w") as f:
with open(path, "w", encoding="utf-8") as f:
f.write(str(os.getpid()))
except OSError as exc:
logger.debug("Could not write owner_pid file for %s: %s",
@ -1174,7 +1174,7 @@ def _reap_orphaned_browser_sessions():
owner_alive: Optional[bool] = None # None = owner_pid missing/unreadable
if os.path.isfile(owner_pid_file):
try:
owner_pid = int(Path(owner_pid_file).read_text().strip())
owner_pid = int(Path(owner_pid_file).read_text(encoding="utf-8").strip())
try:
os.kill(owner_pid, 0)
owner_alive = True
@ -1209,7 +1209,7 @@ def _reap_orphaned_browser_sessions():
continue
try:
daemon_pid = int(Path(pid_file).read_text().strip())
daemon_pid = int(Path(pid_file).read_text(encoding="utf-8").strip())
except (ValueError, OSError):
shutil.rmtree(socket_dir, ignore_errors=True)
continue
@ -1834,7 +1834,7 @@ def _run_browser_command(
# Detect AppArmor user namespace restrictions (Ubuntu 23.10+)
_userns_restrict = "/proc/sys/kernel/apparmor_restrict_unprivileged_userns"
try:
with open(_userns_restrict) as _f:
with open(_userns_restrict, encoding="utf-8") as _f:
if _f.read().strip() == "1":
_needs_sandbox_bypass = True
logger.debug(
@ -1879,9 +1879,9 @@ def _run_browser_command(
result = {"success": False, "error": f"Command timed out after {timeout} seconds"}
# Fall through to fallback check below
else:
with open(stdout_path, "r") as f:
with open(stdout_path, "r", encoding="utf-8") as f:
stdout = f.read()
with open(stderr_path, "r") as f:
with open(stderr_path, "r", encoding="utf-8") as f:
stderr = f.read()
returncode = proc.returncode
@ -3180,7 +3180,7 @@ def _cleanup_single_browser_session(task_id: str) -> None:
pid_file = os.path.join(socket_dir, f"{session_name}.pid")
if os.path.isfile(pid_file):
try:
daemon_pid = int(Path(pid_file).read_text().strip())
daemon_pid = int(Path(pid_file).read_text(encoding="utf-8").strip())
os.kill(daemon_pid, signal.SIGTERM)
logger.debug("Killed daemon pid %s for %s", daemon_pid, session_name)
except (ProcessLookupError, ValueError, PermissionError, OSError):
@ -3323,7 +3323,7 @@ def _running_in_docker() -> bool:
if os.path.exists("/.dockerenv"):
return True
try:
with open("/proc/1/cgroup", "rt") as fp:
with open("/proc/1/cgroup", "rt", encoding="utf-8") as fp:
return "docker" in fp.read()
except OSError:
return False

View file

@ -517,7 +517,7 @@ def _rpc_server_loop(
# their status prints don't leak into the CLI spinner.
try:
_real_stdout, _real_stderr = sys.stdout, sys.stderr
devnull = open(os.devnull, "w")
devnull = open(os.devnull, "w", encoding="utf-8")
try:
sys.stdout = devnull
sys.stderr = devnull
@ -791,7 +791,7 @@ def _rpc_poll_loop(
# Dispatch through the standard tool handler
try:
_real_stdout, _real_stderr = sys.stdout, sys.stderr
devnull = open(os.devnull, "w")
devnull = open(os.devnull, "w", encoding="utf-8")
try:
sys.stdout = devnull
sys.stderr = devnull

View file

@ -158,7 +158,7 @@ def _load_json_store(path: Path) -> dict:
"""Load a JSON file as a dict, returning ``{}`` on any error."""
if path.exists():
try:
return json.loads(path.read_text())
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
pass
return {}
@ -167,7 +167,7 @@ def _load_json_store(path: Path) -> dict:
def _save_json_store(path: Path, data: dict) -> None:
"""Write *data* as pretty-printed JSON to *path*."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2))
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
def _file_mtime_key(host_path: str) -> tuple[float, int] | None:

View file

@ -284,7 +284,7 @@ class FileSyncManager:
# Windows: no flock — run without serialization
self._sync_back_impl()
return
lock_fd = open(lock_path, "w")
lock_fd = open(lock_path, "w", encoding="utf-8")
try:
fcntl.flock(lock_fd, fcntl.LOCK_EX)
self._sync_back_impl()

View file

@ -562,7 +562,7 @@ class LocalEnvironment(BaseEnvironment):
``_run_bash`` recovery path will resolve a safe fallback if needed.
"""
try:
with open(self._cwd_file) as f:
with open(self._cwd_file, encoding="utf-8") as f:
cwd_path = f.read().strip()
if cwd_path and os.path.isdir(cwd_path):
self.cwd = cwd_path

View file

@ -1992,7 +1992,7 @@ def _snapshot_child_pids() -> set:
# Linux: read from /proc
try:
children_path = f"/proc/{my_pid}/task/{my_pid}/children"
with open(children_path) as f:
with open(children_path, encoding="utf-8") as f:
return {int(p) for p in f.read().split() if p.strip()}
except (FileNotFoundError, OSError, ValueError):
pass

View file

@ -169,7 +169,7 @@ def _scan_environments() -> List[EnvironmentInfo]:
continue
try:
with open(py_file, "r") as f:
with open(py_file, "r", encoding="utf-8") as f:
tree = ast.parse(f.read())
for node in ast.walk(tree):
@ -333,7 +333,7 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
# File must stay open while the subprocess runs; we store the handle
# on run_state so _stop_training_run() can close it when done.
api_log_file = open(api_log, "w") # closed by _stop_training_run
api_log_file = open(api_log, "w", encoding="utf-8") # closed by _stop_training_run
run_state.api_log_file = api_log_file
run_state.api_process = subprocess.Popen(
["run-api"],
@ -356,7 +356,7 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
# Step 2: Start the Tinker trainer
logger.info("[%s] Starting Tinker trainer: launch_training.py --config %s", run_id, config_path)
trainer_log_file = open(trainer_log, "w") # closed by _stop_training_run
trainer_log_file = open(trainer_log, "w", encoding="utf-8") # closed by _stop_training_run
run_state.trainer_log_file = trainer_log_file
run_state.trainer_process = subprocess.Popen(
[sys.executable, "launch_training.py", "--config", str(config_path)],
@ -397,7 +397,7 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
logger.info("[%s] Starting environment: %s serve", run_id, env_info.file_path)
env_log_file = open(env_log, "w") # closed by _stop_training_run
env_log_file = open(env_log, "w", encoding="utf-8") # closed by _stop_training_run
run_state.env_log_file = env_log_file
run_state.env_process = subprocess.Popen(
[sys.executable, str(env_info.file_path), "serve", "--config", str(config_path)],
@ -777,7 +777,7 @@ async def rl_start_training() -> str:
if "wandb_name" in _current_config and _current_config["wandb_name"]:
run_config["env"]["wandb_name"] = _current_config["wandb_name"]
with open(config_path, "w") as f:
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(run_config, f, default_flow_style=False)
# Create run state
@ -1206,7 +1206,7 @@ async def rl_test_inference(
stderr_text = "\n".join(stderr_lines)
# Write logs to files for inspection outside CLI
with open(log_file, "w") as f:
with open(log_file, "w", encoding="utf-8") as f:
f.write(f"Command: {cmd_display}\n")
f.write(f"Working dir: {TINKER_ATROPOS_ROOT}\n")
f.write(f"Return code: {process.returncode}\n")
@ -1238,7 +1238,7 @@ async def rl_test_inference(
# Parse the output JSONL file
if output_file.exists():
# Read JSONL file (one JSON object per line = one step)
with open(output_file, "r") as f:
with open(output_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:

View file

@ -219,7 +219,7 @@ class GitHubAuth:
key_file = Path(key_path)
if not key_file.exists():
return None
private_key = key_file.read_text()
private_key = key_file.read_text(encoding="utf-8")
now = int(time.time())
payload = {
@ -2667,7 +2667,7 @@ def append_audit_log(action: str, skill_name: str, source: str,
parts.append(extra)
line = " ".join(parts) + "\n"
try:
with open(AUDIT_LOG, "a") as f:
with open(AUDIT_LOG, "a", encoding="utf-8") as f:
f.write(line)
except OSError as e:
logger.debug("Could not write audit log: %s", e)

View file

@ -126,7 +126,7 @@ def _read_failure_reason() -> str | None:
mtime = os.path.getmtime(p)
if (time.time() - mtime) >= _MARKER_TTL:
return None
with open(p, "r") as f:
with open(p, "r", encoding="utf-8") as f:
return f.read().strip()
except OSError:
return None
@ -160,7 +160,7 @@ def _mark_install_failed(reason: str = ""):
try:
p = _failure_marker_path()
os.makedirs(os.path.dirname(p), exist_ok=True)
with open(p, "w") as f:
with open(p, "w", encoding="utf-8") as f:
f.write(reason)
except OSError:
pass
@ -257,7 +257,7 @@ def _verify_cosign(checksums_path: str, sig_path: str, cert_path: str) -> bool |
def _verify_checksum(archive_path: str, checksums_path: str, archive_name: str) -> bool:
"""Verify SHA-256 of the archive against checksums.txt."""
expected = None
with open(checksums_path) as f:
with open(checksums_path, encoding="utf-8") as f:
for line in f:
# Format: "<hash> <filename>"
parts = line.strip().split(" ", 1)

View file

@ -110,7 +110,7 @@ def detect_audio_environment() -> dict:
# WSL detection — PulseAudio bridge makes audio work in WSL.
# Only block if PULSE_SERVER is not configured.
try:
with open('/proc/version', 'r') as f:
with open('/proc/version', 'r', encoding="utf-8") as f:
if 'microsoft' in f.read().lower():
if os.environ.get('PULSE_SERVER'):
notices.append("Running in WSL with PulseAudio bridge")

View file

@ -125,7 +125,7 @@ class CompressionConfig:
@classmethod
def from_yaml(cls, yaml_path: str) -> "CompressionConfig":
"""Load configuration from YAML file."""
with open(yaml_path, 'r') as f:
with open(yaml_path, 'r', encoding="utf-8") as f:
data = yaml.safe_load(f)
config = cls()
@ -1174,7 +1174,7 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
# Save metrics
if self.config.metrics_enabled:
metrics_path = output_dir / self.config.metrics_output_file
with open(metrics_path, 'w') as f:
with open(metrics_path, 'w', encoding="utf-8") as f:
json.dump(self.aggregate_metrics.to_dict(), f, indent=2)
console.print(f"\n💾 Metrics saved to {metrics_path}")

View file

@ -660,7 +660,7 @@ def _load_cfg() -> dict:
if _cfg_cache is not None and _cfg_mtime == mtime and _cfg_path == p:
return copy.deepcopy(_cfg_cache)
if p.exists():
with open(p) as f:
with open(p, encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
else:
data = {}
@ -679,7 +679,7 @@ def _save_cfg(cfg: dict):
import yaml
path = _hermes_home / "config.yaml"
with open(path, "w") as f:
with open(path, "w", encoding="utf-8") as f:
yaml.safe_dump(cfg, f)
with _cfg_lock:
_cfg_cache = copy.deepcopy(cfg)
@ -2613,7 +2613,7 @@ def _(rid, params: dict) -> dict:
f"hermes_conversation_{_time.strftime('%Y%m%d_%H%M%S')}.json"
)
try:
with open(filename, "w") as f:
with open(filename, "w", encoding="utf-8") as f:
json.dump(
{
"model": getattr(session["agent"], "model", ""),