diff --git a/cron/scheduler.py b/cron/scheduler.py index 37c250b67d0..e0346d06e49 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -257,6 +257,30 @@ def _resolve_origin(job: dict) -> Optional[dict]: return None +def _cron_job_origin_log_suffix(job: dict) -> str: + """Return safe provenance details for security warnings about a cron job. + + The scheduler normally has no live HTTP request object when it detects a + bad stored ``context_from`` reference. Including the job's saved origin + makes future probe logs actionable without exposing secrets: platform/chat + metadata for gateway-created jobs, and optional source-IP fields for API + surfaces that persist them in origin metadata. + """ + origin = job.get("origin") + if not isinstance(origin, dict): + return "" + + fields = [] + for key in ("platform", "chat_id", "thread_id", "source_ip", "remote", "forwarded_for"): + value = origin.get(key) + if value is None: + continue + text = str(value).replace("\r", " ").replace("\n", " ").strip() + if text: + fields.append(f"origin_{key}={text[:200]!r}") + return " " + " ".join(fields) if fields else "" + + def _plugin_cron_env_var(platform_name: str) -> str: """Return the cron home-channel env var registered by a plugin platform. 
@@ -1027,7 +1051,13 @@ def _build_job_prompt(job: dict, prerun_script: Optional[tuple] = None) -> str: for source_job_id in context_from: # Guard against path traversal — valid job IDs are 12-char hex strings if not source_job_id or not all(c in "0123456789abcdef" for c in source_job_id): - logger.warning("context_from: skipping invalid job_id %r", source_job_id) + logger.warning( + "context_from: skipping invalid job_id %r for job_id=%r name=%r%s", + source_job_id, + job.get("id"), + job.get("name"), + _cron_job_origin_log_suffix(job), + ) continue try: job_output_dir = OUTPUT_DIR / source_job_id diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 1f02bde5a2a..a18630f85ce 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -763,6 +763,58 @@ class APIServerAdapter(BasePlatformAdapter): return "*" in self._cors_origins or origin in self._cors_origins + @staticmethod + def _clean_log_value(value: Any, *, max_len: int = 200) -> str: + """Sanitize request metadata before it reaches security logs.""" + if value is None: + return "" + text = str(value).replace("\r", " ").replace("\n", " ").strip() + return text[:max_len] + + def _request_audit_context(self, request: "web.Request") -> Dict[str, str]: + """Return non-secret source metadata for security/audit warnings.""" + peer_ip = "" + try: + peer = request.transport.get_extra_info("peername") if request.transport else None + if isinstance(peer, (tuple, list)) and peer: + peer_ip = str(peer[0]) + except Exception: + peer_ip = "" + + return { + "remote": self._clean_log_value(getattr(request, "remote", "") or peer_ip), + "peer_ip": self._clean_log_value(peer_ip), + "forwarded_for": self._clean_log_value(request.headers.get("X-Forwarded-For", "")), + "real_ip": self._clean_log_value(request.headers.get("X-Real-IP", "")), + "method": self._clean_log_value(request.method, max_len=16), + "path": self._clean_log_value(request.path_qs, max_len=500), + 
"user_agent": self._clean_log_value(request.headers.get("User-Agent", ""), max_len=300), + } + + def _request_audit_log_suffix(self, request: "web.Request") -> str: + ctx = self._request_audit_context(request) + fields = [f"{key}={value!r}" for key, value in ctx.items() if value] + return " ".join(fields) if fields else "source='unknown'" + + def _cron_origin_from_request(self, request: "web.Request") -> Dict[str, str]: + """Persist safe API source metadata on cron jobs created over HTTP.""" + ctx = self._request_audit_context(request) + origin = { + "platform": "api_server", + "chat_id": "api", + } + if ctx.get("remote"): + origin["source_ip"] = ctx["remote"] + if ctx.get("peer_ip"): + origin["peer_ip"] = ctx["peer_ip"] + if ctx.get("forwarded_for"): + origin["forwarded_for"] = ctx["forwarded_for"] + if ctx.get("real_ip"): + origin["real_ip"] = ctx["real_ip"] + if ctx.get("user_agent"): + origin["user_agent"] = ctx["user_agent"] + return origin + # ------------------------------------------------------------------ # Auth helper # ------------------------------------------------------------------ @@ -784,6 +836,10 @@ class APIServerAdapter(BasePlatformAdapter): if hmac.compare_digest(token, self._api_key): return None # Auth OK + logger.warning( + "API server rejected invalid API key: %s", + self._request_audit_log_suffix(request), + ) return web.json_response( {"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}}, status=401, @@ -2454,6 +2510,11 @@ class APIServerAdapter(BasePlatformAdapter): """Validate and extract job_id. 
def test_invalid_job_id_log_includes_job_origin(self, cron_env, caplog):
    """A rejected ``context_from`` entry is logged with the job's id, name and origin."""
    from cron.jobs import create_job
    from cron.scheduler import _build_job_prompt

    stored_origin = {
        "platform": "api_server",
        "chat_id": "api",
        "source_ip": "203.0.113.10",
        "forwarded_for": "198.51.100.7",
    }
    job = create_job(
        prompt="Process",
        schedule="every 2h",
        name="suspicious-chain",
        origin=stored_origin,
    )
    # Inject the traversal-style reference directly on the job dict.
    job["context_from"] = ["../../../etc/passwd"]

    caplog.set_level(logging.WARNING, logger="cron.scheduler")
    prompt = _build_job_prompt(job)

    # The job's own prompt text is still present in the built prompt.
    assert "Process" in prompt

    logged = caplog.text
    expected_fragments = (
        "context_from: skipping invalid job_id",
        job["id"],
        "suspicious-chain",
        "203.0.113.10",
        "198.51.100.7",
    )
    for fragment in expected_fragments:
        assert fragment in logged
@pytest.mark.asyncio
async def test_invalid_job_id_logs_source_context(self, adapter, caplog):
    """A malformed job id yields HTTP 400 and a warning carrying request source details."""
    app = _create_app(adapter)
    caplog.set_level(logging.WARNING, logger="gateway.platforms.api_server")

    probe_headers = {
        "X-Forwarded-For": "203.0.113.9",
        "User-Agent": "probe scanner",
    }
    async with TestClient(TestServer(app)) as cli:
        with patch(f"{_MOD}._CRON_AVAILABLE", True):
            resp = await cli.get(
                "/api/jobs/..%2F..%2F..%2Fetc%2Fpasswd",
                headers=probe_headers,
            )
            assert resp.status == 400

    logged = caplog.text
    expected_fragments = (
        "Cron jobs API rejected invalid job_id",
        "203.0.113.9",
        "GET",
        "/api/jobs/",
        "probe scanner",
    )
    for fragment in expected_fragments:
        assert fragment in logged