fix: update 6 test files broken by dead code removal

- test_percentage_clamp.py: remove TestContextCompressorUsagePercent class
  and test_context_compressor_clamped (tested removed get_status() method)
- test_credential_pool.py: remove test_mark_used_increments_request_count
  (tested removed mark_used()), replace active_lease_count() calls with
  direct _active_leases dict access, remove mark_used from thread test
- test_session.py: replace SessionSource.local_cli() factory calls with
  direct SessionSource construction (local_cli classmethod removed)
- test_error_classifier.py: remove test_is_transient_property (tested
  removed is_transient property on ClassifiedError)
- test_delivery.py: remove TestDeliveryRouter class (tested removed
  resolve_targets method), clean up unused imports
- test_skills_hub.py: remove test_is_hub_installed (tested removed
  is_hub_installed method on HubLockFile)
This commit is contained in:
Teknium 2026-04-10 03:07:47 -07:00 committed by Teknium
parent c6c769772f
commit 957485876b
6 changed files with 18 additions and 147 deletions

View file

@ -702,53 +702,6 @@ def test_least_used_strategy_selects_lowest_count(tmp_path, monkeypatch):
assert entry.access_token == "sk-or-light"
def test_mark_used_increments_request_count(tmp_path, monkeypatch):
"""mark_used should increment the request_count of the current entry."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setattr(
"agent.credential_pool.get_pool_strategy",
lambda _provider: "fill_first",
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_singletons",
lambda provider, entries: (False, set()),
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_env",
lambda provider, entries: (False, set()),
)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "key-a",
"label": "test",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-or-test",
"request_count": 5,
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.request_count == 5
pool.mark_used()
updated = pool.current()
assert updated is not None
assert updated.request_count == 6
def test_thread_safety_concurrent_select(tmp_path, monkeypatch):
"""Concurrent select() calls should not corrupt pool state."""
import threading as _threading
@ -798,7 +751,6 @@ def test_thread_safety_concurrent_select(tmp_path, monkeypatch):
entry = pool.select()
if entry:
results.append(entry.id)
pool.mark_used(entry.id)
except Exception as exc:
errors.append(exc)
@ -1056,8 +1008,8 @@ def test_acquire_lease_prefers_unleased_entry(tmp_path, monkeypatch):
assert first == "cred-1"
assert second == "cred-2"
assert pool.active_lease_count("cred-1") == 1
assert pool.active_lease_count("cred-2") == 1
assert pool._active_leases.get("cred-1", 0) == 1
assert pool._active_leases.get("cred-2", 0) == 1
@ -1087,7 +1039,7 @@ def test_release_lease_decrements_counter(tmp_path, monkeypatch):
pool = load_pool("openrouter")
leased = pool.acquire_lease()
assert leased == "cred-1"
assert pool.active_lease_count("cred-1") == 1
assert pool._active_leases.get("cred-1", 0) == 1
pool.release_lease("cred-1")
assert pool.active_lease_count("cred-1") == 0
assert pool._active_leases.get("cred-1", 0) == 0

View file

@ -75,28 +75,6 @@ class TestClassifiedError:
e3 = ClassifiedError(reason=FailoverReason.billing)
assert e3.is_auth is False
def test_is_transient_property(self):
transient_reasons = [
FailoverReason.rate_limit,
FailoverReason.overloaded,
FailoverReason.server_error,
FailoverReason.timeout,
FailoverReason.unknown,
]
for reason in transient_reasons:
e = ClassifiedError(reason=reason)
assert e.is_transient is True, f"{reason} should be transient"
non_transient = [
FailoverReason.auth,
FailoverReason.billing,
FailoverReason.model_not_found,
FailoverReason.format_error,
]
for reason in non_transient:
e = ClassifiedError(reason=reason)
assert e.is_transient is False, f"{reason} should NOT be transient"
def test_defaults(self):
e = ClassifiedError(reason=FailoverReason.unknown)
assert e.retryable is True

View file

@ -1,7 +1,7 @@
"""Tests for the delivery routing module."""
from gateway.config import Platform, GatewayConfig, PlatformConfig, HomeChannel
from gateway.delivery import DeliveryRouter, DeliveryTarget
from gateway.config import Platform
from gateway.delivery import DeliveryTarget
from gateway.session import SessionSource
@ -65,10 +65,4 @@ class TestTargetToStringRoundtrip:
assert reparsed.chat_id == "999"
class TestDeliveryRouter:
def test_resolve_targets_does_not_duplicate_local_when_explicit(self):
router = DeliveryRouter(GatewayConfig(always_log_local=True))
targets = router.resolve_targets(["local"])
assert [target.platform for target in targets] == [Platform.LOCAL]

View file

@ -90,7 +90,10 @@ class TestSessionSourceRoundtrip:
class TestSessionSourceDescription:
def test_local_cli(self):
source = SessionSource.local_cli()
source = SessionSource(
platform=Platform.LOCAL, chat_id="cli",
chat_name="CLI terminal", chat_type="dm",
)
assert source.description == "CLI terminal"
def test_dm_with_username(self):
@ -143,7 +146,10 @@ class TestSessionSourceDescription:
class TestLocalCliFactory:
def test_local_cli_defaults(self):
source = SessionSource.local_cli()
source = SessionSource(
platform=Platform.LOCAL, chat_id="cli",
chat_name="CLI terminal", chat_type="dm",
)
assert source.platform == Platform.LOCAL
assert source.chat_id == "cli"
assert source.chat_type == "dm"
@ -267,7 +273,10 @@ class TestBuildSessionContextPrompt:
def test_local_prompt_mentions_machine(self):
config = GatewayConfig()
source = SessionSource.local_cli()
source = SessionSource(
platform=Platform.LOCAL, chat_id="cli",
chat_name="CLI terminal", chat_type="dm",
)
ctx = build_session_context(source, config)
prompt = build_session_context_prompt(ctx)

View file

@ -7,52 +7,6 @@ compression fires), users see >100% in /stats, gateway status, and
memory tool output.
"""
import pytest
class TestContextCompressorUsagePercent:
"""agent/context_compressor.py — get_status() usage_percent"""
def test_usage_percent_capped_at_100(self):
"""Tokens exceeding context_length should still show max 100%."""
from agent.context_compressor import ContextCompressor
comp = ContextCompressor.__new__(ContextCompressor)
comp.last_prompt_tokens = 210_000 # exceeds context_length
comp.context_length = 200_000
comp.threshold_tokens = 160_000
comp.compression_count = 0
status = comp.get_status()
assert status["usage_percent"] <= 100
def test_usage_percent_normal(self):
"""Normal usage should show correct percentage."""
from agent.context_compressor import ContextCompressor
comp = ContextCompressor.__new__(ContextCompressor)
comp.last_prompt_tokens = 100_000
comp.context_length = 200_000
comp.threshold_tokens = 160_000
comp.compression_count = 0
status = comp.get_status()
assert status["usage_percent"] == 50.0
def test_usage_percent_zero_context_length(self):
"""Zero context_length should return 0, not crash."""
from agent.context_compressor import ContextCompressor
comp = ContextCompressor.__new__(ContextCompressor)
comp.last_prompt_tokens = 1000
comp.context_length = 0
comp.threshold_tokens = 0
comp.compression_count = 0
status = comp.get_status()
assert status["usage_percent"] == 0
class TestMemoryToolPercentClamp:
"""tools/memory_tool.py — _success_response and _render_block pct"""
@ -126,12 +80,6 @@ class TestSourceLinesAreClamped:
with open(os.path.join(base, rel_path)) as f:
return f.read()
def test_context_compressor_clamped(self):
src = self._read_file("agent/context_compressor.py")
assert "min(100," in src, (
"context_compressor.py usage_percent is not clamped with min(100, ...)"
)
def test_gateway_run_clamped(self):
src = self._read_file("gateway/run.py")
# Check that the stats handler has min(100, ...)

View file

@ -854,16 +854,6 @@ class TestHubLockFile:
names = {e["name"] for e in installed}
assert names == {"s1", "s2"}
def test_is_hub_installed(self, tmp_path):
lock = HubLockFile(path=tmp_path / "lock.json")
lock.record_install(
name="my-skill", source="github", identifier="x",
trust_level="trusted", scan_verdict="pass",
skill_hash="h", install_path="my-skill", files=["SKILL.md"],
)
assert lock.is_hub_installed("my-skill") is True
assert lock.is_hub_installed("other") is False
# ---------------------------------------------------------------------------
# TapsManager