fix: add cron API provenance logging (#24889)

Co-authored-by: sgtworkman <178342791+sgtworkman@users.noreply.github.com>
This commit is contained in:
Glen Workman 2026-05-25 04:15:56 -04:00 committed by GitHub
parent 92d91365e7
commit d952b377aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 154 additions and 1 deletions

View file

@ -257,6 +257,30 @@ def _resolve_origin(job: dict) -> Optional[dict]:
return None
def _cron_job_origin_log_suffix(job: dict) -> str:
"""Return safe provenance details for security warnings about a cron job.
The scheduler normally has no live HTTP request object when it detects a
bad stored ``context_from`` reference. Including the job's saved origin
makes future probe logs actionable without exposing secrets: platform/chat
metadata for gateway-created jobs, and optional source-IP fields for API
surfaces that persist them in origin metadata.
"""
origin = job.get("origin")
if not isinstance(origin, dict):
return ""
fields = []
for key in ("platform", "chat_id", "thread_id", "source_ip", "remote", "forwarded_for"):
value = origin.get(key)
if value is None:
continue
text = str(value).replace("\r", " ").replace("\n", " ").strip()
if text:
fields.append(f"origin_{key}={text[:200]!r}")
return " " + " ".join(fields) if fields else ""
def _plugin_cron_env_var(platform_name: str) -> str:
"""Return the cron home-channel env var registered by a plugin platform.
@ -1027,7 +1051,13 @@ def _build_job_prompt(job: dict, prerun_script: Optional[tuple] = None) -> str:
for source_job_id in context_from:
# Guard against path traversal — valid job IDs are 12-char hex strings
if not source_job_id or not all(c in "0123456789abcdef" for c in source_job_id):
logger.warning("context_from: skipping invalid job_id %r", source_job_id)
logger.warning(
"context_from: skipping invalid job_id %r for job_id=%r name=%r%s",
source_job_id,
job.get("id"),
job.get("name"),
_cron_job_origin_log_suffix(job),
)
continue
try:
job_output_dir = OUTPUT_DIR / source_job_id

View file

@ -763,6 +763,58 @@ class APIServerAdapter(BasePlatformAdapter):
return "*" in self._cors_origins or origin in self._cors_origins
@staticmethod
def _clean_log_value(value: Any, *, max_len: int = 200) -> str:
"""Sanitize request metadata before it reaches security logs."""
if value is None:
return ""
text = str(value).replace("\r", " ").replace("\n", " ").strip()
return text[:max_len]
def _request_audit_context(self, request: "web.Request") -> Dict[str, str]:
"""Return non-secret source metadata for security/audit warnings."""
peer_ip = ""
try:
peer = request.transport.get_extra_info("peername") if request.transport else None
if isinstance(peer, (tuple, list)) and peer:
peer_ip = str(peer[0])
except Exception:
peer_ip = ""
return {
"remote": self._clean_log_value(getattr(request, "remote", "") or peer_ip),
"peer_ip": self._clean_log_value(peer_ip),
"forwarded_for": self._clean_log_value(request.headers.get("X-Forwarded-For", "")),
"real_ip": self._clean_log_value(request.headers.get("X-Real-IP", "")),
"method": self._clean_log_value(request.method, max_len=16),
"path": self._clean_log_value(request.path_qs, max_len=500),
"user_agent": self._clean_log_value(request.headers.get("User-Agent", ""), max_len=300),
}
def _request_audit_log_suffix(self, request: "web.Request") -> str:
ctx = self._request_audit_context(request)
fields = [f"{key}={value!r}" for key, value in ctx.items() if value]
return " ".join(fields) if fields else "source='unknown'"
def _cron_origin_from_request(self, request: "web.Request") -> Dict[str, str]:
"""Persist safe API source metadata on cron jobs created over HTTP."""
ctx = self._request_audit_context(request)
origin = {
"platform": "api_server",
"chat_id": "api",
}
if ctx.get("remote"):
origin["source_ip"] = ctx["remote"]
if ctx.get("peer_ip"):
origin["peer_ip"] = ctx["peer_ip"]
if ctx.get("forwarded_for"):
origin["forwarded_for"] = ctx["forwarded_for"]
if ctx.get("real_ip"):
origin["real_ip"] = ctx["real_ip"]
if ctx.get("user_agent"):
origin["user_agent"] = ctx["user_agent"]
return origin
# ------------------------------------------------------------------
# Auth helper
# ------------------------------------------------------------------
@ -784,6 +836,10 @@ class APIServerAdapter(BasePlatformAdapter):
if hmac.compare_digest(token, self._api_key):
return None # Auth OK
logger.warning(
"API server rejected invalid API key: %s",
self._request_audit_log_suffix(request),
)
return web.json_response(
{"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}},
status=401,
@ -2454,6 +2510,11 @@ class APIServerAdapter(BasePlatformAdapter):
"""Validate and extract job_id. Returns (job_id, error_response)."""
job_id = request.match_info["job_id"]
if not self._JOB_ID_RE.fullmatch(job_id):
logger.warning(
"Cron jobs API rejected invalid job_id %r: %s",
job_id,
self._request_audit_log_suffix(request),
)
return job_id, web.json_response(
{"error": "Invalid job ID format"}, status=400,
)
@ -2511,6 +2572,7 @@ class APIServerAdapter(BasePlatformAdapter):
"schedule": schedule,
"name": name,
"deliver": deliver,
"origin": self._cron_origin_from_request(request),
}
if skills:
kwargs["skills"] = skills

View file

@ -1,5 +1,6 @@
"""Tests for cron job context_from feature (issue #5439 Option C)."""
import logging
import sys
from pathlib import Path
@ -267,6 +268,35 @@ class TestBuildJobPromptContextFrom:
assert "Process" in prompt
assert "etc/passwd" not in prompt
def test_invalid_job_id_log_includes_job_origin(self, cron_env, caplog):
"""Invalid stored context_from refs log job/source provenance."""
from cron.jobs import create_job
from cron.scheduler import _build_job_prompt
job = create_job(
prompt="Process",
schedule="every 2h",
name="suspicious-chain",
origin={
"platform": "api_server",
"chat_id": "api",
"source_ip": "203.0.113.10",
"forwarded_for": "198.51.100.7",
},
)
job["context_from"] = ["../../../etc/passwd"]
caplog.set_level(logging.WARNING, logger="cron.scheduler")
prompt = _build_job_prompt(job)
assert "Process" in prompt
message = caplog.text
assert "context_from: skipping invalid job_id" in message
assert job["id"] in message
assert "suspicious-chain" in message
assert "203.0.113.10" in message
assert "198.51.100.7" in message
class TestUpdateContextFrom:

View file

@ -11,6 +11,7 @@ Covers:
"""
import json
import logging
from unittest.mock import MagicMock, patch
import pytest
@ -151,6 +152,9 @@ class TestCreateJob:
"name": "test-job",
"schedule": "*/5 * * * *",
"prompt": "do something",
}, headers={
"X-Forwarded-For": "203.0.113.11",
"User-Agent": "cron-client",
})
assert resp.status == 200
data = await resp.json()
@ -160,6 +164,10 @@ class TestCreateJob:
assert call_kwargs["name"] == "test-job"
assert call_kwargs["schedule"] == "*/5 * * * *"
assert call_kwargs["prompt"] == "do something"
assert call_kwargs["origin"]["platform"] == "api_server"
assert call_kwargs["origin"]["chat_id"] == "api"
assert call_kwargs["origin"]["forwarded_for"] == "203.0.113.11"
assert call_kwargs["origin"]["user_agent"] == "cron-client"
@pytest.mark.asyncio
async def test_create_job_missing_name(self, adapter):
@ -280,6 +288,29 @@ class TestGetJob:
data = await resp.json()
assert "Invalid" in data["error"]
@pytest.mark.asyncio
async def test_invalid_job_id_logs_source_context(self, adapter, caplog):
"""Invalid job-id probes log source metadata for later investigation."""
app = _create_app(adapter)
caplog.set_level(logging.WARNING, logger="gateway.platforms.api_server")
async with TestClient(TestServer(app)) as cli:
with patch(f"{_MOD}._CRON_AVAILABLE", True):
resp = await cli.get(
"/api/jobs/..%2F..%2F..%2Fetc%2Fpasswd",
headers={
"X-Forwarded-For": "203.0.113.9",
"User-Agent": "probe scanner",
},
)
assert resp.status == 400
message = caplog.text
assert "Cron jobs API rejected invalid job_id" in message
assert "203.0.113.9" in message
assert "GET" in message
assert "/api/jobs/" in message
assert "probe scanner" in message
# ---------------------------------------------------------------------------
# 11-12. test_update_job