mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
- Full comment handler: parse drive.notice.comment_add_v1 events, build timeline, run agent, deliver reply with chunking support. - 5 tools: feishu_doc_read, feishu_drive_list_comments, feishu_drive_list_comment_replies, feishu_drive_reply_comment, feishu_drive_add_comment. - 3-tier access control rules (exact doc > wildcard "*" > top-level > defaults) with per-field fallback. Config via ~/.hermes/feishu_comment_rules.json, mtime-cached hot-reload. - Self-reply filter using generalized self_open_id (supports future user-identity subscriptions). Receiver check: only process events where the bot is the @mentioned target. - Smart timeline selection, long text chunking, semantic text extraction, session sharing per document, wiki link resolution. Change-Id: I31e82fd6355173dbcc400b8934b6d9799e3137b9
320 lines
11 KiB
Python
320 lines
11 KiB
Python
"""Tests for feishu_comment_rules — 3-tier access control rule engine."""
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from gateway.platforms.feishu_comment_rules import (
|
|
CommentsConfig,
|
|
CommentDocumentRule,
|
|
ResolvedCommentRule,
|
|
_MtimeCache,
|
|
_parse_document_rule,
|
|
has_wiki_keys,
|
|
is_user_allowed,
|
|
load_config,
|
|
pairing_add,
|
|
pairing_list,
|
|
pairing_remove,
|
|
resolve_rule,
|
|
)
|
|
|
|
|
|
class TestCommentDocumentRuleParsing(unittest.TestCase):
|
|
def test_parse_full_rule(self):
|
|
rule = _parse_document_rule({
|
|
"enabled": False,
|
|
"policy": "allowlist",
|
|
"allow_from": ["ou_a", "ou_b"],
|
|
})
|
|
self.assertFalse(rule.enabled)
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertEqual(rule.allow_from, frozenset(["ou_a", "ou_b"]))
|
|
|
|
def test_parse_partial_rule(self):
|
|
rule = _parse_document_rule({"policy": "allowlist"})
|
|
self.assertIsNone(rule.enabled)
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertIsNone(rule.allow_from)
|
|
|
|
def test_parse_empty_rule(self):
|
|
rule = _parse_document_rule({})
|
|
self.assertIsNone(rule.enabled)
|
|
self.assertIsNone(rule.policy)
|
|
self.assertIsNone(rule.allow_from)
|
|
|
|
def test_invalid_policy_ignored(self):
|
|
rule = _parse_document_rule({"policy": "invalid_value"})
|
|
self.assertIsNone(rule.policy)
|
|
|
|
|
|
class TestResolveRule(unittest.TestCase):
|
|
def test_exact_match(self):
|
|
cfg = CommentsConfig(
|
|
policy="pairing",
|
|
allow_from=frozenset(["ou_top"]),
|
|
documents={
|
|
"docx:abc": CommentDocumentRule(policy="allowlist"),
|
|
},
|
|
)
|
|
rule = resolve_rule(cfg, "docx", "abc")
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertTrue(rule.match_source.startswith("exact:"))
|
|
|
|
def test_wildcard_match(self):
|
|
cfg = CommentsConfig(
|
|
policy="pairing",
|
|
documents={
|
|
"*": CommentDocumentRule(policy="allowlist"),
|
|
},
|
|
)
|
|
rule = resolve_rule(cfg, "docx", "unknown")
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertEqual(rule.match_source, "wildcard")
|
|
|
|
def test_top_level_fallback(self):
|
|
cfg = CommentsConfig(policy="pairing", allow_from=frozenset(["ou_top"]))
|
|
rule = resolve_rule(cfg, "docx", "whatever")
|
|
self.assertEqual(rule.policy, "pairing")
|
|
self.assertEqual(rule.allow_from, frozenset(["ou_top"]))
|
|
self.assertEqual(rule.match_source, "top")
|
|
|
|
def test_exact_overrides_wildcard(self):
|
|
cfg = CommentsConfig(
|
|
policy="pairing",
|
|
documents={
|
|
"*": CommentDocumentRule(policy="pairing"),
|
|
"docx:abc": CommentDocumentRule(policy="allowlist"),
|
|
},
|
|
)
|
|
rule = resolve_rule(cfg, "docx", "abc")
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertTrue(rule.match_source.startswith("exact:"))
|
|
|
|
def test_field_by_field_fallback(self):
|
|
"""Exact sets policy, wildcard sets allow_from, enabled from top."""
|
|
cfg = CommentsConfig(
|
|
enabled=True,
|
|
policy="pairing",
|
|
allow_from=frozenset(["ou_top"]),
|
|
documents={
|
|
"*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])),
|
|
"docx:abc": CommentDocumentRule(policy="allowlist"),
|
|
},
|
|
)
|
|
rule = resolve_rule(cfg, "docx", "abc")
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertEqual(rule.allow_from, frozenset(["ou_wildcard"]))
|
|
self.assertTrue(rule.enabled)
|
|
|
|
def test_explicit_empty_allow_from_does_not_fall_through(self):
|
|
"""allow_from=[] on exact should NOT inherit from wildcard or top."""
|
|
cfg = CommentsConfig(
|
|
allow_from=frozenset(["ou_top"]),
|
|
documents={
|
|
"*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])),
|
|
"docx:abc": CommentDocumentRule(
|
|
policy="allowlist",
|
|
allow_from=frozenset(),
|
|
),
|
|
},
|
|
)
|
|
rule = resolve_rule(cfg, "docx", "abc")
|
|
self.assertEqual(rule.allow_from, frozenset())
|
|
|
|
def test_wiki_token_match(self):
|
|
cfg = CommentsConfig(
|
|
policy="pairing",
|
|
documents={
|
|
"wiki:WIKI123": CommentDocumentRule(policy="allowlist"),
|
|
},
|
|
)
|
|
rule = resolve_rule(cfg, "docx", "obj_token", wiki_token="WIKI123")
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertTrue(rule.match_source.startswith("exact:wiki:"))
|
|
|
|
def test_exact_takes_priority_over_wiki(self):
|
|
cfg = CommentsConfig(
|
|
documents={
|
|
"docx:abc": CommentDocumentRule(policy="allowlist"),
|
|
"wiki:WIKI123": CommentDocumentRule(policy="pairing"),
|
|
},
|
|
)
|
|
rule = resolve_rule(cfg, "docx", "abc", wiki_token="WIKI123")
|
|
self.assertEqual(rule.policy, "allowlist")
|
|
self.assertTrue(rule.match_source.startswith("exact:docx:"))
|
|
|
|
def test_default_config(self):
|
|
cfg = CommentsConfig()
|
|
rule = resolve_rule(cfg, "docx", "anything")
|
|
self.assertTrue(rule.enabled)
|
|
self.assertEqual(rule.policy, "pairing")
|
|
self.assertEqual(rule.allow_from, frozenset())
|
|
|
|
|
|
class TestHasWikiKeys(unittest.TestCase):
|
|
def test_no_wiki_keys(self):
|
|
cfg = CommentsConfig(documents={
|
|
"docx:abc": CommentDocumentRule(policy="allowlist"),
|
|
"*": CommentDocumentRule(policy="pairing"),
|
|
})
|
|
self.assertFalse(has_wiki_keys(cfg))
|
|
|
|
def test_has_wiki_keys(self):
|
|
cfg = CommentsConfig(documents={
|
|
"wiki:WIKI123": CommentDocumentRule(policy="allowlist"),
|
|
})
|
|
self.assertTrue(has_wiki_keys(cfg))
|
|
|
|
def test_empty_documents(self):
|
|
cfg = CommentsConfig()
|
|
self.assertFalse(has_wiki_keys(cfg))
|
|
|
|
|
|
class TestIsUserAllowed(unittest.TestCase):
|
|
def test_allowlist_allows_listed(self):
|
|
rule = ResolvedCommentRule(True, "allowlist", frozenset(["ou_a"]), "top")
|
|
self.assertTrue(is_user_allowed(rule, "ou_a"))
|
|
|
|
def test_allowlist_denies_unlisted(self):
|
|
rule = ResolvedCommentRule(True, "allowlist", frozenset(["ou_a"]), "top")
|
|
self.assertFalse(is_user_allowed(rule, "ou_b"))
|
|
|
|
def test_allowlist_empty_denies_all(self):
|
|
rule = ResolvedCommentRule(True, "allowlist", frozenset(), "top")
|
|
self.assertFalse(is_user_allowed(rule, "ou_anyone"))
|
|
|
|
def test_pairing_allows_in_allow_from(self):
|
|
rule = ResolvedCommentRule(True, "pairing", frozenset(["ou_a"]), "top")
|
|
self.assertTrue(is_user_allowed(rule, "ou_a"))
|
|
|
|
def test_pairing_checks_store(self):
|
|
rule = ResolvedCommentRule(True, "pairing", frozenset(), "top")
|
|
with patch(
|
|
"gateway.platforms.feishu_comment_rules._load_pairing_approved",
|
|
return_value={"ou_approved"},
|
|
):
|
|
self.assertTrue(is_user_allowed(rule, "ou_approved"))
|
|
self.assertFalse(is_user_allowed(rule, "ou_unknown"))
|
|
|
|
|
|
class TestMtimeCache(unittest.TestCase):
|
|
def test_returns_empty_dict_for_missing_file(self):
|
|
cache = _MtimeCache(Path("/nonexistent/path.json"))
|
|
self.assertEqual(cache.load(), {})
|
|
|
|
def test_reads_file_and_caches(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
|
json.dump({"key": "value"}, f)
|
|
f.flush()
|
|
path = Path(f.name)
|
|
try:
|
|
cache = _MtimeCache(path)
|
|
data = cache.load()
|
|
self.assertEqual(data, {"key": "value"})
|
|
# Second load should use cache (same mtime)
|
|
data2 = cache.load()
|
|
self.assertEqual(data2, {"key": "value"})
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_reloads_on_mtime_change(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
|
json.dump({"v": 1}, f)
|
|
f.flush()
|
|
path = Path(f.name)
|
|
try:
|
|
cache = _MtimeCache(path)
|
|
self.assertEqual(cache.load(), {"v": 1})
|
|
# Modify file
|
|
time.sleep(0.05)
|
|
with open(path, "w") as f2:
|
|
json.dump({"v": 2}, f2)
|
|
# Force mtime change detection
|
|
os.utime(path, (time.time() + 1, time.time() + 1))
|
|
self.assertEqual(cache.load(), {"v": 2})
|
|
finally:
|
|
path.unlink()
|
|
|
|
|
|
class TestLoadConfig(unittest.TestCase):
|
|
def test_load_with_documents(self):
|
|
raw = {
|
|
"enabled": True,
|
|
"policy": "allowlist",
|
|
"allow_from": ["ou_a"],
|
|
"documents": {
|
|
"*": {"policy": "pairing"},
|
|
"docx:abc": {"policy": "allowlist", "allow_from": ["ou_b"]},
|
|
},
|
|
}
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
|
json.dump(raw, f)
|
|
path = Path(f.name)
|
|
try:
|
|
with patch("gateway.platforms.feishu_comment_rules.RULES_FILE", path):
|
|
with patch("gateway.platforms.feishu_comment_rules._rules_cache", _MtimeCache(path)):
|
|
cfg = load_config()
|
|
self.assertTrue(cfg.enabled)
|
|
self.assertEqual(cfg.policy, "allowlist")
|
|
self.assertEqual(cfg.allow_from, frozenset(["ou_a"]))
|
|
self.assertIn("*", cfg.documents)
|
|
self.assertIn("docx:abc", cfg.documents)
|
|
self.assertEqual(cfg.documents["docx:abc"].policy, "allowlist")
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_load_missing_file_returns_defaults(self):
|
|
with patch("gateway.platforms.feishu_comment_rules._rules_cache", _MtimeCache(Path("/nonexistent"))):
|
|
cfg = load_config()
|
|
self.assertTrue(cfg.enabled)
|
|
self.assertEqual(cfg.policy, "pairing")
|
|
self.assertEqual(cfg.allow_from, frozenset())
|
|
self.assertEqual(cfg.documents, {})
|
|
|
|
|
|
class TestPairingStore(unittest.TestCase):
|
|
def setUp(self):
|
|
self._tmpdir = tempfile.mkdtemp()
|
|
self._pairing_file = Path(self._tmpdir) / "pairing.json"
|
|
with open(self._pairing_file, "w") as f:
|
|
json.dump({"approved": {}}, f)
|
|
self._patcher_file = patch("gateway.platforms.feishu_comment_rules.PAIRING_FILE", self._pairing_file)
|
|
self._patcher_cache = patch(
|
|
"gateway.platforms.feishu_comment_rules._pairing_cache",
|
|
_MtimeCache(self._pairing_file),
|
|
)
|
|
self._patcher_file.start()
|
|
self._patcher_cache.start()
|
|
|
|
def tearDown(self):
|
|
self._patcher_cache.stop()
|
|
self._patcher_file.stop()
|
|
if self._pairing_file.exists():
|
|
self._pairing_file.unlink()
|
|
os.rmdir(self._tmpdir)
|
|
|
|
def test_add_and_list(self):
|
|
self.assertTrue(pairing_add("ou_new"))
|
|
approved = pairing_list()
|
|
self.assertIn("ou_new", approved)
|
|
|
|
def test_add_duplicate(self):
|
|
pairing_add("ou_a")
|
|
self.assertFalse(pairing_add("ou_a"))
|
|
|
|
def test_remove(self):
|
|
pairing_add("ou_a")
|
|
self.assertTrue(pairing_remove("ou_a"))
|
|
self.assertNotIn("ou_a", pairing_list())
|
|
|
|
def test_remove_nonexistent(self):
|
|
self.assertFalse(pairing_remove("ou_nobody"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|