mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Merge PR #309: fix(timezone): timezone-aware now() for prompt, cron, and execute_code
Authored by areu01or00. Adds timezone support via hermes_time.now() helper with IANA timezone resolution (HERMES_TIMEZONE env → config.yaml → server-local). Updates system prompt timestamp, cron scheduling, and execute_code sandbox TZ injection. Includes config migration (v4→v5) and comprehensive test coverage.
This commit is contained in:
commit
69a36a3361
9 changed files with 474 additions and 28 deletions
45
cron/jobs.py
45
cron/jobs.py
|
|
@ -14,6 +14,8 @@ from datetime import datetime, timedelta
|
|||
from pathlib import Path
|
||||
from typing import Optional, Dict, List, Any
|
||||
|
||||
from hermes_time import now as _hermes_now
|
||||
|
||||
try:
|
||||
from croniter import croniter
|
||||
HAS_CRONITER = True
|
||||
|
|
@ -128,7 +130,7 @@ def parse_schedule(schedule: str) -> Dict[str, Any]:
|
|||
# Duration like "30m", "2h", "1d" → one-shot from now
|
||||
try:
|
||||
minutes = parse_duration(schedule)
|
||||
run_at = datetime.now() + timedelta(minutes=minutes)
|
||||
run_at = _hermes_now() + timedelta(minutes=minutes)
|
||||
return {
|
||||
"kind": "once",
|
||||
"run_at": run_at.isoformat(),
|
||||
|
|
@ -146,37 +148,50 @@ def parse_schedule(schedule: str) -> Dict[str, Any]:
|
|||
)
|
||||
|
||||
|
||||
def _ensure_aware(dt: datetime) -> datetime:
|
||||
"""Make a naive datetime tz-aware using the configured timezone.
|
||||
|
||||
Handles backward compatibility: timestamps stored before timezone support
|
||||
are naive (server-local). We assume they were in the same timezone as
|
||||
the current configuration so comparisons work without crashing.
|
||||
"""
|
||||
if dt.tzinfo is None:
|
||||
tz = _hermes_now().tzinfo
|
||||
return dt.replace(tzinfo=tz)
|
||||
return dt
|
||||
|
||||
|
||||
def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
|
||||
"""
|
||||
Compute the next run time for a schedule.
|
||||
|
||||
|
||||
Returns ISO timestamp string, or None if no more runs.
|
||||
"""
|
||||
now = datetime.now()
|
||||
|
||||
now = _hermes_now()
|
||||
|
||||
if schedule["kind"] == "once":
|
||||
run_at = datetime.fromisoformat(schedule["run_at"])
|
||||
run_at = _ensure_aware(datetime.fromisoformat(schedule["run_at"]))
|
||||
# If in the future, return it; if in the past, no more runs
|
||||
return schedule["run_at"] if run_at > now else None
|
||||
|
||||
|
||||
elif schedule["kind"] == "interval":
|
||||
minutes = schedule["minutes"]
|
||||
if last_run_at:
|
||||
# Next run is last_run + interval
|
||||
last = datetime.fromisoformat(last_run_at)
|
||||
last = _ensure_aware(datetime.fromisoformat(last_run_at))
|
||||
next_run = last + timedelta(minutes=minutes)
|
||||
else:
|
||||
# First run is now + interval
|
||||
next_run = now + timedelta(minutes=minutes)
|
||||
return next_run.isoformat()
|
||||
|
||||
|
||||
elif schedule["kind"] == "cron":
|
||||
if not HAS_CRONITER:
|
||||
return None
|
||||
cron = croniter(schedule["expr"], now)
|
||||
next_run = cron.get_next(datetime)
|
||||
return next_run.isoformat()
|
||||
|
||||
|
||||
return None
|
||||
|
||||
|
||||
|
|
@ -204,7 +219,7 @@ def save_jobs(jobs: List[Dict[str, Any]]):
|
|||
fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
|
||||
try:
|
||||
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
||||
json.dump({"jobs": jobs, "updated_at": datetime.now().isoformat()}, f, indent=2)
|
||||
json.dump({"jobs": jobs, "updated_at": _hermes_now().isoformat()}, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp_path, JOBS_FILE)
|
||||
|
|
@ -249,7 +264,7 @@ def create_job(
|
|||
deliver = "origin" if origin else "local"
|
||||
|
||||
job_id = uuid.uuid4().hex[:12]
|
||||
now = datetime.now().isoformat()
|
||||
now = _hermes_now().isoformat()
|
||||
|
||||
job = {
|
||||
"id": job_id,
|
||||
|
|
@ -328,7 +343,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
|
|||
jobs = load_jobs()
|
||||
for i, job in enumerate(jobs):
|
||||
if job["id"] == job_id:
|
||||
now = datetime.now().isoformat()
|
||||
now = _hermes_now().isoformat()
|
||||
job["last_run_at"] = now
|
||||
job["last_status"] = "ok" if success else "error"
|
||||
job["last_error"] = error if not success else None
|
||||
|
|
@ -361,7 +376,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
|
|||
|
||||
def get_due_jobs() -> List[Dict[str, Any]]:
|
||||
"""Get all jobs that are due to run now."""
|
||||
now = datetime.now()
|
||||
now = _hermes_now()
|
||||
jobs = load_jobs()
|
||||
due = []
|
||||
|
||||
|
|
@ -373,7 +388,7 @@ def get_due_jobs() -> List[Dict[str, Any]]:
|
|||
if not next_run:
|
||||
continue
|
||||
|
||||
next_run_dt = datetime.fromisoformat(next_run)
|
||||
next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
|
||||
if next_run_dt <= now:
|
||||
due.append(job)
|
||||
|
||||
|
|
@ -386,7 +401,7 @@ def save_job_output(job_id: str, output: str):
|
|||
job_output_dir = OUTPUT_DIR / job_id
|
||||
job_output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
output_file = job_output_dir / f"{timestamp}.md"
|
||||
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@ from datetime import datetime
|
|||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from hermes_time import now as _hermes_now
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Add parent directory to path for imports
|
||||
|
|
@ -207,7 +209,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
provider=runtime.get("provider"),
|
||||
api_mode=runtime.get("api_mode"),
|
||||
quiet_mode=True,
|
||||
session_id=f"cron_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
session_id=f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
|
||||
)
|
||||
|
||||
result = agent.run_conversation(prompt)
|
||||
|
|
@ -219,7 +221,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
output = f"""# Cron Job: {job_name}
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Prompt
|
||||
|
|
@ -241,7 +243,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
output = f"""# Cron Job: {job_name} (FAILED)
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Prompt
|
||||
|
|
@ -297,11 +299,11 @@ def tick(verbose: bool = True) -> int:
|
|||
due_jobs = get_due_jobs()
|
||||
|
||||
if verbose and not due_jobs:
|
||||
logger.info("%s - No jobs due", datetime.now().strftime('%H:%M:%S'))
|
||||
logger.info("%s - No jobs due", _hermes_now().strftime('%H:%M:%S'))
|
||||
return 0
|
||||
|
||||
if verbose:
|
||||
logger.info("%s - %s job(s) due", datetime.now().strftime('%H:%M:%S'), len(due_jobs))
|
||||
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
|
||||
|
||||
executed = 0
|
||||
for job in due_jobs:
|
||||
|
|
|
|||
|
|
@ -93,6 +93,11 @@ if _config_path.exists():
|
|||
if _agent_cfg and isinstance(_agent_cfg, dict):
|
||||
if "max_turns" in _agent_cfg:
|
||||
os.environ["HERMES_MAX_ITERATIONS"] = str(_agent_cfg["max_turns"])
|
||||
# Timezone: bridge config.yaml → HERMES_TIMEZONE env var.
|
||||
# HERMES_TIMEZONE from .env takes precedence (already in os.environ).
|
||||
_tz_cfg = _cfg.get("timezone", "")
|
||||
if _tz_cfg and isinstance(_tz_cfg, str) and "HERMES_TIMEZONE" not in os.environ:
|
||||
os.environ["HERMES_TIMEZONE"] = _tz_cfg.strip()
|
||||
except Exception:
|
||||
pass # Non-fatal; gateway can still run with .env values
|
||||
|
||||
|
|
|
|||
|
|
@ -141,9 +141,13 @@ DEFAULT_CONFIG = {
|
|||
# (apiKey, workspace, peerName, sessions, enabled) comes from the global config.
|
||||
"honcho": {},
|
||||
|
||||
# IANA timezone (e.g. "Asia/Kolkata", "America/New_York").
|
||||
# Empty string means use server-local time.
|
||||
"timezone": "",
|
||||
|
||||
# Permanently allowed dangerous command patterns (added via "always" approval)
|
||||
"command_allowlist": [],
|
||||
|
||||
|
||||
# Config schema version - bump this when adding new required fields
|
||||
"_config_version": 5,
|
||||
}
|
||||
|
|
@ -565,6 +569,22 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
|
|||
if not quiet:
|
||||
print(f" ✓ Migrated tool progress to config.yaml: {display['tool_progress']}")
|
||||
|
||||
# ── Version 4 → 5: add timezone field ──
|
||||
if current_ver < 5:
|
||||
config = load_config()
|
||||
if "timezone" not in config:
|
||||
old_tz = get_env_value("HERMES_TIMEZONE") if "get_env_value" in dir() else os.getenv("HERMES_TIMEZONE", "")
|
||||
if old_tz and old_tz.strip():
|
||||
config["timezone"] = old_tz.strip()
|
||||
results["config_added"].append(f"timezone={old_tz.strip()} (from HERMES_TIMEZONE)")
|
||||
else:
|
||||
config["timezone"] = ""
|
||||
results["config_added"].append("timezone= (empty, uses server-local)")
|
||||
save_config(config)
|
||||
if not quiet:
|
||||
tz_display = config["timezone"] or "(server-local)"
|
||||
print(f" ✓ Added timezone to config.yaml: {tz_display}")
|
||||
|
||||
if current_ver < latest_ver and not quiet:
|
||||
print(f"Config version: {current_ver} → {latest_ver}")
|
||||
|
||||
|
|
@ -852,6 +872,15 @@ def show_config():
|
|||
print(f" SSH host: {ssh_host or '(not set)'}")
|
||||
print(f" SSH user: {ssh_user or '(not set)'}")
|
||||
|
||||
# Timezone
|
||||
print()
|
||||
print(color("◆ Timezone", Colors.CYAN, Colors.BOLD))
|
||||
tz = config.get('timezone', '')
|
||||
if tz:
|
||||
print(f" Timezone: {tz}")
|
||||
else:
|
||||
print(f" Timezone: {color('(server-local)', Colors.DIM)}")
|
||||
|
||||
# Compression
|
||||
print()
|
||||
print(color("◆ Context Compression", Colors.CYAN, Colors.BOLD))
|
||||
|
|
|
|||
119
hermes_time.py
Normal file
119
hermes_time.py
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
"""
|
||||
Timezone-aware clock for Hermes.
|
||||
|
||||
Provides a single ``now()`` helper that returns a timezone-aware datetime
|
||||
based on the user's configured IANA timezone (e.g. ``Asia/Kolkata``).
|
||||
|
||||
Resolution order:
|
||||
1. ``HERMES_TIMEZONE`` environment variable
|
||||
2. ``timezone`` key in ``~/.hermes/config.yaml``
|
||||
3. Falls back to the server's local time (``datetime.now().astimezone()``)
|
||||
|
||||
Invalid timezone values log a warning and fall back safely — Hermes never
|
||||
crashes due to a bad timezone string.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone as _tz
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
from zoneinfo import ZoneInfo
|
||||
except ImportError:
|
||||
# Python 3.8 fallback (shouldn't be needed — Hermes requires 3.9+)
|
||||
from backports.zoneinfo import ZoneInfo # type: ignore[no-redef]
|
||||
|
||||
# Cached state — resolved once, reused on every call.
|
||||
# Call reset_cache() to force re-resolution (e.g. after config changes).
|
||||
_cached_tz: Optional[ZoneInfo] = None
|
||||
_cached_tz_name: Optional[str] = None
|
||||
_cache_resolved: bool = False
|
||||
|
||||
|
||||
def _resolve_timezone_name() -> str:
|
||||
"""Read the configured IANA timezone string (or empty string).
|
||||
|
||||
This does file I/O when falling through to config.yaml, so callers
|
||||
should cache the result rather than calling on every ``now()``.
|
||||
"""
|
||||
# 1. Environment variable (highest priority — set by Supervisor, etc.)
|
||||
tz_env = os.getenv("HERMES_TIMEZONE", "").strip()
|
||||
if tz_env:
|
||||
return tz_env
|
||||
|
||||
# 2. config.yaml ``timezone`` key
|
||||
try:
|
||||
import yaml
|
||||
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
config_path = hermes_home / "config.yaml"
|
||||
if config_path.exists():
|
||||
with open(config_path) as f:
|
||||
cfg = yaml.safe_load(f) or {}
|
||||
tz_cfg = cfg.get("timezone", "")
|
||||
if isinstance(tz_cfg, str) and tz_cfg.strip():
|
||||
return tz_cfg.strip()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
def _get_zoneinfo(name: str) -> Optional[ZoneInfo]:
|
||||
"""Validate and return a ZoneInfo, or None if invalid."""
|
||||
if not name:
|
||||
return None
|
||||
try:
|
||||
return ZoneInfo(name)
|
||||
except (KeyError, Exception) as exc:
|
||||
logger.warning(
|
||||
"Invalid timezone '%s': %s. Falling back to server local time.",
|
||||
name, exc,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def get_timezone() -> Optional[ZoneInfo]:
|
||||
"""Return the user's configured ZoneInfo, or None (meaning server-local).
|
||||
|
||||
Resolved once and cached. Call ``reset_cache()`` after config changes.
|
||||
"""
|
||||
global _cached_tz, _cached_tz_name, _cache_resolved
|
||||
if not _cache_resolved:
|
||||
_cached_tz_name = _resolve_timezone_name()
|
||||
_cached_tz = _get_zoneinfo(_cached_tz_name)
|
||||
_cache_resolved = True
|
||||
return _cached_tz
|
||||
|
||||
|
||||
def get_timezone_name() -> str:
|
||||
"""Return the IANA name of the configured timezone, or empty string."""
|
||||
global _cached_tz_name, _cache_resolved
|
||||
if not _cache_resolved:
|
||||
get_timezone() # populates cache
|
||||
return _cached_tz_name or ""
|
||||
|
||||
|
||||
def now() -> datetime:
|
||||
"""
|
||||
Return the current time as a timezone-aware datetime.
|
||||
|
||||
If a valid timezone is configured, returns wall-clock time in that zone.
|
||||
Otherwise returns the server's local time (via ``astimezone()``).
|
||||
"""
|
||||
tz = get_timezone()
|
||||
if tz is not None:
|
||||
return datetime.now(tz)
|
||||
# No timezone configured — use server-local (still tz-aware)
|
||||
return datetime.now().astimezone()
|
||||
|
||||
|
||||
def reset_cache() -> None:
|
||||
"""Clear the cached timezone. Used by tests and after config changes."""
|
||||
global _cached_tz, _cached_tz_name, _cache_resolved
|
||||
_cached_tz = None
|
||||
_cached_tz_name = None
|
||||
_cache_resolved = False
|
||||
|
|
@ -1363,7 +1363,8 @@ class AIAgent:
|
|||
if context_files_prompt:
|
||||
prompt_parts.append(context_files_prompt)
|
||||
|
||||
now = datetime.now()
|
||||
from hermes_time import now as _hermes_now
|
||||
now = _hermes_now()
|
||||
prompt_parts.append(
|
||||
f"Conversation started: {now.strftime('%A, %B %d, %Y %I:%M %p')}"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -75,8 +75,9 @@ class TestParseSchedule:
|
|||
run_at_str = result["run_at"]
|
||||
assert isinstance(run_at_str, str)
|
||||
run_at = datetime.fromisoformat(run_at_str)
|
||||
assert run_at > datetime.now()
|
||||
assert run_at < datetime.now() + timedelta(minutes=31)
|
||||
now = datetime.now().astimezone()
|
||||
assert run_at > now
|
||||
assert run_at < now + timedelta(minutes=31)
|
||||
|
||||
def test_every_becomes_interval(self):
|
||||
result = parse_schedule("every 2h")
|
||||
|
|
@ -129,15 +130,15 @@ class TestComputeNextRun:
|
|||
result = compute_next_run(schedule)
|
||||
next_dt = datetime.fromisoformat(result)
|
||||
# Should be ~60 minutes from now
|
||||
assert next_dt > datetime.now() + timedelta(minutes=59)
|
||||
assert next_dt > datetime.now().astimezone() + timedelta(minutes=59)
|
||||
|
||||
def test_interval_subsequent_run(self):
|
||||
schedule = {"kind": "interval", "minutes": 30}
|
||||
last = datetime.now().isoformat()
|
||||
last = datetime.now().astimezone().isoformat()
|
||||
result = compute_next_run(schedule, last_run_at=last)
|
||||
next_dt = datetime.fromisoformat(result)
|
||||
# Should be ~30 minutes from last run
|
||||
assert next_dt > datetime.now() + timedelta(minutes=29)
|
||||
assert next_dt > datetime.now().astimezone() + timedelta(minutes=29)
|
||||
|
||||
def test_cron_returns_future(self):
|
||||
pytest.importorskip("croniter")
|
||||
|
|
@ -147,7 +148,7 @@ class TestComputeNextRun:
|
|||
assert len(result) > 0
|
||||
next_dt = datetime.fromisoformat(result)
|
||||
assert isinstance(next_dt, datetime)
|
||||
assert next_dt > datetime.now()
|
||||
assert next_dt > datetime.now().astimezone()
|
||||
|
||||
def test_unknown_kind_returns_none(self):
|
||||
assert compute_next_run({"kind": "unknown"}) is None
|
||||
|
|
|
|||
269
tests/test_timezone.py
Normal file
269
tests/test_timezone.py
Normal file
|
|
@ -0,0 +1,269 @@
|
|||
"""
|
||||
Tests for timezone support (hermes_time module + integration points).
|
||||
|
||||
Covers:
|
||||
- Valid timezone applies correctly
|
||||
- Invalid timezone falls back safely (no crash, warning logged)
|
||||
- execute_code child env receives TZ
|
||||
- Cron uses timezone-aware now()
|
||||
- Backward compatibility with naive timestamps
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
import sys
|
||||
import pytest
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import patch, MagicMock
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import hermes_time
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# hermes_time.now() — core helper
|
||||
# =========================================================================
|
||||
|
||||
class TestHermesTimeNow:
|
||||
"""Test the timezone-aware now() helper."""
|
||||
|
||||
def setup_method(self):
|
||||
hermes_time.reset_cache()
|
||||
|
||||
def teardown_method(self):
|
||||
hermes_time.reset_cache()
|
||||
os.environ.pop("HERMES_TIMEZONE", None)
|
||||
|
||||
def test_valid_timezone_applies(self):
|
||||
"""With a valid IANA timezone, now() returns time in that zone."""
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
result = hermes_time.now()
|
||||
assert result.tzinfo is not None
|
||||
# IST is UTC+5:30
|
||||
offset = result.utcoffset()
|
||||
assert offset == timedelta(hours=5, minutes=30)
|
||||
|
||||
def test_utc_timezone(self):
|
||||
"""UTC timezone works."""
|
||||
os.environ["HERMES_TIMEZONE"] = "UTC"
|
||||
result = hermes_time.now()
|
||||
assert result.utcoffset() == timedelta(0)
|
||||
|
||||
def test_us_eastern(self):
|
||||
"""US/Eastern timezone works (DST-aware zone)."""
|
||||
os.environ["HERMES_TIMEZONE"] = "America/New_York"
|
||||
result = hermes_time.now()
|
||||
assert result.tzinfo is not None
|
||||
# Offset is -5h or -4h depending on DST
|
||||
offset_hours = result.utcoffset().total_seconds() / 3600
|
||||
assert offset_hours in (-5, -4)
|
||||
|
||||
def test_invalid_timezone_falls_back(self, caplog):
|
||||
"""Invalid timezone logs warning and falls back to server-local."""
|
||||
os.environ["HERMES_TIMEZONE"] = "Mars/Olympus_Mons"
|
||||
with caplog.at_level(logging.WARNING, logger="hermes_time"):
|
||||
result = hermes_time.now()
|
||||
assert result.tzinfo is not None # Still tz-aware (server-local)
|
||||
assert "Invalid timezone" in caplog.text
|
||||
assert "Mars/Olympus_Mons" in caplog.text
|
||||
|
||||
def test_empty_timezone_uses_local(self):
|
||||
"""No timezone configured → server-local time (still tz-aware)."""
|
||||
os.environ.pop("HERMES_TIMEZONE", None)
|
||||
result = hermes_time.now()
|
||||
assert result.tzinfo is not None
|
||||
|
||||
def test_format_unchanged(self):
|
||||
"""Timestamp formatting matches original strftime pattern."""
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
result = hermes_time.now()
|
||||
formatted = result.strftime("%A, %B %d, %Y %I:%M %p")
|
||||
# Should produce something like "Monday, March 03, 2026 05:30 PM"
|
||||
assert len(formatted) > 10
|
||||
# No timezone abbreviation in the format (matching original behavior)
|
||||
assert "+" not in formatted
|
||||
|
||||
def test_cache_invalidation(self):
|
||||
"""Changing env var + reset_cache picks up new timezone."""
|
||||
os.environ["HERMES_TIMEZONE"] = "UTC"
|
||||
hermes_time.reset_cache()
|
||||
r1 = hermes_time.now()
|
||||
assert r1.utcoffset() == timedelta(0)
|
||||
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
hermes_time.reset_cache()
|
||||
r2 = hermes_time.now()
|
||||
assert r2.utcoffset() == timedelta(hours=5, minutes=30)
|
||||
|
||||
|
||||
class TestGetTimezone:
|
||||
"""Test get_timezone() and get_timezone_name()."""
|
||||
|
||||
def setup_method(self):
|
||||
hermes_time.reset_cache()
|
||||
|
||||
def teardown_method(self):
|
||||
hermes_time.reset_cache()
|
||||
os.environ.pop("HERMES_TIMEZONE", None)
|
||||
|
||||
def test_returns_zoneinfo_for_valid(self):
|
||||
os.environ["HERMES_TIMEZONE"] = "Europe/London"
|
||||
tz = hermes_time.get_timezone()
|
||||
assert isinstance(tz, ZoneInfo)
|
||||
assert str(tz) == "Europe/London"
|
||||
|
||||
def test_returns_none_for_empty(self):
|
||||
os.environ.pop("HERMES_TIMEZONE", None)
|
||||
tz = hermes_time.get_timezone()
|
||||
assert tz is None
|
||||
|
||||
def test_returns_none_for_invalid(self):
|
||||
os.environ["HERMES_TIMEZONE"] = "Not/A/Timezone"
|
||||
tz = hermes_time.get_timezone()
|
||||
assert tz is None
|
||||
|
||||
def test_get_timezone_name(self):
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Tokyo"
|
||||
assert hermes_time.get_timezone_name() == "Asia/Tokyo"
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# execute_code child env — TZ injection
|
||||
# =========================================================================
|
||||
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="UDS not available on Windows")
|
||||
class TestCodeExecutionTZ:
|
||||
"""Verify TZ env var is passed to sandboxed child process via real execute_code."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _import_execute_code(self):
|
||||
"""Lazy-import execute_code to avoid pulling in firecrawl at collection time."""
|
||||
try:
|
||||
from tools.code_execution_tool import execute_code
|
||||
self._execute_code = execute_code
|
||||
except ImportError:
|
||||
pytest.skip("tools.code_execution_tool not importable (missing deps)")
|
||||
|
||||
def teardown_method(self):
|
||||
os.environ.pop("HERMES_TIMEZONE", None)
|
||||
|
||||
def _mock_handle(self, function_name, function_args, task_id=None, user_task=None):
|
||||
import json as _json
|
||||
return _json.dumps({"error": f"unexpected tool call: {function_name}"})
|
||||
|
||||
def test_tz_injected_when_configured(self):
|
||||
"""When HERMES_TIMEZONE is set, child process sees TZ env var."""
|
||||
import json as _json
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
|
||||
with patch("model_tools.handle_function_call", side_effect=self._mock_handle):
|
||||
result = _json.loads(self._execute_code(
|
||||
code='import os; print(os.environ.get("TZ", "NOT_SET"))',
|
||||
task_id="tz-test",
|
||||
enabled_tools=[],
|
||||
))
|
||||
assert result["status"] == "success"
|
||||
assert "Asia/Kolkata" in result["output"]
|
||||
|
||||
def test_tz_not_injected_when_empty(self):
|
||||
"""When HERMES_TIMEZONE is not set, child process has no TZ."""
|
||||
import json as _json
|
||||
os.environ.pop("HERMES_TIMEZONE", None)
|
||||
|
||||
with patch("model_tools.handle_function_call", side_effect=self._mock_handle):
|
||||
result = _json.loads(self._execute_code(
|
||||
code='import os; print(os.environ.get("TZ", "NOT_SET"))',
|
||||
task_id="tz-test-empty",
|
||||
enabled_tools=[],
|
||||
))
|
||||
assert result["status"] == "success"
|
||||
assert "NOT_SET" in result["output"]
|
||||
|
||||
def test_hermes_timezone_not_leaked_to_child(self):
|
||||
"""HERMES_TIMEZONE itself must NOT appear in child env (only TZ)."""
|
||||
import json as _json
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
|
||||
with patch("model_tools.handle_function_call", side_effect=self._mock_handle):
|
||||
result = _json.loads(self._execute_code(
|
||||
code='import os; print(os.environ.get("HERMES_TIMEZONE", "NOT_SET"))',
|
||||
task_id="tz-leak-test",
|
||||
enabled_tools=[],
|
||||
))
|
||||
assert result["status"] == "success"
|
||||
assert "NOT_SET" in result["output"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Cron timezone-aware scheduling
|
||||
# =========================================================================
|
||||
|
||||
class TestCronTimezone:
|
||||
"""Verify cron paths use timezone-aware now()."""
|
||||
|
||||
def setup_method(self):
|
||||
hermes_time.reset_cache()
|
||||
|
||||
def teardown_method(self):
|
||||
hermes_time.reset_cache()
|
||||
os.environ.pop("HERMES_TIMEZONE", None)
|
||||
|
||||
def test_parse_schedule_duration_uses_tz_aware_now(self):
|
||||
"""parse_schedule('30m') should produce a tz-aware run_at."""
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
from cron.jobs import parse_schedule
|
||||
result = parse_schedule("30m")
|
||||
run_at = datetime.fromisoformat(result["run_at"])
|
||||
# The stored timestamp should be tz-aware
|
||||
assert run_at.tzinfo is not None
|
||||
|
||||
def test_compute_next_run_tz_aware(self):
|
||||
"""compute_next_run returns tz-aware timestamps."""
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
from cron.jobs import compute_next_run
|
||||
schedule = {"kind": "interval", "minutes": 60}
|
||||
result = compute_next_run(schedule)
|
||||
next_dt = datetime.fromisoformat(result)
|
||||
assert next_dt.tzinfo is not None
|
||||
|
||||
def test_get_due_jobs_handles_naive_timestamps(self, tmp_path, monkeypatch):
|
||||
"""Backward compat: naive timestamps from before tz support don't crash."""
|
||||
import cron.jobs as jobs_module
|
||||
monkeypatch.setattr(jobs_module, "CRON_DIR", tmp_path / "cron")
|
||||
monkeypatch.setattr(jobs_module, "JOBS_FILE", tmp_path / "cron" / "jobs.json")
|
||||
monkeypatch.setattr(jobs_module, "OUTPUT_DIR", tmp_path / "cron" / "output")
|
||||
|
||||
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
|
||||
hermes_time.reset_cache()
|
||||
|
||||
# Create a job with a NAIVE past timestamp (simulating pre-tz data)
|
||||
from cron.jobs import create_job, load_jobs, save_jobs, get_due_jobs
|
||||
job = create_job(prompt="Test job", schedule="every 1h")
|
||||
jobs = load_jobs()
|
||||
# Force a naive (no timezone) past timestamp
|
||||
naive_past = (datetime.now() - timedelta(minutes=5)).isoformat()
|
||||
jobs[0]["next_run_at"] = naive_past
|
||||
save_jobs(jobs)
|
||||
|
||||
# Should not crash — _ensure_aware handles the naive timestamp
|
||||
due = get_due_jobs()
|
||||
assert len(due) == 1
|
||||
|
||||
def test_create_job_stores_tz_aware_timestamps(self, tmp_path, monkeypatch):
|
||||
"""New jobs store timezone-aware created_at and next_run_at."""
|
||||
import cron.jobs as jobs_module
|
||||
monkeypatch.setattr(jobs_module, "CRON_DIR", tmp_path / "cron")
|
||||
monkeypatch.setattr(jobs_module, "JOBS_FILE", tmp_path / "cron" / "jobs.json")
|
||||
monkeypatch.setattr(jobs_module, "OUTPUT_DIR", tmp_path / "cron" / "output")
|
||||
|
||||
os.environ["HERMES_TIMEZONE"] = "US/Eastern"
|
||||
hermes_time.reset_cache()
|
||||
|
||||
from cron.jobs import create_job
|
||||
job = create_job(prompt="TZ test", schedule="every 2h")
|
||||
|
||||
created = datetime.fromisoformat(job["created_at"])
|
||||
assert created.tzinfo is not None
|
||||
|
||||
next_run = datetime.fromisoformat(job["next_run_at"])
|
||||
assert next_run.tzinfo is not None
|
||||
|
|
@ -435,6 +435,11 @@ def execute_code(
|
|||
child_env[k] = v
|
||||
child_env["HERMES_RPC_SOCKET"] = sock_path
|
||||
child_env["PYTHONDONTWRITEBYTECODE"] = "1"
|
||||
# Inject user's configured timezone so datetime.now() in sandboxed
|
||||
# code reflects the correct wall-clock time.
|
||||
_tz_name = os.getenv("HERMES_TIMEZONE", "").strip()
|
||||
if _tz_name:
|
||||
child_env["TZ"] = _tz_name
|
||||
|
||||
proc = subprocess.Popen(
|
||||
[sys.executable, "script.py"],
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue