refactor(cron): extract run_one_job shared firing helper from tick

Phase 4A. Factor tick's per-job closure (_process_job: execute → save →
deliver → mark) into a module-level run_one_job(job, *, adapters, loop,
verbose) so the external Chronos provider's fire_due (Phase 4D) reuses the
IDENTICAL body — no duplicated correctness. tick's _process_job is now a thin
wrapper calling run_one_job; the pool/in-flight-guard/contextvars dispatch
logic is unchanged.

run_one_job fires ONE given job; it does NOT decide due-ness, claim, or compute
next_run (tick advances next_run_at under the file lock; an external provider
claims via the store CAS in Phase 4C). Pure refactor, no behavior change.

TDD: test_run_one_job.py characterizes the sequence through tick() first
(test_tick_process_job_sequence, passed pre-extraction), then unit-tests the
helper directly: success sequence, [SILENT]→skip delivery, empty-response soft
failure (#8585), failed-job-still-delivers, exception→mark-failed.

Verified: tests/cron/ 459 passed (was 453 + 6 new); tick behavior unchanged.
This commit is contained in:
Ben 2026-06-18 14:26:29 +10:00
parent bfb6e0bb33
commit 58b19a4f69
2 changed files with 182 additions and 42 deletions

View file

@ -1967,6 +1967,64 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e)
def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -> bool:
    """Run ONE due job end-to-end: execute → save output → deliver → mark.

    This is the shared firing body extracted from ``tick``'s per-job closure so
    that BOTH the built-in ticker and an external provider's ``fire_due`` (e.g.
    Chronos) run the identical sequence — no duplicated correctness.

    It does NOT decide whether the job is due, claim it, or compute the next
    run — those are the caller's concern (``tick`` advances ``next_run_at``
    under the file lock before dispatch; an external provider claims via the
    store CAS). This function only fires the given job once.

    Args:
        job: Job record. ``job["id"]`` is required; ``job["name"]`` is used in
            the failure notice when present.
        adapters: Passed through unchanged to ``_deliver_result``.
        loop: Passed through unchanged to ``_deliver_result``.
        verbose: When true, log where the job output was saved.

    Returns:
        True if the job was processed (even if the job itself failed —
        failure is recorded via ``mark_job_run``), False only if processing
        raised.
    """
    try:
        success, output, final_response, error = run_job(job)
        # run_job is annotated to return a str here; normalise a stray None so
        # it takes the empty-response path below instead of raising on
        # ``.strip()`` and being recorded with an opaque AttributeError.
        final_response = final_response or ""

        output_file = save_job_output(job["id"], output)
        if verbose:
            logger.info("Output saved to: %s", output_file)

        # Deliver the final response to the origin/target chat.
        # If the agent responded with [SILENT], skip delivery (but
        # output is already saved above). Failed jobs always deliver.
        if success:
            deliver_content = final_response
        else:
            deliver_content = f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"

        # Treat whitespace-only final responses the same as empty
        # responses: do not deliver a blank message, and let the
        # empty-response guard below mark the run as a soft failure.
        should_deliver = bool(deliver_content.strip())
        if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
            logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
            should_deliver = False

        delivery_error = None
        if should_deliver:
            try:
                delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
            except Exception as de:
                # A delivery failure must not lose the run result: record it
                # alongside the run status instead of aborting.
                delivery_error = str(de)
                logger.exception("Delivery failed for job %s: %s", job["id"], de)

        # Treat empty final_response as a soft failure so last_status
        # is not "ok" — the agent ran but produced nothing useful.
        # (issue #8585)
        if success and not final_response.strip():
            success = False
            error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"

        mark_job_run(job["id"], success, error, delivery_error=delivery_error)
        return True
    except Exception as e:
        # Top-level boundary for one job: keep the traceback in the log and
        # record the failure so the caller only sees a boolean.
        logger.exception("Error processing job %s: %s", job['id'], e)
        mark_job_run(job["id"], False, str(e))
        return False
def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> int:
"""
Check and run all due jobs.
@ -2045,48 +2103,11 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
)
def _process_job(job: dict) -> bool:
    """Run one due job end-to-end. Thin wrapper around the shared
    module-level ``run_one_job`` so ``tick`` and external providers
    (Chronos ``fire_due``) use the identical execute → save → deliver → mark
    body."""
    # adapters, loop and verbose are the enclosing tick() arguments.
    return run_one_job(job, adapters=adapters, loop=loop, verbose=verbose)
# Partition due jobs: those with a per-job workdir mutate
# os.environ["TERMINAL_CWD"] inside run_job, which is process-global —

View file

@ -0,0 +1,119 @@
"""Characterization + unit tests for the `run_one_job` shared helper (Phase 4A).
`tick`'s per-job body (`_process_job`) is the execute → save → deliver → mark
sequence that fires ONE due job. Phase 4A extracts it into a module-level
`run_one_job(job, *, adapters=None, loop=None, verbose=False)` so the external
Chronos provider's `fire_due` can reuse the IDENTICAL body — no duplicated
correctness.
The first test characterizes the sequence as driven through `tick()` (proving
the extraction didn't change `tick`'s behavior); the rest unit-test the
extracted helper directly.
"""
import cron.scheduler as s
def _patch_pipeline(monkeypatch, *, success=True, output="out", final="final response",
                    error=None, silent_marker_in=None):
    """Replace the four pipeline primitives on the scheduler module with stubs.

    Each stub appends a tuple to a shared list so tests can assert on call
    order: ``("run_job", id)``, ``("save", id)``, ``("deliver", id)`` and
    ``("mark", id, ok)``. The stubbed ``run_job`` returns
    ``(success, output, <final response>, error)``, where the final response
    is ``silent_marker_in`` when given and ``final`` otherwise.

    Returns the shared list of recorded calls.
    """
    recorded = []
    response_text = silent_marker_in if silent_marker_in is not None else final

    def fake_run_job(job):
        recorded.append(("run_job", job["id"]))
        return (success, output, response_text, error)

    def fake_save(jid, out):
        recorded.append(("save", jid))
        return f"/tmp/{jid}.txt"

    def fake_deliver(job, content, adapters=None, loop=None):
        recorded.append(("deliver", job["id"]))
        return None

    def fake_mark(jid, ok, err=None, delivery_error=None):
        recorded.append(("mark", jid, ok))

    stubs = {
        "run_job": fake_run_job,
        "save_job_output": fake_save,
        "_deliver_result": fake_deliver,
        "mark_job_run": fake_mark,
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(s, attr_name, stub)
    return recorded
def test_tick_process_job_sequence(monkeypatch):
    """Characterization: a single due job driven through tick() runs the
    sequence run_job → save → deliver → mark, in that order."""
    recorded = _patch_pipeline(monkeypatch)
    # Feed tick exactly one due job and let the next-run advance succeed.
    monkeypatch.setattr(s, "get_due_jobs", lambda: [{"id": "j1", "name": "t"}])
    monkeypatch.setattr(s, "advance_next_run", lambda jid: True)

    s.tick(verbose=False, sync=True)

    steps = [entry[0] for entry in recorded]
    assert steps == ["run_job", "save", "deliver", "mark"]
    assert recorded[-1] == ("mark", "j1", True)
def test_run_one_job_success_sequence(monkeypatch):
    """Calling the helper directly on a successful job yields the same
    execute→save→deliver→mark order and reports the job as processed."""
    recorded = _patch_pipeline(monkeypatch)

    processed = s.run_one_job({"id": "j2", "name": "t"})

    assert processed is True
    step_names = [entry[0] for entry in recorded]
    assert step_names == ["run_job", "save", "deliver", "mark"]
    assert recorded[-1] == ("mark", "j2", True)
def test_run_one_job_silent_skips_delivery(monkeypatch):
    """A [SILENT] final response saves output + marks the run but does NOT
    deliver; the run still counts as processed and successful."""
    calls = _patch_pipeline(monkeypatch, silent_marker_in="[SILENT]")

    ok = s.run_one_job({"id": "j3", "name": "t"})

    kinds = [c[0] for c in calls]
    assert "run_job" in kinds and "save" in kinds and "mark" in kinds
    assert "deliver" not in kinds
    # Skipping delivery must not turn the run into a failure.
    assert ok is True
    assert calls[-1] == ("mark", "j3", True)
def test_run_one_job_empty_response_is_soft_failure(monkeypatch):
    """An empty or whitespace-only final response marks the run as NOT ok
    (issue #8585) and does not deliver a blank message."""
    for blank in ("", " "):
        calls = _patch_pipeline(monkeypatch, final=blank)

        ok = s.run_one_job({"id": "j4", "name": "t"})

        # The job was processed; only its recorded status is a failure.
        assert ok is True
        kinds = [c[0] for c in calls]
        assert "deliver" not in kinds
        mark = [c for c in calls if c[0] == "mark"][0]
        assert mark == ("mark", "j4", False)
def test_run_one_job_failed_job_delivers_error(monkeypatch):
    """When run_job reports failure, the error notice is still delivered and
    the run is marked not-ok."""
    recorded = _patch_pipeline(monkeypatch, success=False, final="", error="boom")

    s.run_one_job({"id": "j5", "name": "t"})

    step_names = [entry[0] for entry in recorded]
    assert "deliver" in step_names  # failures always deliver
    first_mark = next(entry for entry in recorded if entry[0] == "mark")
    assert first_mark == ("mark", "j5", False)
def test_run_one_job_exception_marks_failure(monkeypatch):
    """An exception raised by run_job is swallowed by the helper: the run is
    marked failed and False is returned instead of propagating."""
    recorded_marks = []

    def boom(job):
        raise RuntimeError("kaboom")

    def record_mark(jid, ok, err=None, delivery_error=None):
        recorded_marks.append((jid, ok))

    monkeypatch.setattr(s, "run_job", boom)
    monkeypatch.setattr(s, "mark_job_run", record_mark)

    outcome = s.run_one_job({"id": "j6", "name": "t"})

    assert outcome is False
    assert recorded_marks == [("j6", False)]