fix(qqbot): authorize approval button interactions by session owner (#30737)

This commit is contained in:
Teknium 2026-05-24 04:25:12 -07:00 committed by GitHub
parent e4a1220f83
commit 3e78e353d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 93 additions and 12 deletions

View file

@ -1054,6 +1054,46 @@ class QQAdapter(BasePlatformAdapter):
"deny": "deny",
}
@staticmethod
def _parse_gateway_session_key(session_key: str) -> Optional[Dict[str, str]]:
"""Parse ``agent:main:<platform>:<chat_type>:<chat_id>[:<user_id>]``."""
parts = str(session_key or "").split(":")
if len(parts) < 5 or parts[0] != "agent" or parts[1] != "main":
return None
parsed = {
"platform": parts[2],
"chat_type": parts[3],
"chat_id": parts[4],
}
if len(parts) > 5:
parsed["user_id"] = parts[5]
return parsed
def _is_authorized_interaction_for_session(
self,
event: InteractionEvent,
session_key: str,
) -> bool:
"""Authorize approval/update interactions against session + operator."""
parsed = self._parse_gateway_session_key(session_key)
operator = str(event.operator_openid or "").strip()
if not parsed or parsed.get("platform") != "qqbot" or not operator:
return False
chat_type = parsed.get("chat_type", "")
chat_id = parsed.get("chat_id", "")
if chat_type == "c2c":
return bool(chat_id) and operator == chat_id
if chat_type in {"group", "guild"}:
event_chat = str(event.group_openid or event.guild_id or "").strip()
if not event_chat or event_chat != chat_id:
return False
session_user = str(parsed.get("user_id", "")).strip()
return bool(session_user) and operator == session_user
return False
async def _default_interaction_dispatch(
self,
event: InteractionEvent,
@ -1087,6 +1127,13 @@ class QQAdapter(BasePlatformAdapter):
self._log_tag, decision, session_key,
)
return
if not self._is_authorized_interaction_for_session(event, session_key):
logger.warning(
"[%s] Rejected unauthorized approval click for session %s "
"(operator=%s)",
self._log_tag, session_key, event.operator_openid,
)
return
try:
# Import lazily to keep the adapter importable in tests that
# don't exercise the approval subsystem.
@ -1107,6 +1154,13 @@ class QQAdapter(BasePlatformAdapter):
update_answer = parse_update_prompt_button_data(button_data)
if update_answer is not None:
update_session_key = f"agent:main:qqbot:{event.scene}:{event.group_openid or event.guild_id or event.user_openid}"
if not self._is_authorized_interaction_for_session(event, update_session_key):
logger.warning(
"[%s] Rejected unauthorized update prompt click (operator=%s)",
self._log_tag, event.operator_openid,
)
return
self._write_update_response(update_answer, event.operator_openid)
return

View file

@ -1233,14 +1233,14 @@ class TestAdapterInteractionDispatch:
"user_openid": "user-1",
"data": {
"type": 11,
"resolved": {"button_data": "approve:s:deny", "button_id": "deny"},
"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny", "button_id": "deny"},
},
})
assert len(ack_calls) == 1
assert ack_calls[0][0] == "i-1"
assert len(received) == 1
assert received[0].button_data == "approve:s:deny"
assert received[0].button_data == "approve:agent:main:qqbot:c2c:u:deny"
assert received[0].scene == "c2c"
@pytest.mark.asyncio
@ -1262,7 +1262,7 @@ class TestAdapterInteractionDispatch:
adapter.set_interaction_callback(cb)
await adapter._on_interaction({
"chat_type": 2, # no id
"data": {"resolved": {"button_data": "approve:s:deny"}},
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}},
})
assert ack_calls == []
@ -1286,7 +1286,7 @@ class TestAdapterInteractionDispatch:
"id": "i-2",
"chat_type": 2,
"user_openid": "u",
"data": {"resolved": {"button_data": "approve:s:deny"}},
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}},
})
@pytest.mark.asyncio
@ -1304,7 +1304,7 @@ class TestAdapterInteractionDispatch:
"id": "i-3",
"chat_type": 2,
"user_openid": "u",
"data": {"resolved": {"button_data": "approve:s:deny"}},
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}},
})
@ -1570,13 +1570,13 @@ class TestDefaultInteractionDispatch:
"id": "i",
"chat_type": 2,
"user_openid": "u-42",
"data": {"resolved": {"button_data": "approve:sess-abc:allow-once"}},
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u-42:allow-once"}},
})
await adapter._default_interaction_dispatch(event)
finally:
tools.approval.resolve_gateway_approval = orig
assert resolve_calls == [("sess-abc", "once", False)]
assert resolve_calls == [("agent:main:qqbot:c2c:u-42", "once", False)]
@pytest.mark.asyncio
async def test_approval_click_always_maps_to_always(self):
@ -1594,13 +1594,13 @@ class TestDefaultInteractionDispatch:
from gateway.platforms.qqbot.keyboards import parse_interaction_event
event = parse_interaction_event({
"id": "i", "chat_type": 2, "user_openid": "u",
"data": {"resolved": {"button_data": "approve:s:allow-always"}},
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:allow-always"}},
})
await adapter._default_interaction_dispatch(event)
finally:
tools.approval.resolve_gateway_approval = orig
assert resolve_calls == [("s", "always", False)]
assert resolve_calls == [("agent:main:qqbot:c2c:u", "always", False)]
@pytest.mark.asyncio
async def test_approval_click_deny_maps_to_deny(self):
@ -1618,13 +1618,40 @@ class TestDefaultInteractionDispatch:
from gateway.platforms.qqbot.keyboards import parse_interaction_event
event = parse_interaction_event({
"id": "i", "chat_type": 2, "user_openid": "u",
"data": {"resolved": {"button_data": "approve:s:deny"}},
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}},
})
await adapter._default_interaction_dispatch(event)
finally:
tools.approval.resolve_gateway_approval = orig
assert resolve_calls == [("s", "deny", False)]
assert resolve_calls == [("agent:main:qqbot:c2c:u", "deny", False)]
@pytest.mark.asyncio
async def test_approval_click_rejects_unauthorized_operator(self):
adapter = self._make_adapter()
resolve_calls = []
def fake_resolve(session_key, choice, resolve_all=False):
resolve_calls.append((session_key, choice, resolve_all))
return 1
import tools.approval
orig = tools.approval.resolve_gateway_approval
tools.approval.resolve_gateway_approval = fake_resolve
try:
from gateway.platforms.qqbot.keyboards import parse_interaction_event
event = parse_interaction_event({
"id": "i", "chat_type": 1,
"group_openid": "g-1",
"group_member_openid": "attacker",
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:group:g-1:owner:allow-once"}},
})
await adapter._default_interaction_dispatch(event)
finally:
tools.approval.resolve_gateway_approval = orig
assert resolve_calls == []
@pytest.mark.asyncio
async def test_update_prompt_click_writes_response_file(self, tmp_path, monkeypatch):
@ -1700,7 +1727,7 @@ class TestDefaultInteractionDispatch:
from gateway.platforms.qqbot.keyboards import parse_interaction_event
event = parse_interaction_event({
"id": "i", "chat_type": 2, "user_openid": "u",
"data": {"resolved": {"button_data": "approve:s:deny"}},
"data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}},
})
# Must not raise.
await adapter._default_interaction_dispatch(event)