feat: add Feishu document comment intelligent reply with 3-tier access control

- Full comment handler: parse drive.notice.comment_add_v1 events, build
  timeline, run agent, deliver reply with chunking support.
- 5 tools: feishu_doc_read, feishu_drive_list_comments,
  feishu_drive_list_comment_replies, feishu_drive_reply_comment,
  feishu_drive_add_comment.
- 3-tier access control rules (exact doc > wildcard "*" > top-level), with
  hard-coded code defaults as the final backstop and per-field fallback.
  Config via ~/.hermes/feishu_comment_rules.json, mtime-cached hot-reload.
- Self-reply filter using generalized self_open_id (supports future
  user-identity subscriptions). Receiver check: only process events
  where the bot is the @mentioned target.
- Smart timeline selection, long text chunking, semantic text extraction,
  session sharing per document, wiki link resolution.

Change-Id: I31e82fd6355173dbcc400b8934b6d9799e3137b9
This commit is contained in:
liujinkun 2026-04-16 20:51:11 +08:00 committed by Teknium
parent 9b14b76eb3
commit 85cdb04bd4
9 changed files with 3059 additions and 0 deletions

View file

@ -1228,6 +1228,10 @@ class FeishuAdapter(BasePlatformAdapter):
.register_p2_im_chat_member_bot_deleted_v1(self._on_bot_removed_from_chat)
.register_p2_im_chat_access_event_bot_p2p_chat_entered_v1(self._on_p2p_chat_entered)
.register_p2_im_message_recalled_v1(self._on_message_recalled)
.register_p2_customized_event(
"drive.notice.comment_add_v1",
self._on_drive_comment_event,
)
.build()
)
@ -1965,6 +1969,25 @@ class FeishuAdapter(BasePlatformAdapter):
def _on_message_recalled(self, data: Any) -> None:
    """Log message-recall events; no further handling is performed."""
    logger.debug("[Feishu] Message recalled by user")
def _on_drive_comment_event(self, data: Any) -> None:
    """Handle drive document comment notification (drive.notice.comment_add_v1).

    Delegates to :mod:`gateway.platforms.feishu_comment` for parsing,
    logging, and reaction. Scheduling follows the same
    ``run_coroutine_threadsafe`` pattern used by ``_on_message_event``.
    """
    # Function-scope import — presumably to avoid an import cycle or defer
    # loading until a comment event actually arrives; confirm with module layout.
    from gateway.platforms.feishu_comment import handle_drive_comment_event
    loop = self._loop
    if not self._loop_accepts_callbacks(loop):
        # Event arrived before the adapter's asyncio loop is ready to take
        # cross-thread callbacks; there is nowhere to schedule the coroutine.
        logger.warning("[Feishu] Dropping drive comment event before adapter loop is ready")
        return
    future = asyncio.run_coroutine_threadsafe(
        handle_drive_comment_event(self._client, data, self_open_id=self._bot_open_id),
        loop,
    )
    # Surface any exception raised by the background task into the logs.
    future.add_done_callback(self._log_background_failure)
def _on_reaction_event(self, event_type: str, data: Any) -> None:
"""Route user reactions on bot messages as synthetic text events."""
event = getattr(data, "event", None)
@ -2590,6 +2613,8 @@ class FeishuAdapter(BasePlatformAdapter):
self._on_reaction_event(event_type, data)
elif event_type == "card.action.trigger":
self._on_card_action_trigger(data)
elif event_type == "drive.notice.comment_add_v1":
self._on_drive_comment_event(data)
else:
logger.debug("[Feishu] Ignoring webhook event type: %s", event_type or "unknown")
return web.json_response({"code": 0, "msg": "ok"})

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,424 @@
"""
Feishu document comment access-control rules.
3-tier rule resolution: exact doc > wildcard "*" > top-level > code defaults.
Each field (enabled/policy/allow_from) falls back independently.
Config: ~/.hermes/feishu_comment_rules.json (mtime-cached, hot-reload).
Pairing store: ~/.hermes/feishu_comment_pairing.json.
"""
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
_HERMES_HOME = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))
RULES_FILE = _HERMES_HOME / "feishu_comment_rules.json"
PAIRING_FILE = _HERMES_HOME / "feishu_comment_pairing.json"
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
_VALID_POLICIES = ("allowlist", "pairing")
@dataclass(frozen=True)
class CommentDocumentRule:
    """Per-document rule. ``None`` means 'inherit from lower tier'."""

    enabled: Optional[bool] = None          # None = inherit from lower tier
    policy: Optional[str] = None            # "allowlist" | "pairing" | None (inherit)
    allow_from: Optional[frozenset] = None  # None = inherit; empty frozenset = explicit empty
@dataclass(frozen=True)
class CommentsConfig:
    """Top-level comment access config."""

    enabled: bool = True   # master switch for comment handling
    policy: str = "pairing"  # default policy when no document rule sets one
    allow_from: frozenset = field(default_factory=frozenset)  # always-allowed open_ids
    # Keyed by "fileType:fileToken", "wiki:<token>", or the wildcard "*".
    documents: Dict[str, CommentDocumentRule] = field(default_factory=dict)
@dataclass(frozen=True)
class ResolvedCommentRule:
    """Fully resolved rule after field-by-field fallback."""

    enabled: bool
    policy: str
    allow_from: frozenset
    match_source: str  # e.g. "exact:docx:xxx" | "wildcard" | "top" | "default"
# ---------------------------------------------------------------------------
# Mtime-cached file loading
# ---------------------------------------------------------------------------
class _MtimeCache:
"""Generic mtime-based file cache. ``stat()`` per access, re-read only on change."""
def __init__(self, path: Path):
self._path = path
self._mtime: float = 0.0
self._data: Optional[dict] = None
def load(self) -> dict:
try:
st = self._path.stat()
mtime = st.st_mtime
except FileNotFoundError:
self._mtime = 0.0
self._data = {}
return {}
if mtime == self._mtime and self._data is not None:
return self._data
try:
with open(self._path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
data = {}
except (json.JSONDecodeError, OSError):
logger.warning("[Feishu-Rules] Failed to read %s, using empty config", self._path)
data = {}
self._mtime = mtime
self._data = data
return data
_rules_cache = _MtimeCache(RULES_FILE)
_pairing_cache = _MtimeCache(PAIRING_FILE)
# ---------------------------------------------------------------------------
# Config parsing
# ---------------------------------------------------------------------------
def _parse_frozenset(raw: Any) -> Optional[frozenset]:
"""Parse a list of strings into a frozenset; return None if key absent."""
if raw is None:
return None
if isinstance(raw, (list, tuple)):
return frozenset(str(u).strip() for u in raw if str(u).strip())
return None
def _parse_document_rule(raw: dict) -> CommentDocumentRule:
    """Build a per-document rule from raw JSON; unset fields stay None (inherit)."""
    enabled_raw = raw.get("enabled")
    enabled = None if enabled_raw is None else bool(enabled_raw)

    policy = raw.get("policy")
    if policy is not None:
        normalized = str(policy).strip().lower()
        # An unrecognized policy string is treated as unset (inherit).
        policy = normalized if normalized in _VALID_POLICIES else None

    return CommentDocumentRule(
        enabled=enabled,
        policy=policy,
        allow_from=_parse_frozenset(raw.get("allow_from")),
    )
def load_config() -> CommentsConfig:
    """Load comment rules from disk (mtime-cached).

    Returns code defaults when the file is missing or empty. An invalid
    top-level policy falls back to "pairing"; non-dict entries under
    "documents" are skipped.
    """
    raw = _rules_cache.load()
    if not raw:
        return CommentsConfig()
    documents: Dict[str, CommentDocumentRule] = {}
    raw_docs = raw.get("documents", {})
    if isinstance(raw_docs, dict):
        for key, rule_raw in raw_docs.items():
            if isinstance(rule_raw, dict):
                documents[str(key)] = _parse_document_rule(rule_raw)
    policy = str(raw.get("policy", "pairing")).strip().lower()
    if policy not in _VALID_POLICIES:
        policy = "pairing"
    return CommentsConfig(
        # bool() coercion: the frozen dataclass declares bool but JSON may
        # carry 0/1 or arbitrary truthy values.
        enabled=bool(raw.get("enabled", True)),
        policy=policy,
        allow_from=_parse_frozenset(raw.get("allow_from")) or frozenset(),
        documents=documents,
    )
# ---------------------------------------------------------------------------
# Rule resolution (§8.4 field-by-field fallback)
# ---------------------------------------------------------------------------
def has_wiki_keys(cfg: CommentsConfig) -> bool:
    """Return True when at least one document rule key is wiki-scoped ('wiki:...')."""
    for key in cfg.documents:
        if key.startswith("wiki:"):
            return True
    return False
def resolve_rule(
    cfg: CommentsConfig,
    file_type: str,
    file_token: str,
    wiki_token: str = "",
) -> ResolvedCommentRule:
    """Resolve effective rule: exact doc → wiki key → wildcard → top-level → defaults.

    Each field (enabled, policy, allow_from) falls back independently; an
    explicitly-set value (including an empty allow_from) stops the fallback
    for that field. ``match_source`` reports the highest-priority tier that
    contributed any field.
    """
    exact_key = f"{file_type}:{file_token}"
    exact = cfg.documents.get(exact_key)
    exact_src = f"exact:{exact_key}"
    if exact is None and wiki_token:
        # No rule keyed by object token — try the wiki-node key instead.
        wiki_key = f"wiki:{wiki_token}"
        exact = cfg.documents.get(wiki_key)
        exact_src = f"exact:{wiki_key}"
    wildcard = cfg.documents.get("*")

    layers = []
    if exact is not None:
        layers.append((exact, exact_src))
    if wildcard is not None:
        layers.append((wildcard, "wildcard"))

    def _pick(field_name: str):
        # First (highest-priority) layer with a non-None value wins.
        for layer, source in layers:
            val = getattr(layer, field_name)
            if val is not None:
                return val, source
        return getattr(cfg, field_name), "top"

    enabled, en_src = _pick("enabled")
    policy, pol_src = _pick("policy")
    allow_from, af_src = _pick("allow_from")
    # match_source = highest-priority tier that contributed any field.
    # Fix: allow_from's source now participates; previously a rule that set
    # only allow_from was misreported as "top"/"wildcard".
    priority_order = {"exact": 0, "wildcard": 1, "top": 2}
    best_src = min(
        [en_src, pol_src, af_src],
        key=lambda s: priority_order.get(s.split(":")[0], 3),
    )
    return ResolvedCommentRule(
        enabled=enabled,
        policy=policy,
        allow_from=allow_from,
        match_source=best_src,
    )
# ---------------------------------------------------------------------------
# Pairing store
# ---------------------------------------------------------------------------
def _load_pairing_approved() -> set:
    """Return the set of pairing-approved user open_ids (mtime-cached read)."""
    approved = _pairing_cache.load().get("approved", {})
    # Dict form {open_id: meta} is canonical; a bare list is also accepted.
    if isinstance(approved, dict):
        return set(approved)
    if isinstance(approved, list):
        return {str(uid) for uid in approved if uid}
    return set()
def _save_pairing(data: dict) -> None:
    """Atomically persist the pairing store and invalidate its cache."""
    PAIRING_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = PAIRING_FILE.with_suffix(".tmp")
    with open(tmp_path, "w", encoding="utf-8") as handle:
        json.dump(data, handle, indent=2, ensure_ascii=False)
    # Atomic rename so readers never observe a half-written file.
    tmp_path.replace(PAIRING_FILE)
    # Reset the cache so the next load() re-reads from disk.
    _pairing_cache._mtime = 0.0
    _pairing_cache._data = None
def pairing_add(user_open_id: str) -> bool:
    """Add a user to the pairing-approved list. Returns True if newly added."""
    data = _pairing_cache.load()
    approved = data.get("approved")
    if not isinstance(approved, dict):
        approved = {}
    if user_open_id in approved:
        # Already approved — nothing to persist.
        return False
    approved[user_open_id] = {"approved_at": time.time()}
    data["approved"] = approved
    _save_pairing(data)
    return True
def pairing_remove(user_open_id: str) -> bool:
    """Remove a user from the pairing-approved list. Returns True if removed."""
    data = _pairing_cache.load()
    approved = data.get("approved")
    # Nothing to do when the store is malformed or the user is absent.
    if not isinstance(approved, dict) or user_open_id not in approved:
        return False
    approved.pop(user_open_id)
    data["approved"] = approved
    _save_pairing(data)
    return True
def pairing_list() -> Dict[str, Any]:
    """Return a copy of the approved mapping {user_open_id: {approved_at: ...}}."""
    approved = _pairing_cache.load().get("approved", {})
    if not isinstance(approved, dict):
        return {}
    return dict(approved)
# ---------------------------------------------------------------------------
# Access check (public API for feishu_comment.py)
# ---------------------------------------------------------------------------
def is_user_allowed(rule: ResolvedCommentRule, user_open_id: str) -> bool:
    """Check if the user passes the resolved rule's policy gate."""
    # Explicit allowlist entries pass regardless of policy.
    if user_open_id in rule.allow_from:
        return True
    # Under "pairing", fall back to the on-disk approved store; any other
    # policy denies users not in allow_from.
    return rule.policy == "pairing" and user_open_id in _load_pairing_approved()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _print_status() -> None:
    """Dump rules-file state, parsed config, document rules, and pairing list."""
    cfg = load_config()
    print(f"Rules file: {RULES_FILE}")
    print(f" exists: {RULES_FILE.exists()}")
    print(f"Pairing file: {PAIRING_FILE}")
    print(f" exists: {PAIRING_FILE.exists()}")
    print()
    print("Top-level:")
    print(f" enabled: {cfg.enabled}")
    print(f" policy: {cfg.policy}")
    print(f" allow_from: {sorted(cfg.allow_from) if cfg.allow_from else '[]'}")
    print()
    if not cfg.documents:
        print("Document rules: (none)")
    else:
        print(f"Document rules ({len(cfg.documents)}):")
        for key, rule in sorted(cfg.documents.items()):
            # Only show fields the rule actually sets; unset fields inherit.
            parts = []
            if rule.enabled is not None:
                parts.append(f"enabled={rule.enabled}")
            if rule.policy is not None:
                parts.append(f"policy={rule.policy}")
            if rule.allow_from is not None:
                parts.append(f"allow_from={sorted(rule.allow_from)}")
            summary = ", ".join(parts) if parts else "(empty — inherits all)"
            print(f" [{key}] {summary}")
    print()
    approved = pairing_list()
    print(f"Pairing approved ({len(approved)}):")
    for uid, meta in sorted(approved.items()):
        ts = meta.get("approved_at", 0)
        print(f" {uid} (approved_at={ts})")
def _do_check(doc_key: str, user_open_id: str) -> None:
    """Simulate an access check for one doc/user pair and print the verdict."""
    cfg = load_config()
    pieces = doc_key.split(":", 1)
    if len(pieces) != 2:
        print(f"Error: doc_key must be 'fileType:fileToken', got '{doc_key}'")
        return
    file_type, file_token = pieces
    rule = resolve_rule(cfg, file_type, file_token)
    verdict = "ALLOWED" if is_user_allowed(rule, user_open_id) else "DENIED"
    print(f"Document: {doc_key}")
    print(f"User: {user_open_id}")
    print("Resolved rule:")
    print(f" enabled: {rule.enabled}")
    print(f" policy: {rule.policy}")
    print(f" allow_from: {sorted(rule.allow_from) if rule.allow_from else '[]'}")
    print(f" match_source: {rule.match_source}")
    print(f"Result: {verdict}")
def _main() -> int:
    """CLI entry point: status / check / pairing subcommands.

    Returns a process exit code (0 on success, 1 on usage errors).
    """
    import sys
    try:
        # Best-effort: load the Hermes dotenv so HERMES_HOME is honored.
        from hermes_cli.env_loader import load_hermes_dotenv
        load_hermes_dotenv()
    except Exception:
        pass
    usage = (
        "Usage: python -m gateway.platforms.feishu_comment_rules <command> [args]\n"
        "\n"
        "Commands:\n"
        " status Show rules config and pairing state\n"
        " check <fileType:token> <user> Simulate access check\n"
        " pairing add <user_open_id> Add user to pairing-approved list\n"
        " pairing remove <user_open_id> Remove user from pairing-approved list\n"
        " pairing list List pairing-approved users\n"
        "\n"
        f"Rules config file: {RULES_FILE}\n"
        " Edit this JSON file directly to configure policies and document rules.\n"
        " Changes take effect on the next comment event (no restart needed).\n"
    )
    args = sys.argv[1:]
    if not args:
        print(usage)
        return 1
    cmd = args[0]
    if cmd == "status":
        _print_status()
    elif cmd == "check":
        if len(args) < 3:
            print("Usage: check <fileType:fileToken> <user_open_id>")
            return 1
        _do_check(args[1], args[2])
    elif cmd == "pairing":
        if len(args) < 2:
            print("Usage: pairing <add|remove|list> [args]")
            return 1
        sub = args[1]
        if sub == "add":
            if len(args) < 3:
                print("Usage: pairing add <user_open_id>")
                return 1
            if pairing_add(args[2]):
                print(f"Added: {args[2]}")
            else:
                print(f"Already approved: {args[2]}")
        elif sub == "remove":
            if len(args) < 3:
                print("Usage: pairing remove <user_open_id>")
                return 1
            if pairing_remove(args[2]):
                print(f"Removed: {args[2]}")
            else:
                print(f"Not in approved list: {args[2]}")
        elif sub == "list":
            approved = pairing_list()
            if not approved:
                print("(no approved users)")
            for uid, meta in sorted(approved.items()):
                print(f" {uid} approved_at={meta.get('approved_at', '?')}")
        else:
            print(f"Unknown pairing subcommand: {sub}")
            return 1
    else:
        print(f"Unknown command: {cmd}\n")
        print(usage)
        return 1
    return 0
if __name__ == "__main__":  # CLI entry point
    import sys
    sys.exit(_main())

View file

@ -0,0 +1,261 @@
"""Tests for feishu_comment — event filtering, access control integration, wiki reverse lookup."""
import asyncio
import json
import unittest
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock, patch
from gateway.platforms.feishu_comment import (
parse_drive_comment_event,
_ALLOWED_NOTICE_TYPES,
_sanitize_comment_text,
)
def _make_event(
comment_id="c1",
reply_id="r1",
notice_type="add_reply",
file_token="docx_token",
file_type="docx",
from_open_id="ou_user",
to_open_id="ou_bot",
is_mentioned=True,
):
"""Build a minimal drive comment event SimpleNamespace."""
return SimpleNamespace(event={
"event_id": "evt_1",
"comment_id": comment_id,
"reply_id": reply_id,
"is_mentioned": is_mentioned,
"timestamp": "1713200000",
"notice_meta": {
"file_token": file_token,
"file_type": file_type,
"notice_type": notice_type,
"from_user_id": {"open_id": from_open_id},
"to_user_id": {"open_id": to_open_id},
},
})
class TestParseEvent(unittest.TestCase):
    """parse_drive_comment_event: happy path and malformed inputs."""

    def test_parse_valid_event(self):
        evt = _make_event()
        parsed = parse_drive_comment_event(evt)
        self.assertIsNotNone(parsed)
        self.assertEqual(parsed["comment_id"], "c1")
        self.assertEqual(parsed["file_type"], "docx")
        self.assertEqual(parsed["from_open_id"], "ou_user")
        self.assertEqual(parsed["to_open_id"], "ou_bot")

    def test_parse_missing_event_attr(self):
        # Objects without an ``event`` attribute are rejected, not raised on.
        self.assertIsNone(parse_drive_comment_event(object()))

    def test_parse_none_event(self):
        # A namespace lacking the ``event`` attribute also yields None.
        self.assertIsNone(parse_drive_comment_event(SimpleNamespace()))
class TestEventFiltering(unittest.TestCase):
    """Test the filtering logic in handle_drive_comment_event."""

    def _run(self, coro):
        # asyncio.run() gives each test a fresh event loop; the previous
        # get_event_loop().run_until_complete() pattern is deprecated when
        # no loop is running (Python 3.10+) and breaks on newer interpreters.
        return asyncio.run(coro)

    @patch("gateway.platforms.feishu_comment_rules.load_config")
    @patch("gateway.platforms.feishu_comment_rules.resolve_rule")
    @patch("gateway.platforms.feishu_comment_rules.is_user_allowed")
    def test_self_reply_filtered(self, mock_allowed, mock_resolve, mock_load):
        """Events where from_open_id == self_open_id should be dropped."""
        from gateway.platforms.feishu_comment import handle_drive_comment_event
        evt = _make_event(from_open_id="ou_bot", to_open_id="ou_bot")
        self._run(handle_drive_comment_event(Mock(), evt, self_open_id="ou_bot"))
        mock_load.assert_not_called()

    @patch("gateway.platforms.feishu_comment_rules.load_config")
    @patch("gateway.platforms.feishu_comment_rules.resolve_rule")
    @patch("gateway.platforms.feishu_comment_rules.is_user_allowed")
    def test_wrong_receiver_filtered(self, mock_allowed, mock_resolve, mock_load):
        """Events where to_open_id != self_open_id should be dropped."""
        from gateway.platforms.feishu_comment import handle_drive_comment_event
        evt = _make_event(to_open_id="ou_other_bot")
        self._run(handle_drive_comment_event(Mock(), evt, self_open_id="ou_bot"))
        mock_load.assert_not_called()

    @patch("gateway.platforms.feishu_comment_rules.load_config")
    @patch("gateway.platforms.feishu_comment_rules.resolve_rule")
    @patch("gateway.platforms.feishu_comment_rules.is_user_allowed")
    def test_empty_to_open_id_filtered(self, mock_allowed, mock_resolve, mock_load):
        """Events with empty to_open_id should be dropped."""
        from gateway.platforms.feishu_comment import handle_drive_comment_event
        evt = _make_event(to_open_id="")
        self._run(handle_drive_comment_event(Mock(), evt, self_open_id="ou_bot"))
        mock_load.assert_not_called()

    @patch("gateway.platforms.feishu_comment_rules.load_config")
    @patch("gateway.platforms.feishu_comment_rules.resolve_rule")
    @patch("gateway.platforms.feishu_comment_rules.is_user_allowed")
    def test_invalid_notice_type_filtered(self, mock_allowed, mock_resolve, mock_load):
        """Events with unsupported notice_type should be dropped."""
        from gateway.platforms.feishu_comment import handle_drive_comment_event
        evt = _make_event(notice_type="resolve_comment")
        self._run(handle_drive_comment_event(Mock(), evt, self_open_id="ou_bot"))
        mock_load.assert_not_called()

    def test_allowed_notice_types(self):
        self.assertIn("add_comment", _ALLOWED_NOTICE_TYPES)
        self.assertIn("add_reply", _ALLOWED_NOTICE_TYPES)
        self.assertNotIn("resolve_comment", _ALLOWED_NOTICE_TYPES)
class TestAccessControlIntegration(unittest.TestCase):
    """Denied / disabled paths must short-circuit without side effects."""

    def _run(self, coro):
        # asyncio.run() replaces the deprecated
        # get_event_loop().run_until_complete() pattern (Python 3.10+).
        return asyncio.run(coro)

    @patch("gateway.platforms.feishu_comment_rules.has_wiki_keys", return_value=False)
    @patch("gateway.platforms.feishu_comment_rules.is_user_allowed", return_value=False)
    @patch("gateway.platforms.feishu_comment_rules.resolve_rule")
    @patch("gateway.platforms.feishu_comment_rules.load_config")
    def test_denied_user_no_side_effects(self, mock_load, mock_resolve, mock_allowed, mock_wiki_keys):
        """Denied user should not trigger typing reaction or agent."""
        from gateway.platforms.feishu_comment import handle_drive_comment_event
        from gateway.platforms.feishu_comment_rules import ResolvedCommentRule
        mock_resolve.return_value = ResolvedCommentRule(True, "allowlist", frozenset(), "top")
        mock_load.return_value = Mock()
        client = Mock()
        evt = _make_event()
        self._run(handle_drive_comment_event(client, evt, self_open_id="ou_bot"))
        # No API calls should be made for denied users
        client.request.assert_not_called()

    @patch("gateway.platforms.feishu_comment_rules.has_wiki_keys", return_value=False)
    @patch("gateway.platforms.feishu_comment_rules.is_user_allowed", return_value=False)
    @patch("gateway.platforms.feishu_comment_rules.resolve_rule")
    @patch("gateway.platforms.feishu_comment_rules.load_config")
    def test_disabled_comment_skipped(self, mock_load, mock_resolve, mock_allowed, mock_wiki_keys):
        """Disabled comments should return immediately."""
        from gateway.platforms.feishu_comment import handle_drive_comment_event
        from gateway.platforms.feishu_comment_rules import ResolvedCommentRule
        mock_resolve.return_value = ResolvedCommentRule(False, "allowlist", frozenset(), "top")
        mock_load.return_value = Mock()
        evt = _make_event()
        self._run(handle_drive_comment_event(Mock(), evt, self_open_id="ou_bot"))
        mock_allowed.assert_not_called()
class TestSanitizeCommentText(unittest.TestCase):
    """HTML-entity escaping of comment text (&, <, > — ampersand first)."""

    def test_angle_brackets_escaped(self):
        self.assertEqual(_sanitize_comment_text("List<String>"), "List&lt;String&gt;")

    def test_ampersand_escaped_first(self):
        self.assertEqual(_sanitize_comment_text("a & b"), "a &amp; b")

    def test_ampersand_not_double_escaped(self):
        # If & were escaped after < and >, their entities would be corrupted.
        result = _sanitize_comment_text("a < b & c > d")
        self.assertEqual(result, "a &lt; b &amp; c &gt; d")
        self.assertNotIn("&amp;lt;", result)
        self.assertNotIn("&amp;gt;", result)

    def test_plain_text_unchanged(self):
        self.assertEqual(_sanitize_comment_text("hello world"), "hello world")

    def test_empty_string(self):
        self.assertEqual(_sanitize_comment_text(""), "")

    def test_code_snippet(self):
        text = 'if (a < b && c > 0) { return "ok"; }'
        result = _sanitize_comment_text(text)
        self.assertNotIn("<", result)
        self.assertNotIn(">", result)
        self.assertIn("&lt;", result)
        self.assertIn("&gt;", result)
class TestWikiReverseLookup(unittest.TestCase):
    """Reverse lookup of wiki node tokens from document object tokens."""

    def _run(self, coro):
        # asyncio.run() replaces the deprecated
        # get_event_loop().run_until_complete() pattern (Python 3.10+).
        return asyncio.run(coro)

    @patch("gateway.platforms.feishu_comment._exec_request")
    def test_reverse_lookup_success(self, mock_exec):
        from gateway.platforms.feishu_comment import _reverse_lookup_wiki_token
        mock_exec.return_value = (0, "Success", {
            "node": {"node_token": "WIKI_TOKEN_123", "obj_token": "docx_abc"},
        })
        result = self._run(_reverse_lookup_wiki_token(Mock(), "docx", "docx_abc"))
        self.assertEqual(result, "WIKI_TOKEN_123")
        # Verify correct API params
        call_args = mock_exec.call_args
        queries = call_args[1].get("queries") or call_args[0][3]
        query_dict = dict(queries)
        self.assertEqual(query_dict["token"], "docx_abc")
        self.assertEqual(query_dict["obj_type"], "docx")

    @patch("gateway.platforms.feishu_comment._exec_request")
    def test_reverse_lookup_not_wiki(self, mock_exec):
        from gateway.platforms.feishu_comment import _reverse_lookup_wiki_token
        mock_exec.return_value = (131001, "not found", {})
        result = self._run(_reverse_lookup_wiki_token(Mock(), "docx", "docx_abc"))
        self.assertIsNone(result)

    @patch("gateway.platforms.feishu_comment._exec_request")
    def test_reverse_lookup_service_error(self, mock_exec):
        from gateway.platforms.feishu_comment import _reverse_lookup_wiki_token
        mock_exec.return_value = (500, "internal error", {})
        result = self._run(_reverse_lookup_wiki_token(Mock(), "docx", "docx_abc"))
        self.assertIsNone(result)

    @patch("gateway.platforms.feishu_comment._reverse_lookup_wiki_token", new_callable=AsyncMock)
    @patch("gateway.platforms.feishu_comment_rules.has_wiki_keys", return_value=True)
    @patch("gateway.platforms.feishu_comment_rules.is_user_allowed", return_value=True)
    @patch("gateway.platforms.feishu_comment_rules.resolve_rule")
    @patch("gateway.platforms.feishu_comment_rules.load_config")
    @patch("gateway.platforms.feishu_comment.add_comment_reaction", new_callable=AsyncMock)
    @patch("gateway.platforms.feishu_comment.batch_query_comment", new_callable=AsyncMock)
    @patch("gateway.platforms.feishu_comment.query_document_meta", new_callable=AsyncMock)
    def test_wiki_lookup_triggered_when_no_exact_match(
        self, mock_meta, mock_batch, mock_reaction,
        mock_load, mock_resolve, mock_allowed, mock_wiki_keys, mock_lookup,
    ):
        """Wiki reverse lookup should fire when rule falls to wildcard/top and wiki keys exist."""
        from gateway.platforms.feishu_comment import handle_drive_comment_event
        from gateway.platforms.feishu_comment_rules import ResolvedCommentRule
        # First resolve returns wildcard (no exact match), second returns exact wiki match
        mock_resolve.side_effect = [
            ResolvedCommentRule(True, "allowlist", frozenset(), "wildcard"),
            ResolvedCommentRule(True, "allowlist", frozenset(), "exact:wiki:WIKI123"),
        ]
        mock_load.return_value = Mock()
        mock_lookup.return_value = "WIKI123"
        mock_meta.return_value = {"title": "Test", "url": ""}
        mock_batch.return_value = {"is_whole": False, "quote": ""}
        evt = _make_event()
        # Will proceed past access control but fail later — that's OK, we just test the lookup
        try:
            self._run(handle_drive_comment_event(Mock(), evt, self_open_id="ou_bot"))
        except Exception:
            pass
        mock_lookup.assert_called_once_with(unittest.mock.ANY, "docx", "docx_token")
        self.assertEqual(mock_resolve.call_count, 2)
        # Second call should include wiki_token
        second_call_kwargs = mock_resolve.call_args_list[1]
        self.assertEqual(second_call_kwargs[1].get("wiki_token") or second_call_kwargs[0][3], "WIKI123")
if __name__ == "__main__":  # allow running this test file directly
    unittest.main()

View file

@ -0,0 +1,320 @@
"""Tests for feishu_comment_rules — 3-tier access control rule engine."""
import json
import os
import tempfile
import time
import unittest
from pathlib import Path
from unittest.mock import patch
from gateway.platforms.feishu_comment_rules import (
CommentsConfig,
CommentDocumentRule,
ResolvedCommentRule,
_MtimeCache,
_parse_document_rule,
has_wiki_keys,
is_user_allowed,
load_config,
pairing_add,
pairing_list,
pairing_remove,
resolve_rule,
)
class TestCommentDocumentRuleParsing(unittest.TestCase):
    """_parse_document_rule: full, partial, empty, and invalid inputs."""

    def test_parse_full_rule(self):
        rule = _parse_document_rule({
            "enabled": False,
            "policy": "allowlist",
            "allow_from": ["ou_a", "ou_b"],
        })
        self.assertFalse(rule.enabled)
        self.assertEqual(rule.policy, "allowlist")
        self.assertEqual(rule.allow_from, frozenset(["ou_a", "ou_b"]))

    def test_parse_partial_rule(self):
        # Unset fields must stay None so they inherit from lower tiers.
        rule = _parse_document_rule({"policy": "allowlist"})
        self.assertIsNone(rule.enabled)
        self.assertEqual(rule.policy, "allowlist")
        self.assertIsNone(rule.allow_from)

    def test_parse_empty_rule(self):
        rule = _parse_document_rule({})
        self.assertIsNone(rule.enabled)
        self.assertIsNone(rule.policy)
        self.assertIsNone(rule.allow_from)

    def test_invalid_policy_ignored(self):
        # Unknown policy strings are dropped (treated as unset).
        rule = _parse_document_rule({"policy": "invalid_value"})
        self.assertIsNone(rule.policy)
class TestResolveRule(unittest.TestCase):
    """resolve_rule: tier precedence and per-field fallback behavior."""

    def test_exact_match(self):
        cfg = CommentsConfig(
            policy="pairing",
            allow_from=frozenset(["ou_top"]),
            documents={
                "docx:abc": CommentDocumentRule(policy="allowlist"),
            },
        )
        rule = resolve_rule(cfg, "docx", "abc")
        self.assertEqual(rule.policy, "allowlist")
        self.assertTrue(rule.match_source.startswith("exact:"))

    def test_wildcard_match(self):
        cfg = CommentsConfig(
            policy="pairing",
            documents={
                "*": CommentDocumentRule(policy="allowlist"),
            },
        )
        rule = resolve_rule(cfg, "docx", "unknown")
        self.assertEqual(rule.policy, "allowlist")
        self.assertEqual(rule.match_source, "wildcard")

    def test_top_level_fallback(self):
        cfg = CommentsConfig(policy="pairing", allow_from=frozenset(["ou_top"]))
        rule = resolve_rule(cfg, "docx", "whatever")
        self.assertEqual(rule.policy, "pairing")
        self.assertEqual(rule.allow_from, frozenset(["ou_top"]))
        self.assertEqual(rule.match_source, "top")

    def test_exact_overrides_wildcard(self):
        cfg = CommentsConfig(
            policy="pairing",
            documents={
                "*": CommentDocumentRule(policy="pairing"),
                "docx:abc": CommentDocumentRule(policy="allowlist"),
            },
        )
        rule = resolve_rule(cfg, "docx", "abc")
        self.assertEqual(rule.policy, "allowlist")
        self.assertTrue(rule.match_source.startswith("exact:"))

    def test_field_by_field_fallback(self):
        """Exact sets policy, wildcard sets allow_from, enabled from top."""
        cfg = CommentsConfig(
            enabled=True,
            policy="pairing",
            allow_from=frozenset(["ou_top"]),
            documents={
                "*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])),
                "docx:abc": CommentDocumentRule(policy="allowlist"),
            },
        )
        rule = resolve_rule(cfg, "docx", "abc")
        self.assertEqual(rule.policy, "allowlist")
        self.assertEqual(rule.allow_from, frozenset(["ou_wildcard"]))
        self.assertTrue(rule.enabled)

    def test_explicit_empty_allow_from_does_not_fall_through(self):
        """allow_from=[] on exact should NOT inherit from wildcard or top."""
        cfg = CommentsConfig(
            allow_from=frozenset(["ou_top"]),
            documents={
                "*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])),
                "docx:abc": CommentDocumentRule(
                    policy="allowlist",
                    allow_from=frozenset(),
                ),
            },
        )
        rule = resolve_rule(cfg, "docx", "abc")
        self.assertEqual(rule.allow_from, frozenset())

    def test_wiki_token_match(self):
        # A wiki-scoped key matches when the object token has no exact rule.
        cfg = CommentsConfig(
            policy="pairing",
            documents={
                "wiki:WIKI123": CommentDocumentRule(policy="allowlist"),
            },
        )
        rule = resolve_rule(cfg, "docx", "obj_token", wiki_token="WIKI123")
        self.assertEqual(rule.policy, "allowlist")
        self.assertTrue(rule.match_source.startswith("exact:wiki:"))

    def test_exact_takes_priority_over_wiki(self):
        cfg = CommentsConfig(
            documents={
                "docx:abc": CommentDocumentRule(policy="allowlist"),
                "wiki:WIKI123": CommentDocumentRule(policy="pairing"),
            },
        )
        rule = resolve_rule(cfg, "docx", "abc", wiki_token="WIKI123")
        self.assertEqual(rule.policy, "allowlist")
        self.assertTrue(rule.match_source.startswith("exact:docx:"))

    def test_default_config(self):
        cfg = CommentsConfig()
        rule = resolve_rule(cfg, "docx", "anything")
        self.assertTrue(rule.enabled)
        self.assertEqual(rule.policy, "pairing")
        self.assertEqual(rule.allow_from, frozenset())
class TestHasWikiKeys(unittest.TestCase):
    """has_wiki_keys: detection of 'wiki:'-prefixed document rule keys."""

    def test_no_wiki_keys(self):
        cfg = CommentsConfig(documents={
            "docx:abc": CommentDocumentRule(policy="allowlist"),
            "*": CommentDocumentRule(policy="pairing"),
        })
        self.assertFalse(has_wiki_keys(cfg))

    def test_has_wiki_keys(self):
        cfg = CommentsConfig(documents={
            "wiki:WIKI123": CommentDocumentRule(policy="allowlist"),
        })
        self.assertTrue(has_wiki_keys(cfg))

    def test_empty_documents(self):
        cfg = CommentsConfig()
        self.assertFalse(has_wiki_keys(cfg))
class TestIsUserAllowed(unittest.TestCase):
    """is_user_allowed: allowlist membership and pairing-store fallback."""

    def test_allowlist_allows_listed(self):
        rule = ResolvedCommentRule(True, "allowlist", frozenset(["ou_a"]), "top")
        self.assertTrue(is_user_allowed(rule, "ou_a"))

    def test_allowlist_denies_unlisted(self):
        rule = ResolvedCommentRule(True, "allowlist", frozenset(["ou_a"]), "top")
        self.assertFalse(is_user_allowed(rule, "ou_b"))

    def test_allowlist_empty_denies_all(self):
        rule = ResolvedCommentRule(True, "allowlist", frozenset(), "top")
        self.assertFalse(is_user_allowed(rule, "ou_anyone"))

    def test_pairing_allows_in_allow_from(self):
        # allow_from entries pass even under the pairing policy.
        rule = ResolvedCommentRule(True, "pairing", frozenset(["ou_a"]), "top")
        self.assertTrue(is_user_allowed(rule, "ou_a"))

    def test_pairing_checks_store(self):
        rule = ResolvedCommentRule(True, "pairing", frozenset(), "top")
        with patch(
            "gateway.platforms.feishu_comment_rules._load_pairing_approved",
            return_value={"ou_approved"},
        ):
            self.assertTrue(is_user_allowed(rule, "ou_approved"))
            self.assertFalse(is_user_allowed(rule, "ou_unknown"))
class TestMtimeCache(unittest.TestCase):
    """_MtimeCache: missing file, caching, and mtime-triggered reload."""

    def test_returns_empty_dict_for_missing_file(self):
        cache = _MtimeCache(Path("/nonexistent/path.json"))
        self.assertEqual(cache.load(), {})

    def test_reads_file_and_caches(self):
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            json.dump({"key": "value"}, f)
            f.flush()
            path = Path(f.name)
        try:
            cache = _MtimeCache(path)
            data = cache.load()
            self.assertEqual(data, {"key": "value"})
            # Second load should use cache (same mtime)
            data2 = cache.load()
            self.assertEqual(data2, {"key": "value"})
        finally:
            path.unlink()

    def test_reloads_on_mtime_change(self):
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            json.dump({"v": 1}, f)
            f.flush()
            path = Path(f.name)
        try:
            cache = _MtimeCache(path)
            self.assertEqual(cache.load(), {"v": 1})
            # Modify file
            time.sleep(0.05)
            with open(path, "w") as f2:
                json.dump({"v": 2}, f2)
            # Force mtime change detection (filesystem mtime granularity may
            # be coarser than the sleep above).
            os.utime(path, (time.time() + 1, time.time() + 1))
            self.assertEqual(cache.load(), {"v": 2})
        finally:
            path.unlink()
class TestLoadConfig(unittest.TestCase):
    """load_config parsing of the rules JSON file and its defaults."""

    def test_load_with_documents(self):
        payload = {
            "enabled": True,
            "policy": "allowlist",
            "allow_from": ["ou_a"],
            "documents": {
                "*": {"policy": "pairing"},
                "docx:abc": {"policy": "allowlist", "allow_from": ["ou_b"]},
            },
        }
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp:
            json.dump(payload, tmp)
            rules_path = Path(tmp.name)
        try:
            # Point both the rules file and its cache at the temp file.
            with patch("gateway.platforms.feishu_comment_rules.RULES_FILE", rules_path):
                with patch(
                    "gateway.platforms.feishu_comment_rules._rules_cache",
                    _MtimeCache(rules_path),
                ):
                    cfg = load_config()
            self.assertTrue(cfg.enabled)
            self.assertEqual(cfg.policy, "allowlist")
            self.assertEqual(cfg.allow_from, frozenset(["ou_a"]))
            self.assertIn("*", cfg.documents)
            self.assertIn("docx:abc", cfg.documents)
            self.assertEqual(cfg.documents["docx:abc"].policy, "allowlist")
        finally:
            rules_path.unlink()

    def test_load_missing_file_returns_defaults(self):
        missing = _MtimeCache(Path("/nonexistent"))
        with patch("gateway.platforms.feishu_comment_rules._rules_cache", missing):
            cfg = load_config()
        # Defaults: enabled, pairing policy, no allowlist, no per-doc rules.
        self.assertTrue(cfg.enabled)
        self.assertEqual(cfg.policy, "pairing")
        self.assertEqual(cfg.allow_from, frozenset())
        self.assertEqual(cfg.documents, {})
class TestPairingStore(unittest.TestCase):
    """pairing_add / pairing_remove / pairing_list against a temp store."""

    def setUp(self):
        # Fresh pairing file in an isolated temp directory per test.
        self._workdir = tempfile.mkdtemp()
        self._store = Path(self._workdir) / "pairing.json"
        self._store.write_text(json.dumps({"approved": {}}))
        self._file_patch = patch(
            "gateway.platforms.feishu_comment_rules.PAIRING_FILE", self._store
        )
        self._cache_patch = patch(
            "gateway.platforms.feishu_comment_rules._pairing_cache",
            _MtimeCache(self._store),
        )
        self._file_patch.start()
        self._cache_patch.start()

    def tearDown(self):
        self._cache_patch.stop()
        self._file_patch.stop()
        if self._store.exists():
            self._store.unlink()
        os.rmdir(self._workdir)

    def test_add_and_list(self):
        self.assertTrue(pairing_add("ou_new"))
        self.assertIn("ou_new", pairing_list())

    def test_add_duplicate(self):
        pairing_add("ou_a")
        # Adding the same open_id twice reports no change.
        self.assertFalse(pairing_add("ou_a"))

    def test_remove(self):
        pairing_add("ou_a")
        self.assertTrue(pairing_remove("ou_a"))
        self.assertNotIn("ou_a", pairing_list())

    def test_remove_nonexistent(self):
        self.assertFalse(pairing_remove("ou_nobody"))
# Allow running this test module directly (python path/to/file.py).
if __name__ == "__main__":
    unittest.main()

View file

@ -0,0 +1,62 @@
"""Tests for feishu_doc_tool and feishu_drive_tool — registration and schema validation."""
import importlib
import unittest
from tools.registry import registry
# Trigger tool discovery so feishu tools get registered
importlib.import_module("tools.feishu_doc_tool")
importlib.import_module("tools.feishu_drive_tool")
class TestFeishuToolRegistration(unittest.TestCase):
    """Verify feishu tools are registered and have valid schemas."""

    # Map of tool name -> toolset it must be registered under.
    EXPECTED_TOOLS = {
        "feishu_doc_read": "feishu_doc",
        "feishu_drive_list_comments": "feishu_drive",
        "feishu_drive_list_comment_replies": "feishu_drive",
        "feishu_drive_reply_comment": "feishu_drive",
        "feishu_drive_add_comment": "feishu_drive",
    }

    def test_all_tools_registered(self):
        for name, expected_toolset in self.EXPECTED_TOOLS.items():
            entry = registry.get_entry(name)
            self.assertIsNotNone(entry, f"{name} not registered")
            self.assertEqual(entry.toolset, expected_toolset)

    def test_schemas_have_required_fields(self):
        for name in self.EXPECTED_TOOLS:
            schema = registry.get_entry(name).schema
            self.assertIn("name", schema)
            self.assertEqual(schema["name"], name)
            self.assertIn("description", schema)
            self.assertIn("parameters", schema)
            params = schema["parameters"]
            self.assertIn("type", params)
            self.assertEqual(params["type"], "object")

    def test_handlers_are_callable(self):
        for name in self.EXPECTED_TOOLS:
            self.assertTrue(callable(registry.get_entry(name).handler))

    def test_doc_read_schema_params(self):
        props = registry.get_entry("feishu_doc_read").schema["parameters"].get(
            "properties", {}
        )
        self.assertIn("doc_token", props)

    def test_drive_tools_require_file_token(self):
        drive_tools = [n for n in self.EXPECTED_TOOLS if n != "feishu_doc_read"]
        for name in drive_tools:
            props = registry.get_entry(name).schema["parameters"].get("properties", {})
            self.assertIn("file_token", props, f"{name} missing file_token param")
            self.assertIn("file_type", props, f"{name} missing file_type param")
# Allow running this test module directly (python path/to/file.py).
if __name__ == "__main__":
    unittest.main()

136
tools/feishu_doc_tool.py Normal file
View file

@ -0,0 +1,136 @@
"""Feishu Document Tool -- read document content via Feishu/Lark API.
Provides ``feishu_doc_read`` for reading document content as plain text.
Uses the same lazy-import + BaseRequest pattern as feishu_comment.py.
"""
import asyncio
import json
import logging
import threading
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
# Per-thread slot holding the lark client that the feishu_comment event
# handler injects; each worker thread sees only the client it set.
_local = threading.local()


def set_client(client):
    """Bind *client* to the current thread (called by feishu_comment)."""
    _local.client = client


def get_client():
    """Return the lark client bound to the current thread, or None."""
    return getattr(_local, "client", None)
# ---------------------------------------------------------------------------
# feishu_doc_read
# ---------------------------------------------------------------------------

# Feishu OpenAPI endpoint for a document's plain-text body; ":document_id"
# is substituted via BaseRequest.paths() in the handler below.
_RAW_CONTENT_URI = "/open-apis/docx/v1/documents/:document_id/raw_content"

# Tool schema (name / description / JSON-schema parameters) registered with
# the tools registry and advertised to the agent.
FEISHU_DOC_READ_SCHEMA = {
    "name": "feishu_doc_read",
    "description": (
        "Read the full content of a Feishu/Lark document as plain text. "
        "Useful when you need more context beyond the quoted text in a comment."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "doc_token": {
                "type": "string",
                "description": "The document token (from the document URL or comment context).",
            },
        },
        "required": ["doc_token"],
    },
}
def _check_feishu():
try:
import lark_oapi # noqa: F401
return True
except ImportError:
return False
def _handle_feishu_doc_read(args: dict, **kwargs) -> str:
    """Read a Feishu document's raw text content.

    Args:
        args: Tool arguments; requires ``doc_token``.

    Returns:
        A ``tool_result`` payload carrying the plain-text ``content``, or a
        ``tool_error`` string describing why the read failed.
    """
    doc_token = args.get("doc_token", "").strip()
    if not doc_token:
        return tool_error("doc_token is required")
    client = get_client()
    if client is None:
        return tool_error("Feishu client not available (not in a Feishu comment context)")
    try:
        from lark_oapi import AccessTokenType
        from lark_oapi.core.enum import HttpMethod
        from lark_oapi.core.model.base_request import BaseRequest
    except ImportError:
        return tool_error("lark_oapi not installed")
    request = (
        BaseRequest.builder()
        .http_method(HttpMethod.GET)
        .uri(_RAW_CONTENT_URI)
        .token_types({AccessTokenType.TENANT})
        .paths({"document_id": doc_token})
        .build()
    )
    # client.request() is a blocking call and this handler is registered
    # with is_async=False, so invoke it directly.  The previous
    # asyncio.get_event_loop().run_until_complete(asyncio.to_thread(...))
    # construct relied on the deprecated get_event_loop() (which raises in
    # threads without a loop on 3.12) and always degenerated to this
    # synchronous path anyway.
    response = client.request(request)
    code = getattr(response, "code", None)
    if code != 0:
        msg = getattr(response, "msg", "unknown error")
        return tool_error(f"Failed to read document: code={code} msg={msg}")
    # Prefer the raw HTTP payload: BaseRequest responses expose the JSON
    # body via response.raw.content.
    raw = getattr(response, "raw", None)
    if raw and hasattr(raw, "content"):
        try:
            body = json.loads(raw.content)
            content = body.get("data", {}).get("content", "")
            return tool_result(success=True, content=content)
        except (json.JSONDecodeError, AttributeError):
            pass
    # Fallback: try response.data — some client paths expose a typed object
    # or plain dict instead of the raw payload.
    data = getattr(response, "data", None)
    if data:
        if isinstance(data, dict):
            content = data.get("content", "")
        else:
            content = getattr(data, "content", str(data))
        return tool_result(success=True, content=content)
    return tool_error("No content returned from document API")
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------

# Register the reader in the global tool registry.  The handler is
# synchronous (is_async=False); availability is gated on lark_oapi being
# importable (_check_feishu).  No environment variables are required because
# the lark client is injected per-thread via set_client().
registry.register(
    name="feishu_doc_read",
    toolset="feishu_doc",
    schema=FEISHU_DOC_READ_SCHEMA,
    handler=_handle_feishu_doc_read,
    check_fn=_check_feishu,
    requires_env=[],
    is_async=False,
    description="Read Feishu document content",
    emoji="\U0001f4c4",  # page-facing-up emoji
)

433
tools/feishu_drive_tool.py Normal file
View file

@ -0,0 +1,433 @@
"""Feishu Drive Tools -- document comment operations via Feishu/Lark API.
Provides tools for listing, replying to, and adding document comments.
Uses the same lazy-import + BaseRequest pattern as feishu_comment.py.
The lark client is injected per-thread by the comment event handler.
"""
import asyncio
import json
import logging
import threading
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
# Per-thread slot holding the lark client that the feishu_comment event
# handler injects; each worker thread sees only the client it set.
_local = threading.local()


def set_client(client):
    """Bind *client* to the current thread (called by feishu_comment)."""
    _local.client = client


def get_client():
    """Return the lark client bound to the current thread, or None."""
    return getattr(_local, "client", None)
def _check_feishu():
try:
import lark_oapi # noqa: F401
return True
except ImportError:
return False
def _do_request(client, method, uri, paths=None, queries=None, body=None):
    """Build and execute a raw BaseRequest against the Feishu OpenAPI.

    Args:
        client: lark client injected via ``set_client``.
        method: ``"GET"`` for GET; any other value is treated as POST.
        uri: endpoint template containing ``:name`` path placeholders.
        paths: mapping of path placeholder -> value, or None.
        queries: list of ``(key, value)`` query pairs, or None.
        body: JSON-serializable request body, or None.

    Returns:
        Tuple ``(code, msg, data)`` where ``data`` is the parsed ``data``
        field of the response body (empty dict when it cannot be parsed).
    """
    from lark_oapi import AccessTokenType
    from lark_oapi.core.enum import HttpMethod
    from lark_oapi.core.model.base_request import BaseRequest

    http_method = HttpMethod.GET if method == "GET" else HttpMethod.POST
    builder = (
        BaseRequest.builder()
        .http_method(http_method)
        .uri(uri)
        .token_types({AccessTokenType.TENANT})
    )
    if paths:
        builder = builder.paths(paths)
    if queries:
        builder = builder.queries(queries)
    if body is not None:
        builder = builder.body(body)
    request = builder.build()

    # client.request() is a blocking call and all registered handlers are
    # synchronous (is_async=False), so call it directly.  The previous
    # asyncio.get_event_loop().run_until_complete(asyncio.to_thread(...))
    # construct relied on the deprecated get_event_loop() and always fell
    # back to this synchronous path in practice.
    response = client.request(request)

    code = getattr(response, "code", None)
    msg = getattr(response, "msg", "")
    # Prefer the raw HTTP payload: BaseRequest responses expose the JSON
    # body via response.raw.content rather than a typed data object.
    data = {}
    raw = getattr(response, "raw", None)
    if raw and hasattr(raw, "content"):
        try:
            body_json = json.loads(raw.content)
            data = body_json.get("data", {})
        except (json.JSONDecodeError, AttributeError):
            pass
    if not data:
        # Fallback for client paths that populate response.data instead.
        resp_data = getattr(response, "data", None)
        if isinstance(resp_data, dict):
            data = resp_data
        elif resp_data and hasattr(resp_data, "__dict__"):
            data = vars(resp_data)
    return code, msg, data
# ---------------------------------------------------------------------------
# feishu_drive_list_comments
# ---------------------------------------------------------------------------

# Endpoint template; ":file_token" is filled in via BaseRequest.paths().
_LIST_COMMENTS_URI = "/open-apis/drive/v1/files/:file_token/comments"

# Tool schema for listing a document's comments (optionally only the
# whole-document ones), with page_size/page_token pagination.
FEISHU_DRIVE_LIST_COMMENTS_SCHEMA = {
    "name": "feishu_drive_list_comments",
    "description": (
        "List comments on a Feishu document. "
        "Use is_whole=true to list whole-document comments only."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "file_token": {
                "type": "string",
                "description": "The document file token.",
            },
            "file_type": {
                "type": "string",
                "description": "File type (default: docx).",
                "default": "docx",
            },
            "is_whole": {
                "type": "boolean",
                "description": "If true, only return whole-document comments.",
                "default": False,
            },
            "page_size": {
                "type": "integer",
                "description": "Number of comments per page (max 100).",
                "default": 100,
            },
            "page_token": {
                "type": "string",
                "description": "Pagination token for next page.",
            },
        },
        "required": ["file_token"],
    },
}
def _handle_list_comments(args: dict, **kwargs) -> str:
    """List comments on a document via the drive comments endpoint."""
    lark = get_client()
    if lark is None:
        return tool_error("Feishu client not available")
    token = args.get("file_token", "").strip()
    if not token:
        return tool_error("file_token is required")
    # Mandatory query parameters first; optional ones appended below.
    query_params = [
        ("file_type", args.get("file_type", "docx") or "docx"),
        ("user_id_type", "open_id"),
        ("page_size", str(args.get("page_size", 100))),
    ]
    if args.get("is_whole", False):
        query_params.append(("is_whole", "true"))
    next_page = args.get("page_token", "")
    if next_page:
        query_params.append(("page_token", next_page))
    code, msg, data = _do_request(
        lark,
        "GET",
        _LIST_COMMENTS_URI,
        paths={"file_token": token},
        queries=query_params,
    )
    if code != 0:
        return tool_error(f"List comments failed: code={code} msg={msg}")
    return tool_result(data)
# ---------------------------------------------------------------------------
# feishu_drive_list_comment_replies
# ---------------------------------------------------------------------------

# Endpoint template; ":file_token" and ":comment_id" are filled in via
# BaseRequest.paths().
_LIST_REPLIES_URI = "/open-apis/drive/v1/files/:file_token/comments/:comment_id/replies"

# Tool schema for listing every reply within one comment thread.
FEISHU_DRIVE_LIST_REPLIES_SCHEMA = {
    "name": "feishu_drive_list_comment_replies",
    "description": "List all replies in a comment thread on a Feishu document.",
    "parameters": {
        "type": "object",
        "properties": {
            "file_token": {
                "type": "string",
                "description": "The document file token.",
            },
            "comment_id": {
                "type": "string",
                "description": "The comment ID to list replies for.",
            },
            "file_type": {
                "type": "string",
                "description": "File type (default: docx).",
                "default": "docx",
            },
            "page_size": {
                "type": "integer",
                "description": "Number of replies per page (max 100).",
                "default": 100,
            },
            "page_token": {
                "type": "string",
                "description": "Pagination token for next page.",
            },
        },
        "required": ["file_token", "comment_id"],
    },
}
def _handle_list_replies(args: dict, **kwargs) -> str:
    """List all replies within one comment thread."""
    lark = get_client()
    if lark is None:
        return tool_error("Feishu client not available")
    token = args.get("file_token", "").strip()
    thread_id = args.get("comment_id", "").strip()
    if not token or not thread_id:
        return tool_error("file_token and comment_id are required")
    query_params = [
        ("file_type", args.get("file_type", "docx") or "docx"),
        ("user_id_type", "open_id"),
        ("page_size", str(args.get("page_size", 100))),
    ]
    next_page = args.get("page_token", "")
    if next_page:
        query_params.append(("page_token", next_page))
    code, msg, data = _do_request(
        lark,
        "GET",
        _LIST_REPLIES_URI,
        paths={"file_token": token, "comment_id": thread_id},
        queries=query_params,
    )
    if code != 0:
        return tool_error(f"List replies failed: code={code} msg={msg}")
    return tool_result(data)
# ---------------------------------------------------------------------------
# feishu_drive_reply_comment
# ---------------------------------------------------------------------------

# Same path as _LIST_REPLIES_URI but used with POST to create a reply.
_REPLY_COMMENT_URI = "/open-apis/drive/v1/files/:file_token/comments/:comment_id/replies"

# Tool schema for replying inside an existing (local/quoted-text) thread.
FEISHU_DRIVE_REPLY_SCHEMA = {
    "name": "feishu_drive_reply_comment",
    "description": (
        "Reply to a local comment thread on a Feishu document. "
        "Use this for local (quoted-text) comments. "
        "For whole-document comments, use feishu_drive_add_comment instead."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "file_token": {
                "type": "string",
                "description": "The document file token.",
            },
            "comment_id": {
                "type": "string",
                "description": "The comment ID to reply to.",
            },
            "content": {
                "type": "string",
                "description": "The reply text content (plain text only, no markdown).",
            },
            "file_type": {
                "type": "string",
                "description": "File type (default: docx).",
                "default": "docx",
            },
        },
        "required": ["file_token", "comment_id", "content"],
    },
}
def _handle_reply_comment(args: dict, **kwargs) -> str:
    """Post a plain-text reply into an existing comment thread."""
    lark = get_client()
    if lark is None:
        return tool_error("Feishu client not available")
    token = args.get("file_token", "").strip()
    thread_id = args.get("comment_id", "").strip()
    text = args.get("content", "").strip()
    if not token or not thread_id or not text:
        return tool_error("file_token, comment_id, and content are required")
    # Reply payload: a single text_run element wrapping the plain text.
    payload = {
        "content": {
            "elements": [{"type": "text_run", "text_run": {"text": text}}],
        }
    }
    code, msg, data = _do_request(
        lark,
        "POST",
        _REPLY_COMMENT_URI,
        paths={"file_token": token, "comment_id": thread_id},
        queries=[("file_type", args.get("file_type", "docx") or "docx")],
        body=payload,
    )
    if code != 0:
        return tool_error(f"Reply comment failed: code={code} msg={msg}")
    return tool_result(success=True, data=data)
# ---------------------------------------------------------------------------
# feishu_drive_add_comment
# ---------------------------------------------------------------------------

# Endpoint for creating a new whole-document comment.
_ADD_COMMENT_URI = "/open-apis/drive/v1/files/:file_token/new_comments"

# Tool schema for creating a whole-document comment; also serves as a
# fallback when replying to a thread fails with code 1069302.
FEISHU_DRIVE_ADD_COMMENT_SCHEMA = {
    "name": "feishu_drive_add_comment",
    "description": (
        "Add a new whole-document comment on a Feishu document. "
        "Use this for whole-document comments or as a fallback when "
        "reply_comment fails with code 1069302."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "file_token": {
                "type": "string",
                "description": "The document file token.",
            },
            "content": {
                "type": "string",
                "description": "The comment text content (plain text only, no markdown).",
            },
            "file_type": {
                "type": "string",
                "description": "File type (default: docx).",
                "default": "docx",
            },
        },
        "required": ["file_token", "content"],
    },
}
def _handle_add_comment(args: dict, **kwargs) -> str:
    """Create a new whole-document comment with plain-text content."""
    lark = get_client()
    if lark is None:
        return tool_error("Feishu client not available")
    token = args.get("file_token", "").strip()
    text = args.get("content", "").strip()
    if not token or not text:
        return tool_error("file_token and content are required")
    # NOTE: unlike reply_comment, this endpoint takes file_type in the body
    # and a flat reply_elements list.
    payload = {
        "file_type": args.get("file_type", "docx") or "docx",
        "reply_elements": [{"type": "text", "text": text}],
    }
    code, msg, data = _do_request(
        lark,
        "POST",
        _ADD_COMMENT_URI,
        paths={"file_token": token},
        body=payload,
    )
    if code != 0:
        return tool_error(f"Add comment failed: code={code} msg={msg}")
    return tool_result(success=True, data=data)
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------

# Register all four drive tools under the "feishu_drive" toolset.  Each
# handler is synchronous (is_async=False); availability is gated on
# lark_oapi being importable (_check_feishu).  No environment variables are
# required because the lark client is injected per-thread via set_client().
registry.register(
    name="feishu_drive_list_comments",
    toolset="feishu_drive",
    schema=FEISHU_DRIVE_LIST_COMMENTS_SCHEMA,
    handler=_handle_list_comments,
    check_fn=_check_feishu,
    requires_env=[],
    is_async=False,
    description="List document comments",
    emoji="\U0001f4ac",  # speech-balloon emoji
)
registry.register(
    name="feishu_drive_list_comment_replies",
    toolset="feishu_drive",
    schema=FEISHU_DRIVE_LIST_REPLIES_SCHEMA,
    handler=_handle_list_replies,
    check_fn=_check_feishu,
    requires_env=[],
    is_async=False,
    description="List comment replies",
    emoji="\U0001f4ac",  # speech-balloon emoji
)
registry.register(
    name="feishu_drive_reply_comment",
    toolset="feishu_drive",
    schema=FEISHU_DRIVE_REPLY_SCHEMA,
    handler=_handle_reply_comment,
    check_fn=_check_feishu,
    requires_env=[],
    is_async=False,
    description="Reply to a document comment",
    emoji="\u2709\ufe0f",  # envelope emoji
)
registry.register(
    name="feishu_drive_add_comment",
    toolset="feishu_drive",
    schema=FEISHU_DRIVE_ADD_COMMENT_SCHEMA,
    handler=_handle_add_comment,
    check_fn=_check_feishu,
    requires_env=[],
    is_async=False,
    description="Add a whole-document comment",
    emoji="\u2709\ufe0f",  # envelope emoji
)

View file

@ -201,6 +201,21 @@ TOOLSETS = {
"includes": []
},
"feishu_doc": {
"description": "Read Feishu/Lark document content",
"tools": ["feishu_doc_read"],
"includes": []
},
"feishu_drive": {
"description": "Feishu/Lark document comment operations (list, reply, add)",
"tools": [
"feishu_drive_list_comments", "feishu_drive_list_comment_replies",
"feishu_drive_reply_comment", "feishu_drive_add_comment",
],
"includes": []
},
# Scenario-specific toolsets