diff --git a/gateway/platforms/qqbot/adapter.py b/gateway/platforms/qqbot/adapter.py index 2c2124c56ef..7569884760e 100644 --- a/gateway/platforms/qqbot/adapter.py +++ b/gateway/platforms/qqbot/adapter.py @@ -1054,6 +1054,46 @@ class QQAdapter(BasePlatformAdapter): "deny": "deny", } + @staticmethod + def _parse_gateway_session_key(session_key: str) -> Optional[Dict[str, str]]: + """Parse ``agent:main:::[:]``.""" + parts = str(session_key or "").split(":") + if len(parts) < 5 or parts[0] != "agent" or parts[1] != "main": + return None + parsed = { + "platform": parts[2], + "chat_type": parts[3], + "chat_id": parts[4], + } + if len(parts) > 5: + parsed["user_id"] = parts[5] + return parsed + + def _is_authorized_interaction_for_session( + self, + event: InteractionEvent, + session_key: str, + ) -> bool: + """Authorize approval/update interactions against session + operator.""" + parsed = self._parse_gateway_session_key(session_key) + operator = str(event.operator_openid or "").strip() + if not parsed or parsed.get("platform") != "qqbot" or not operator: + return False + + chat_type = parsed.get("chat_type", "") + chat_id = parsed.get("chat_id", "") + if chat_type == "c2c": + return bool(chat_id) and operator == chat_id + + if chat_type in {"group", "guild"}: + event_chat = str(event.group_openid or event.guild_id or "").strip() + if not event_chat or event_chat != chat_id: + return False + session_user = str(parsed.get("user_id", "")).strip() + return bool(session_user) and operator == session_user + + return False + async def _default_interaction_dispatch( self, event: InteractionEvent, @@ -1087,6 +1127,13 @@ class QQAdapter(BasePlatformAdapter): self._log_tag, decision, session_key, ) return + if not self._is_authorized_interaction_for_session(event, session_key): + logger.warning( + "[%s] Rejected unauthorized approval click for session %s " + "(operator=%s)", + self._log_tag, session_key, event.operator_openid, + ) + return try: # Import lazily to keep the adapter importable in tests that # don't exercise the approval subsystem. @@ -1107,6 +1154,13 @@ class QQAdapter(BasePlatformAdapter): update_answer = parse_update_prompt_button_data(button_data) if update_answer is not None: + update_session_key = f"agent:main:qqbot:{event.scene}:{event.group_openid or event.guild_id or event.user_openid}" + if not self._is_authorized_interaction_for_session(event, update_session_key): + logger.warning( + "[%s] Rejected unauthorized update prompt click (operator=%s)", + self._log_tag, event.operator_openid, + ) + return self._write_update_response(update_answer, event.operator_openid) return diff --git a/tests/gateway/test_qqbot.py b/tests/gateway/test_qqbot.py index 8b02fc92cfe..bdcb4c9e8df 100644 --- a/tests/gateway/test_qqbot.py +++ b/tests/gateway/test_qqbot.py @@ -1233,14 +1233,14 @@ class TestAdapterInteractionDispatch: "user_openid": "user-1", "data": { "type": 11, - "resolved": {"button_data": "approve:s:deny", "button_id": "deny"}, + "resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny", "button_id": "deny"}, }, }) assert len(ack_calls) == 1 assert ack_calls[0][0] == "i-1" assert len(received) == 1 - assert received[0].button_data == "approve:s:deny" + assert received[0].button_data == "approve:agent:main:qqbot:c2c:u:deny" assert received[0].scene == "c2c" @pytest.mark.asyncio @@ -1262,7 +1262,7 @@ class TestAdapterInteractionDispatch: adapter.set_interaction_callback(cb) await adapter._on_interaction({ "chat_type": 2, # no id - "data": {"resolved": {"button_data": "approve:s:deny"}}, + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}}, }) assert ack_calls == [] @@ -1286,7 +1286,7 @@ class TestAdapterInteractionDispatch: "id": "i-2", "chat_type": 2, "user_openid": "u", - "data": {"resolved": {"button_data": "approve:s:deny"}}, + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}}, }) @pytest.mark.asyncio @@ -1304,7 +1304,7 @@ class TestAdapterInteractionDispatch: "id": "i-3", "chat_type": 2, "user_openid": "u", - "data": {"resolved": {"button_data": "approve:s:deny"}}, + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}}, }) @@ -1570,13 +1570,13 @@ class TestDefaultInteractionDispatch: "id": "i", "chat_type": 2, "user_openid": "u-42", - "data": {"resolved": {"button_data": "approve:sess-abc:allow-once"}}, + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u-42:allow-once"}}, }) await adapter._default_interaction_dispatch(event) finally: tools.approval.resolve_gateway_approval = orig - assert resolve_calls == [("sess-abc", "once", False)] + assert resolve_calls == [("agent:main:qqbot:c2c:u-42", "once", False)] @pytest.mark.asyncio async def test_approval_click_always_maps_to_always(self): @@ -1594,13 +1594,13 @@ class TestDefaultInteractionDispatch: from gateway.platforms.qqbot.keyboards import parse_interaction_event event = parse_interaction_event({ "id": "i", "chat_type": 2, "user_openid": "u", - "data": {"resolved": {"button_data": "approve:s:allow-always"}}, + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:allow-always"}}, }) await adapter._default_interaction_dispatch(event) finally: tools.approval.resolve_gateway_approval = orig - assert resolve_calls == [("s", "always", False)] + assert resolve_calls == [("agent:main:qqbot:c2c:u", "always", False)] @pytest.mark.asyncio async def test_approval_click_deny_maps_to_deny(self): @@ -1618,13 +1618,40 @@ class TestDefaultInteractionDispatch: from gateway.platforms.qqbot.keyboards import parse_interaction_event event = parse_interaction_event({ "id": "i", "chat_type": 2, "user_openid": "u", - "data": {"resolved": {"button_data": "approve:s:deny"}}, + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}}, }) await adapter._default_interaction_dispatch(event) finally: tools.approval.resolve_gateway_approval = orig - assert resolve_calls == [("s", "deny", False)] + assert resolve_calls == [("agent:main:qqbot:c2c:u", "deny", False)] + + + @pytest.mark.asyncio + async def test_approval_click_rejects_unauthorized_operator(self): + adapter = self._make_adapter() + resolve_calls = [] + + def fake_resolve(session_key, choice, resolve_all=False): + resolve_calls.append((session_key, choice, resolve_all)) + return 1 + + import tools.approval + orig = tools.approval.resolve_gateway_approval + tools.approval.resolve_gateway_approval = fake_resolve + try: + from gateway.platforms.qqbot.keyboards import parse_interaction_event + event = parse_interaction_event({ + "id": "i", "chat_type": 1, + "group_openid": "g-1", + "group_member_openid": "attacker", + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:group:g-1:owner:allow-once"}}, + }) + await adapter._default_interaction_dispatch(event) + finally: + tools.approval.resolve_gateway_approval = orig + + assert resolve_calls == [] @pytest.mark.asyncio async def test_update_prompt_click_writes_response_file(self, tmp_path, monkeypatch): @@ -1700,7 +1727,7 @@ class TestDefaultInteractionDispatch: from gateway.platforms.qqbot.keyboards import parse_interaction_event event = parse_interaction_event({ "id": "i", "chat_type": 2, "user_openid": "u", - "data": {"resolved": {"button_data": "approve:s:deny"}}, + "data": {"resolved": {"button_data": "approve:agent:main:qqbot:c2c:u:deny"}}, }) # Must not raise. await adapter._default_interaction_dispatch(event)