def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -> bool:
    """Run ONE due job end-to-end: execute → save output → deliver → mark.

    This is the shared firing body extracted from ``tick``'s per-job closure so
    that BOTH the built-in ticker and an external provider's ``fire_due`` (e.g.
    Chronos) run the identical sequence — no duplicated correctness.

    It does NOT decide whether the job is due, claim it, or compute the next
    run — those are the caller's concern (``tick`` advances ``next_run_at``
    under the file lock before dispatch; an external provider claims via the
    store CAS). This function only fires the given job once.

    Args:
        job: Job record. ``job["id"]`` is required; ``job["name"]`` is used in
            the failure notice when present.
        adapters: Passed through unchanged to ``_deliver_result``.
        loop: Passed through unchanged to ``_deliver_result``.
        verbose: When true, log where the job output was saved.

    Returns:
        True if the job was processed (even if the job itself failed —
        failure is recorded via ``mark_job_run``), False only if processing
        raised.
    """
    try:
        success, output, final_response, error = run_job(job)
        # A missing response is handled like an empty one, so a successful run
        # with no text takes the soft-failure path below instead of raising
        # AttributeError on ``.strip()`` and landing in the generic handler.
        final_response = final_response or ""

        output_file = save_job_output(job["id"], output)
        if verbose:
            logger.info("Output saved to: %s", output_file)

        # Deliver the final response to the origin/target chat.
        # If the agent responded with [SILENT], skip delivery (but
        # output is already saved above). Failed jobs always deliver.
        deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
        # Treat whitespace-only final responses the same as empty
        # responses: do not deliver a blank message, and let the
        # empty-response guard below mark the run as a soft failure.
        should_deliver = bool(deliver_content.strip())
        if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
            logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
            should_deliver = False

        delivery_error = None
        if should_deliver:
            try:
                delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
            except Exception as de:
                # A delivery failure is recorded on the run, not treated as a
                # job failure: ``success`` is left untouched.
                delivery_error = str(de)
                logger.error("Delivery failed for job %s: %s", job["id"], de)

        # Treat empty final_response as a soft failure so last_status
        # is not "ok" — the agent ran but produced nothing useful.
        # (issue #8585)
        if success and not final_response.strip():
            success = False
            error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"

        mark_job_run(job["id"], success, error, delivery_error=delivery_error)
        return True

    except Exception as e:
        # Boundary handler: keep the original message text, but attach the
        # traceback so the failing step can be identified from the log.
        logger.error("Error processing job %s: %s", job['id'], e, exc_info=True)
        mark_job_run(job["id"], False, str(e))
        return False
"""Characterization + unit tests for the `run_one_job` shared helper (Phase 4A).

`tick`'s per-job body (`_process_job`) is the execute → save → deliver → mark
sequence that fires ONE due job. Phase 4A extracts it into a module-level
`run_one_job(job, *, adapters=None, loop=None, verbose=False)` so the external
Chronos provider's `fire_due` can reuse the IDENTICAL body — no duplicated
correctness.

The first test drives the sequence through `tick()` to show the extraction
left `tick`'s behavior alone; the remaining tests call the helper directly.
"""
import cron.scheduler as s

# Step names, in the order a normally delivered job is expected to record them.
_FULL_SEQUENCE = ["run_job", "save", "deliver", "mark"]


def _steps(log):
    """Return only the step name (first field) of every recorded call."""
    return [entry[0] for entry in log]


def _first_mark(log):
    """Return the first recorded ``mark`` entry."""
    return next(entry for entry in log if entry[0] == "mark")


def _patch_pipeline(monkeypatch, *, success=True, output="out", final="final response",
                    error=None, silent_marker_in=None):
    """Replace the four pipeline primitives on ``cron.scheduler`` with stubs.

    Each stub appends a tuple to a shared log, which is returned so a test can
    assert on call order. ``silent_marker_in``, when given, is used as the
    agent's final response in place of ``final``.
    """
    log = []
    response = silent_marker_in if silent_marker_in is not None else final

    def stub_run_job(job):
        log.append(("run_job", job["id"]))
        return (success, output, response, error)

    def stub_save(jid, out):
        log.append(("save", jid))
        return f"/tmp/{jid}.txt"

    def stub_deliver(job, content, adapters=None, loop=None):
        log.append(("deliver", job["id"]))
        return None

    def stub_mark(jid, ok, err=None, delivery_error=None):
        log.append(("mark", jid, ok))

    replacements = (
        ("run_job", stub_run_job),
        ("save_job_output", stub_save),
        ("_deliver_result", stub_deliver),
        ("mark_job_run", stub_mark),
    )
    for attr_name, stub in replacements:
        monkeypatch.setattr(s, attr_name, stub)
    return log


def test_tick_process_job_sequence(monkeypatch):
    """Characterization: one due job pushed through tick() records
    run_job → save → deliver → mark, in that order."""
    log = _patch_pipeline(monkeypatch)

    def one_due_job():
        return [{"id": "j1", "name": "t"}]

    def always_advance(jid):
        return True

    monkeypatch.setattr(s, "get_due_jobs", one_due_job)
    monkeypatch.setattr(s, "advance_next_run", always_advance)

    s.tick(verbose=False, sync=True)

    assert _steps(log) == _FULL_SEQUENCE
    assert log[-1] == ("mark", "j1", True)


def test_run_one_job_success_sequence(monkeypatch):
    """Called directly, the helper records the same
    execute→save→deliver→mark sequence for a successful job."""
    log = _patch_pipeline(monkeypatch)

    result = s.run_one_job({"id": "j2", "name": "t"})

    assert result is True
    assert _steps(log) == _FULL_SEQUENCE
    assert log[-1] == ("mark", "j2", True)


def test_run_one_job_silent_skips_delivery(monkeypatch):
    """A [SILENT] final response still saves output and marks the run, but
    nothing is delivered."""
    log = _patch_pipeline(monkeypatch, silent_marker_in="[SILENT]")

    s.run_one_job({"id": "j3", "name": "t"})

    recorded = _steps(log)
    assert {"run_job", "save", "mark"} <= set(recorded)
    assert "deliver" not in recorded


def test_run_one_job_empty_response_is_soft_failure(monkeypatch):
    """A whitespace-only final response marks the run as NOT ok (issue #8585)."""
    log = _patch_pipeline(monkeypatch, final=" ")

    s.run_one_job({"id": "j4", "name": "t"})

    assert _first_mark(log) == ("mark", "j4", False)


def test_run_one_job_failed_job_delivers_error(monkeypatch):
    """A failed job still delivers (the error notice) and is marked not-ok."""
    log = _patch_pipeline(monkeypatch, success=False, final="", error="boom")

    s.run_one_job({"id": "j5", "name": "t"})

    assert "deliver" in _steps(log)  # failures always deliver
    assert _first_mark(log) == ("mark", "j5", False)


def test_run_one_job_exception_marks_failure(monkeypatch):
    """When run_job raises, the helper marks the run failed and returns False
    instead of letting the exception escape."""
    recorded_marks = []

    def boom(job):
        raise RuntimeError("kaboom")

    def record_mark(jid, ok, err=None, delivery_error=None):
        recorded_marks.append((jid, ok))

    monkeypatch.setattr(s, "run_job", boom)
    monkeypatch.setattr(s, "mark_job_run", record_mark)

    result = s.run_one_job({"id": "j6", "name": "t"})

    assert result is False
    assert recorded_marks == [("j6", False)]