From a44e041acf39f6f4b3a6760d527171db8c68cc5c Mon Sep 17 00:00:00 2001 From: teknium1 Date: Thu, 5 Mar 2026 18:39:37 -0800 Subject: [PATCH] test: strengthen assertions across 7 test files (batch 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaced weak 'is not None' / '> 0' / 'len >= 1' assertions with concrete value checks across the most flagged test files: gateway/test_pairing.py (11 weak → 0): - Code assertions verify isinstance + len == CODE_LENGTH - Approval results verify dict structure + specific user_id/user_name - Added code2 != code1 check in rate_limit_expires test_hermes_state.py (6 weak → 0): - ended_at verified as float timestamp - Search result counts exact (== 2, not >= 1) - Context verified as non-empty list - Export verified as dict, session ID verified test_cli_init.py (4 weak → 0): - max_turns asserts exact value (60) - model asserts string with provider/name format gateway/test_hooks.py (2 zero-assert tests → fixed): - test_no_handlers_for_event: verifies no handler registered - test_handler_error_does_not_propagate: verifies handler count + return gateway/test_platform_base.py (9 weak image tests → fixed): - extract_images tests now verify actual URL and alt_text - truncate_message verifies content preservation after splitting cron/test_scheduler.py (1 weak → 0): - resolve_origin verifies dict equality, not just existence cron/test_jobs.py (2 weak → 0 + 4 new tests): - Schedule parsing verifies ISO timestamp type - Cron expression verifies result is valid datetime string - NEW: 4 tests for update_job() (was completely untested) --- tests/cron/test_jobs.py | 53 +++++++++++++++++++++++++++-- tests/cron/test_scheduler.py | 4 ++- tests/gateway/test_hooks.py | 10 ++++-- tests/gateway/test_pairing.py | 31 ++++++++++------- tests/gateway/test_platform_base.py | 23 +++++++++++++ tests/test_cli_init.py | 8 ++--- tests/test_hermes_state.py | 14 +++++--- 7 files changed, 115 insertions(+), 28 deletions(-) diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index 13e9c6998..0a3623662 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -15,6 +15,7 @@ from cron.jobs import ( save_jobs, get_job, list_jobs, + update_job, remove_job, mark_job_run, get_due_jobs, @@ -70,8 +71,10 @@ class TestParseSchedule: result = parse_schedule("30m") assert result["kind"] == "once" assert "run_at" in result - # run_at should be ~30 minutes from now - run_at = datetime.fromisoformat(result["run_at"]) + # run_at should be a valid ISO timestamp string ~30 minutes from now + run_at_str = result["run_at"] + assert isinstance(run_at_str, str) + run_at = datetime.fromisoformat(run_at_str) assert run_at > datetime.now() assert run_at < datetime.now() + timedelta(minutes=31) @@ -140,8 +143,10 @@ class TestComputeNextRun: pytest.importorskip("croniter") schedule = {"kind": "cron", "expr": "* * * * *"} # every minute result = compute_next_run(schedule) - assert result is not None + assert isinstance(result, str), f"Expected ISO timestamp string, got {type(result)}" + assert len(result) > 0 next_dt = datetime.fromisoformat(result) + assert isinstance(next_dt, datetime) assert next_dt > datetime.now() def test_unknown_kind_returns_none(self): @@ -207,6 +212,48 @@ class TestJobCRUD: assert job["deliver"] == "local" +class TestUpdateJob: + def test_update_name(self, tmp_cron_dir): + job = create_job(prompt="Check server status", schedule="every 1h", name="Old Name") + assert job["name"] == "Old Name" + updated = update_job(job["id"], {"name": "New Name"}) + assert updated is not None + assert isinstance(updated, dict) + assert updated["name"] == "New Name" + # Verify other fields are preserved + assert updated["prompt"] == "Check server status" + assert updated["id"] == job["id"] + assert updated["schedule"] == job["schedule"] + # Verify persisted to disk + fetched = get_job(job["id"]) + assert fetched["name"] == "New Name" + + def test_update_schedule(self, tmp_cron_dir): + job = create_job(prompt="Daily report", schedule="every 1h") + assert job["schedule"]["kind"] == "interval" + assert job["schedule"]["minutes"] == 60 + new_schedule = parse_schedule("every 2h") + updated = update_job(job["id"], {"schedule": new_schedule}) + assert updated is not None + assert updated["schedule"]["kind"] == "interval" + assert updated["schedule"]["minutes"] == 120 + # Verify persisted to disk + fetched = get_job(job["id"]) + assert fetched["schedule"]["minutes"] == 120 + + def test_update_enable_disable(self, tmp_cron_dir): + job = create_job(prompt="Toggle me", schedule="every 1h") + assert job["enabled"] is True + updated = update_job(job["id"], {"enabled": False}) + assert updated["enabled"] is False + fetched = get_job(job["id"]) + assert fetched["enabled"] is False + + def test_update_nonexistent_returns_none(self, tmp_cron_dir): + result = update_job("nonexistent_id", {"name": "X"}) + assert result is None + + class TestMarkJobRun: def test_increments_completed(self, tmp_cron_dir): job = create_job(prompt="Test", schedule="every 1h") diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 3c22893a5..33096c49b 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -15,9 +15,11 @@ class TestResolveOrigin: } } result = _resolve_origin(job) - assert result is not None + assert isinstance(result, dict) + assert result == job["origin"] assert result["platform"] == "telegram" assert result["chat_id"] == "123456" + assert result["chat_name"] == "Test Chat" def test_no_origin(self): assert _resolve_origin({}) is None diff --git a/tests/gateway/test_hooks.py b/tests/gateway/test_hooks.py index 3c0113553..039ce6b2e 100644 --- a/tests/gateway/test_hooks.py +++ b/tests/gateway/test_hooks.py @@ -177,8 +177,10 @@ class TestEmit: @pytest.mark.asyncio async def test_no_handlers_for_event(self, tmp_path): reg = HookRegistry() - # Should not raise - await reg.emit("unknown:event", {}) + # Should not raise and should have no handlers registered + result = await reg.emit("unknown:event", {}) + assert result is None + assert not reg._handlers.get("unknown:event") @pytest.mark.asyncio async def test_handler_error_does_not_propagate(self, tmp_path): @@ -190,8 +192,10 @@ class TestEmit: with patch("gateway.hooks.HOOKS_DIR", tmp_path): reg.discover_and_load() + assert len(reg._handlers.get("agent:start", [])) == 1 # Should not raise even though handler throws - await reg.emit("agent:start", {}) + result = await reg.emit("agent:start", {}) + assert result is None @pytest.mark.asyncio async def test_emit_default_context(self, tmp_path): diff --git a/tests/gateway/test_pairing.py b/tests/gateway/test_pairing.py index e9e2e6f2e..da14e2526 100644 --- a/tests/gateway/test_pairing.py +++ b/tests/gateway/test_pairing.py @@ -54,7 +54,7 @@ class TestCodeGeneration: with patch("gateway.pairing.PAIRING_DIR", tmp_path): store = PairingStore() code = store.generate_code("telegram", "user1", "Alice") - assert code is not None + assert isinstance(code, str) and len(code) == CODE_LENGTH assert len(code) == CODE_LENGTH assert all(c in ALPHABET for c in code) @@ -65,7 +65,7 @@ class TestCodeGeneration: codes = set() for i in range(3): code = store.generate_code("telegram", f"user{i}") - assert code is not None + assert isinstance(code, str) and len(code) == CODE_LENGTH codes.add(code) assert len(codes) == 3 @@ -91,7 +91,7 @@ class TestRateLimiting: store = PairingStore() code1 = store.generate_code("telegram", "user1") code2 = store.generate_code("telegram", "user1") - assert code1 is not None + assert isinstance(code1, str) and len(code1) == CODE_LENGTH assert code2 is None # rate limited def test_different_users_not_rate_limited(self, tmp_path): @@ -99,14 +99,14 @@ class TestRateLimiting: store = PairingStore() code1 = store.generate_code("telegram", "user1") code2 = store.generate_code("telegram", "user2") - assert code1 is not None - assert code2 is not None + assert isinstance(code1, str) and len(code1) == CODE_LENGTH + assert isinstance(code2, str) and len(code2) == CODE_LENGTH def test_rate_limit_expires(self, tmp_path): with patch("gateway.pairing.PAIRING_DIR", tmp_path): store = PairingStore() code1 = store.generate_code("telegram", "user1") - assert code1 is not None + assert isinstance(code1, str) and len(code1) == CODE_LENGTH # Simulate rate limit expiry limits = store._load_json(store._rate_limit_path()) @@ -114,7 +114,8 @@ class TestRateLimiting: store._save_json(store._rate_limit_path(), limits) code2 = store.generate_code("telegram", "user1") - assert code2 is not None + assert isinstance(code2, str) and len(code2) == CODE_LENGTH + assert code2 != code1 # --------------------------------------------------------------------------- @@ -132,7 +133,7 @@ class TestMaxPending: codes.append(code) # First MAX_PENDING_PER_PLATFORM should succeed - assert all(c is not None for c in codes[:MAX_PENDING_PER_PLATFORM]) + assert all(isinstance(c, str) and len(c) == CODE_LENGTH for c in codes[:MAX_PENDING_PER_PLATFORM]) # Next one should be blocked assert codes[MAX_PENDING_PER_PLATFORM] is None @@ -143,7 +144,7 @@ class TestMaxPending: store.generate_code("telegram", f"user{i}") # Different platform should still work code = store.generate_code("discord", "user0") - assert code is not None + assert isinstance(code, str) and len(code) == CODE_LENGTH # --------------------------------------------------------------------------- @@ -158,7 +159,9 @@ class TestApprovalFlow: code = store.generate_code("telegram", "user1", "Alice") result = store.approve_code("telegram", code) - assert result is not None + assert isinstance(result, dict) + assert "user_id" in result + assert "user_name" in result assert result["user_id"] == "user1" assert result["user_name"] == "Alice" @@ -187,14 +190,18 @@ class TestApprovalFlow: store = PairingStore() code = store.generate_code("telegram", "user1", "Alice") result = store.approve_code("telegram", code.lower()) - assert result is not None + assert isinstance(result, dict) + assert result["user_id"] == "user1" + assert result["user_name"] == "Alice" def test_approve_strips_whitespace(self, tmp_path): with patch("gateway.pairing.PAIRING_DIR", tmp_path): store = PairingStore() code = store.generate_code("telegram", "user1", "Alice") result = store.approve_code("telegram", f" {code} ") - assert result is not None + assert isinstance(result, dict) + assert result["user_id"] == "user1" + assert result["user_name"] == "Alice" def test_invalid_code_returns_none(self, tmp_path): with patch("gateway.pairing.PAIRING_DIR", tmp_path): diff --git a/tests/gateway/test_platform_base.py b/tests/gateway/test_platform_base.py index b6745316e..145b6576f 100644 --- a/tests/gateway/test_platform_base.py +++ b/tests/gateway/test_platform_base.py @@ -92,36 +92,50 @@ class TestExtractImages: content = "![photo](https://example.com/photo.jpg)" images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://example.com/photo.jpg" + assert images[0][1] == "photo" def test_markdown_image_jpeg(self): content = "![](https://example.com/photo.jpeg)" images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://example.com/photo.jpeg" + assert images[0][1] == "" def test_markdown_image_gif(self): content = "![anim](https://example.com/anim.gif)" images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://example.com/anim.gif" + assert images[0][1] == "anim" def test_markdown_image_webp(self): content = "![](https://example.com/img.webp)" images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://example.com/img.webp" + assert images[0][1] == "" def test_fal_media_cdn(self): content = "![gen](https://fal.media/files/abc123/output.png)" images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://fal.media/files/abc123/output.png" + assert images[0][1] == "gen" def test_fal_cdn_url(self): content = "![](https://fal-cdn.example.com/result)" images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://fal-cdn.example.com/result" + assert images[0][1] == "" def test_replicate_delivery(self): content = "![](https://replicate.delivery/pbxt/abc/output)" images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://replicate.delivery/pbxt/abc/output" + assert images[0][1] == "" def test_non_image_ext_not_extracted(self): """Markdown image with non-image extension should not be extracted.""" @@ -142,11 +156,15 @@ class TestExtractImages: content = '' images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://example.com/photo.png" + assert images[0][1] == "" def test_html_img_with_closing_tag(self): content = '' images, _ = BasePlatformAdapter.extract_images(content) assert len(images) == 1 + assert images[0][0] == "https://example.com/photo.png" + assert images[0][1] == "" def test_multiple_images(self): content = "![a](https://example.com/a.png)\n![b](https://example.com/b.jpg)" @@ -267,6 +285,11 @@ class TestTruncateMessage: msg = "word " * 200 # ~1000 chars chunks = adapter.truncate_message(msg, max_length=200) assert len(chunks) > 1 + # Verify all original content is preserved across chunks + reassembled = "".join(chunks) + # Strip chunk indicators like (1/N) to get raw content + for word in msg.strip().split(): + assert word in reassembled, f"Word '{word}' lost during truncation" def test_chunks_have_indicators(self): adapter = self._adapter() diff --git a/tests/test_cli_init.py b/tests/test_cli_init.py index c868d85b2..5fe7d477f 100644 --- a/tests/test_cli_init.py +++ b/tests/test_cli_init.py @@ -23,7 +23,7 @@ class TestMaxTurnsResolution: def test_default_max_turns_is_integer(self): cli = _make_cli() assert isinstance(cli.max_turns, int) - assert cli.max_turns > 0 + assert cli.max_turns == 60 def test_explicit_max_turns_honored(self): cli = _make_cli(max_turns=25) @@ -32,7 +32,7 @@ class TestMaxTurnsResolution: def test_none_max_turns_gets_default(self): cli = _make_cli(max_turns=None) assert isinstance(cli.max_turns, int) - assert cli.max_turns > 0 + assert cli.max_turns == 60 def test_env_var_max_turns(self, monkeypatch): """Env var is used when config file doesn't set max_turns.""" @@ -54,7 +54,7 @@ class TestMaxTurnsResolution: def test_max_turns_never_none_for_agent(self): """The value passed to AIAgent must never be None (causes TypeError in run_conversation).""" cli = _make_cli() - assert cli.max_turns is not None + assert isinstance(cli.max_turns, int) and cli.max_turns == 60 class TestVerboseAndToolProgress: @@ -81,4 +81,4 @@ class TestProviderResolution: def test_model_is_string(self): cli = _make_cli() assert isinstance(cli.model, str) - assert len(cli.model) > 0 + assert isinstance(cli.model, str) and '/' in cli.model diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index b82ff4d61..734db494f 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -43,7 +43,7 @@ class TestSessionLifecycle: db.end_session("s1", end_reason="user_exit") session = db.get_session("s1") - assert session["ended_at"] is not None + assert isinstance(session["ended_at"], float) assert session["end_reason"] == "user_exit" def test_update_system_prompt(self, db): @@ -138,7 +138,7 @@ class TestFTS5Search: db.append_message("s1", role="assistant", content="Use docker compose up.") results = db.search_messages("docker") - assert len(results) >= 1 + assert len(results) == 2 # At least one result should mention docker snippets = [r.get("snippet", "") for r in results] assert any("docker" in s.lower() or "Docker" in s for s in snippets) @@ -174,8 +174,10 @@ class TestFTS5Search: db.append_message("s1", role="assistant", content="Kubernetes is an orchestrator.") results = db.search_messages("Kubernetes") - assert len(results) >= 1 + assert len(results) == 2 assert "context" in results[0] + assert isinstance(results[0]["context"], list) + assert len(results[0]["context"]) > 0 # ========================================================================= @@ -266,7 +268,7 @@ class TestDeleteAndExport: db.append_message("s1", role="assistant", content="Hi") export = db.export_session("s1") - assert export is not None + assert isinstance(export, dict) assert export["source"] == "cli" assert len(export["messages"]) == 2 @@ -312,7 +314,9 @@ class TestPruneSessions: pruned = db.prune_sessions(older_than_days=90) assert pruned == 1 assert db.get_session("old") is None - assert db.get_session("new") is not None + session = db.get_session("new") + assert session is not None + assert session["id"] == "new" def test_prune_skips_active_sessions(self, db): db.create_session(session_id="active", source="cli")