diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index cdcdf25a75d..00b03e9b2bf 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -21,6 +21,24 @@ import time from unittest.mock import patch +def _wait_until(predicate, timeout=10.0, interval=0.005): + """Block until ``predicate()`` is truthy or ``timeout`` elapses. + + Returns the predicate's final value. Used instead of a fixed + ``time.sleep`` before asserting that a background ticker thread has called + tick()/heartbeat() at least N times — under loaded CI the worker thread may + not be scheduled within a short fixed sleep, which made these tests flake + (``assert 0 >= 1`` / ``provider never called tick()``). + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + value = predicate() + if value: + return value + time.sleep(interval) + return predicate() + + def test_ticker_calls_tick_at_least_once_then_stops(): """The gateway in-process ticker loop calls cron.scheduler.tick repeatedly and exits promptly once the stop_event is set.""" @@ -34,7 +52,7 @@ def test_ticker_calls_tick_at_least_once_then_stops(): return 0 with patch("cron.scheduler.tick", side_effect=fake_tick): - # interval=0 keeps the loop tight; stop after a brief beat. + # interval=0 keeps the loop tight; stop after the first observed tick. t = threading.Thread( target=_start_cron_ticker, args=(stop,), @@ -42,7 +60,7 @@ def test_ticker_calls_tick_at_least_once_then_stops(): daemon=True, ) t.start() - time.sleep(0.2) + assert _wait_until(lambda: len(calls) >= 1), "ticker never called tick()" stop.set() t.join(timeout=5) @@ -74,7 +92,7 @@ def test_desktop_ticker_calls_tick_then_stops(): daemon=True, ) t.start() - time.sleep(0.2) + assert _wait_until(lambda: len(calls) >= 1), "desktop ticker never called tick()" stop.set() t.join(timeout=5) @@ -144,7 +162,10 @@ def test_inprocess_provider_ticks_and_stops(): target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True ) t.start() - time.sleep(0.2) + # Wait for the loop to actually call tick() at least once rather than + # sleeping a fixed window — under loaded CI the worker thread may not be + # scheduled within a short sleep, which made this flake (assert 0 >= 1). + assert _wait_until(lambda: len(calls) >= 1), "provider never called tick()" stop.set() t.join(timeout=5) @@ -378,7 +399,9 @@ def test_ticker_survives_baseexception_from_tick(): patch("cron.jobs.record_ticker_heartbeat"): t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True) t.start() - time.sleep(0.2) + # Survive the BaseException AND keep ticking: wait for ≥2 calls. + assert _wait_until(lambda: len(calls) >= 2), \ + "ticker did not keep ticking after the BaseException" stop.set() t.join(timeout=5) @@ -399,7 +422,10 @@ def test_ticker_records_heartbeat_each_iteration(): side_effect=lambda success=False: beats.append(success)): t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True) t.start() - time.sleep(0.2) + # Wait for the pre-loop liveness beat AND at least one successful + # post-tick beat before stopping (was a fixed 0.2s sleep → flaky). + assert _wait_until(lambda: any(b is True for b in beats[1:])), \ + "successful tick did not bump success marker" stop.set() t.join(timeout=5) @@ -422,7 +448,9 @@ def test_failing_tick_records_liveness_but_not_success(): side_effect=lambda success=False: beats.append(success)): t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True) t.start() - time.sleep(0.2) + # Wait for the pre-loop beat + at least one post-tick beat (was flaky + # with a fixed 0.2s sleep under loaded CI). + assert _wait_until(lambda: len(beats) >= 2), "ticker did not record heartbeats" stop.set() t.join(timeout=5)