diff --git a/cron/scheduler.py b/cron/scheduler.py index f694f4407..8d71248b4 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -158,6 +158,44 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]: } +# Media extension sets — keep in sync with gateway/platforms/base.py:_process_message_background +_AUDIO_EXTS = frozenset({'.ogg', '.opus', '.mp3', '.wav', '.m4a'}) +_VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'}) +_IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.gif'}) + + +def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: dict | None, loop, job: dict) -> None: + """Send extracted MEDIA files as native platform attachments via a live adapter. + + Routes each file to the appropriate adapter method (send_voice, send_image_file, + send_video, send_document) based on file extension — mirroring the routing logic + in ``BasePlatformAdapter._process_message_background``. + """ + from pathlib import Path + + for media_path, _is_voice in media_files: + try: + ext = Path(media_path).suffix.lower() + if ext in _AUDIO_EXTS: + coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata) + elif ext in _VIDEO_EXTS: + coro = adapter.send_video(chat_id=chat_id, video_path=media_path, metadata=metadata) + elif ext in _IMAGE_EXTS: + coro = adapter.send_image_file(chat_id=chat_id, image_path=media_path, metadata=metadata) + else: + coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata) + + future = asyncio.run_coroutine_threadsafe(coro, loop) + result = future.result(timeout=30) + if result and not getattr(result, "success", True): + logger.warning( + "Job '%s': media send failed for %s: %s", + job.get("id", "?"), media_path, getattr(result, "error", "unknown"), + ) + except Exception as e: + logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e) + + def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: """ Deliver job output to the configured target (origin chat, specific platform, etc.). @@ -246,18 +284,28 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): send_metadata = {"thread_id": thread_id} if thread_id else None try: - future = asyncio.run_coroutine_threadsafe( - runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata), - loop, - ) - send_result = future.result(timeout=60) - if send_result and not getattr(send_result, "success", True): - err = getattr(send_result, "error", "unknown") - logger.warning( - "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", - job["id"], platform_name, chat_id, err, + # Send cleaned text (MEDIA tags stripped) — not the raw content + text_to_send = cleaned_delivery_content.strip() + adapter_ok = True + if text_to_send: + future = asyncio.run_coroutine_threadsafe( + runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), + loop, ) - else: + send_result = future.result(timeout=60) + if send_result and not getattr(send_result, "success", True): + err = getattr(send_result, "error", "unknown") + logger.warning( + "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", + job["id"], platform_name, chat_id, err, + ) + adapter_ok = False # fall through to standalone path + + # Send extracted media files as native attachments via the live adapter + if adapter_ok and media_files: + _send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job) + + if adapter_ok: logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) return except Exception as e: diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index c12828977..baa6be9de 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -277,6 +277,188 @@ class TestDeliverResultWrapping: # Media files should be forwarded separately assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)] + def test_live_adapter_sends_media_as_attachments(self): + """When a live adapter is available, MEDIA files should be sent as native + platform attachments (e.g., Discord voice, Telegram audio) rather than + as literal 'MEDIA:/path' text.""" + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) + adapter.send_voice.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.DISCORD: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + # run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg) + def fake_run_coro(coro, _loop): + future = Future() + future.set_result(MagicMock(success=True)) + coro.close() + return future + + job = { + "id": "tts-job", + "deliver": "origin", + "origin": {"platform": "discord", "chat_id": "9876"}, + } + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + _deliver_result( + job, + "Here is TTS\nMEDIA:/tmp/cron-voice.mp3", + adapters={Platform.DISCORD: adapter}, + loop=loop, + ) + + # Text should be sent without the MEDIA tag + adapter.send.assert_called_once() + text_sent = adapter.send.call_args[0][1] + assert "MEDIA:" not in text_sent + assert "Here is TTS" in text_sent + + # Audio file should be sent as a voice attachment + adapter.send_voice.assert_called_once() + voice_call = adapter.send_voice.call_args + assert voice_call[1]["audio_path"] == "/tmp/cron-voice.mp3" + + def test_live_adapter_routes_image_to_send_image_file(self): + """Image MEDIA files should be routed to send_image_file, not send_voice.""" + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) + adapter.send_image_file.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.DISCORD: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + def fake_run_coro(coro, _loop): + future = Future() + future.set_result(MagicMock(success=True)) + coro.close() + return future + + job = { + "id": "img-job", + "deliver": "origin", + "origin": {"platform": "discord", "chat_id": "1234"}, + } + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + _deliver_result( + job, + "Chart attached\nMEDIA:/tmp/chart.png", + adapters={Platform.DISCORD: adapter}, + loop=loop, + ) + + adapter.send_image_file.assert_called_once() + assert adapter.send_image_file.call_args[1]["image_path"] == "/tmp/chart.png" + adapter.send_voice.assert_not_called() + + def test_live_adapter_media_only_no_text(self): + """When content is ONLY a MEDIA tag with no text, media should still be sent.""" + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send_voice.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + def fake_run_coro(coro, _loop): + future = Future() + future.set_result(MagicMock(success=True)) + coro.close() + return future + + job = { + "id": "voice-only", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "999"}, + } + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + _deliver_result( + job, + "MEDIA:/tmp/voice.ogg", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + # Text send should NOT be called (no text after stripping MEDIA tag) + adapter.send.assert_not_called() + # Audio should still be delivered + adapter.send_voice.assert_called_once() + + def test_live_adapter_sends_cleaned_text_not_raw(self): + """The live adapter path must send cleaned text (MEDIA tags stripped), + not the raw delivery_content with embedded MEDIA: tags.""" + from gateway.config import Platform + from concurrent.futures import Future + + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + def fake_run_coro(coro, _loop): + future = Future() + future.set_result(MagicMock(success=True)) + coro.close() + return future + + job = { + "id": "img-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "555"}, + } + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + _deliver_result( + job, + "Report\nMEDIA:/tmp/chart.png", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + text_sent = adapter.send.call_args[0][1] + assert "MEDIA:" not in text_sent + assert "Report" in text_sent + def test_no_mirror_to_session_call(self): """Cron deliveries should NOT mirror into the gateway session.""" from gateway.config import Platform