diff --git a/tests/hermes_cli/test_active_sessions.py b/tests/hermes_cli/test_active_sessions.py index dda461d686b..560803dc852 100644 --- a/tests/hermes_cli/test_active_sessions.py +++ b/tests/hermes_cli/test_active_sessions.py @@ -235,6 +235,8 @@ def test_cross_process_acquire_claims_only_one_last_slot(tmp_path, monkeypatch): repo_root = Path(__file__).resolve().parents[2] ready_dir = tmp_path / "ready" ready_dir.mkdir() + results_dir = tmp_path / "results" + results_dir.mkdir() go_file = tmp_path / "go" env = os.environ.copy() env["HERMES_HOME"] = str(home) @@ -244,7 +246,10 @@ def test_cross_process_acquire_claims_only_one_last_slot(tmp_path, monkeypatch): "from pathlib import Path\n" "from hermes_cli.active_sessions import try_acquire_active_session\n" "idx = os.environ['WORKER_INDEX']\n" + "worker_count = int(os.environ['WORKER_COUNT'])\n" + "delayed_worker = os.environ.get('DELAYED_WORKER_INDEX')\n" "ready_dir = Path(os.environ['READY_DIR'])\n" + "results_dir = Path(os.environ['RESULTS_DIR'])\n" "go_file = Path(os.environ['GO_FILE'])\n" "(ready_dir / idx).write_text('ready', encoding='utf-8')\n" "deadline = time.time() + 10\n" @@ -252,16 +257,24 @@ def test_cross_process_acquire_claims_only_one_last_slot(tmp_path, monkeypatch): " if time.time() > deadline:\n" " raise RuntimeError('timed out waiting for go file')\n" " time.sleep(0.01)\n" + "if idx == delayed_worker:\n" + " time.sleep(2.5)\n" "lease, message = try_acquire_active_session(\n" " session_id=f'process-{idx}',\n" " surface='cli',\n" " config={'max_concurrent_sessions': 1},\n" ")\n" "if lease is None:\n" + " (results_dir / idx).write_text('BLOCK', encoding='utf-8')\n" " print('BLOCK', flush=True)\n" "else:\n" + " (results_dir / idx).write_text('OK', encoding='utf-8')\n" " print('OK', flush=True)\n" - " time.sleep(2.0)\n" + " deadline = time.time() + 10\n" + " while len(list(results_dir.iterdir())) < worker_count:\n" + " if time.time() > deadline:\n" + " raise RuntimeError('timed out waiting for all workers to attempt acquire')\n" + " time.sleep(0.01)\n" " lease.release()\n" ) workers: list[subprocess.Popen[str]] = [] @@ -269,7 +282,10 @@ def test_cross_process_acquire_claims_only_one_last_slot(tmp_path, monkeypatch): for index in range(6): worker_env = env.copy() worker_env["WORKER_INDEX"] = str(index) + worker_env["WORKER_COUNT"] = "6" + worker_env["DELAYED_WORKER_INDEX"] = "5" worker_env["READY_DIR"] = str(ready_dir) + worker_env["RESULTS_DIR"] = str(results_dir) worker_env["GO_FILE"] = str(go_file) workers.append( subprocess.Popen(