feat(plugins): add thread-local tool whitelist to pre_tool_call gate

Adds set_thread_tool_whitelist / clear_thread_tool_whitelist to
hermes_cli/plugins.py. When set on the current thread, restricts which
tools can pass through get_pre_tool_call_block_message; non-whitelisted
tools are blocked with a configurable deny message.

Mirrors the per-thread approval-callback pattern already used by
set_approval_callback (tools/terminal_tool.py:190). Used by
_spawn_background_review to deny non-memory/non-skill tools at runtime
while inheriting the parent agent's full tools schema for prefix-cache
parity (see follow-up commit).

Tests cover allow / deny / clear / cross-thread isolation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
WorldWriter 2026-04-29 13:04:56 +08:00 committed by Teknium
parent d898e0eb7f
commit 3a30c605b3
2 changed files with 109 additions and 0 deletions

View file

@ -1339,6 +1339,21 @@ def invoke_hook(hook_name: str, **kwargs: Any) -> List[Any]:
_thread_tool_whitelist = threading.local()
def set_thread_tool_whitelist(
allowed: Optional[Set[str]],
deny_msg_fmt: str = "Tool '{tool_name}' denied: not in this thread's tool whitelist",
) -> None:
_thread_tool_whitelist.allowed = allowed
_thread_tool_whitelist.fmt = deny_msg_fmt
def clear_thread_tool_whitelist() -> None:
_thread_tool_whitelist.allowed = None
def get_pre_tool_call_block_message(
tool_name: str,
args: Optional[Dict[str, Any]],
@ -1357,6 +1372,11 @@ def get_pre_tool_call_block_message(
directive wins. Invalid or irrelevant hook return values are
silently ignored so existing observer-only hooks are unaffected.
"""
allowed = getattr(_thread_tool_whitelist, "allowed", None)
if allowed is not None and tool_name not in allowed:
fmt = getattr(_thread_tool_whitelist, "fmt", "Tool '{tool_name}' denied")
return fmt.format(tool_name=tool_name)
hook_results = invoke_hook(
"pre_tool_call",
tool_name=tool_name,

View file

@ -538,6 +538,95 @@ class TestPreToolCallBlocking:
assert get_pre_tool_call_block_message("terminal", {}) == "first blocker"
class TestThreadToolWhitelist:
"""Tests for the thread-local tool whitelist used by background review forks."""
def test_allowed_tool_passes_through_to_hooks(self, monkeypatch):
from hermes_cli.plugins import (
set_thread_tool_whitelist,
clear_thread_tool_whitelist,
)
monkeypatch.setattr(
"hermes_cli.plugins.invoke_hook",
lambda hook_name, **kwargs: [],
)
set_thread_tool_whitelist({"memory", "skill_manage"})
try:
assert get_pre_tool_call_block_message("memory", {}) is None
finally:
clear_thread_tool_whitelist()
def test_disallowed_tool_blocked_with_message(self, monkeypatch):
from hermes_cli.plugins import (
set_thread_tool_whitelist,
clear_thread_tool_whitelist,
)
monkeypatch.setattr(
"hermes_cli.plugins.invoke_hook",
lambda hook_name, **kwargs: [],
)
set_thread_tool_whitelist(
{"memory"}, deny_msg_fmt="denied: {tool_name}"
)
try:
msg = get_pre_tool_call_block_message("terminal", {})
assert msg == "denied: terminal"
finally:
clear_thread_tool_whitelist()
def test_clear_restores_unrestricted_behavior(self, monkeypatch):
from hermes_cli.plugins import (
set_thread_tool_whitelist,
clear_thread_tool_whitelist,
)
monkeypatch.setattr(
"hermes_cli.plugins.invoke_hook",
lambda hook_name, **kwargs: [],
)
set_thread_tool_whitelist({"memory"})
clear_thread_tool_whitelist()
# After clearing, any tool should pass through to plugin hooks (which
# return [] here, so result is None).
assert get_pre_tool_call_block_message("terminal", {}) is None
def test_whitelist_is_thread_local(self, monkeypatch):
"""Setting a whitelist in one thread must NOT leak into another."""
import threading
from hermes_cli.plugins import (
set_thread_tool_whitelist,
clear_thread_tool_whitelist,
)
monkeypatch.setattr(
"hermes_cli.plugins.invoke_hook",
lambda hook_name, **kwargs: [],
)
# Main thread: install a restrictive whitelist.
set_thread_tool_whitelist({"memory"})
try:
assert get_pre_tool_call_block_message("terminal", {}) is not None
# Worker thread: should NOT inherit main thread's whitelist.
result = {}
def worker():
result["msg"] = get_pre_tool_call_block_message("terminal", {})
t = threading.Thread(target=worker)
t.start()
t.join()
assert result["msg"] is None, (
"thread-local whitelist leaked across threads"
)
finally:
clear_thread_tool_whitelist()
# ── TestPluginContext ──────────────────────────────────────────────────────