fix(signal): share markdown formatting across send paths

Route Signal send paths through shared markdown formatting helpers and render markdown bullets consistently as Unicode bullets. Add coverage for Signal formatting and send_message integration.
This commit is contained in:
lkz-de 2026-06-15 02:52:39 +02:00 committed by kshitijk4poor
parent 15852722d4
commit 905820b59f
5 changed files with 306 additions and 140 deletions

View file

@ -39,6 +39,7 @@ from gateway.platforms.base import (
cache_image_from_url,
)
from gateway.platforms.helpers import redact_phone
from gateway.platforms.signal_format import markdown_to_signal
from gateway.platforms.signal_rate_limit import (
SIGNAL_BATCH_PACING_NOTICE_THRESHOLD,
SIGNAL_MAX_ATTACHMENTS_PER_MSG,
@ -822,143 +823,8 @@ class SignalAdapter(BasePlatformAdapter):
@staticmethod
def _markdown_to_signal(text: str) -> tuple:
"""Convert markdown to plain text + Signal textStyles list.
Signal doesn't render markdown. Instead it uses ``bodyRanges``
(exposed by signal-cli as ``textStyle`` / ``textStyles`` params)
with the format ``start:length:STYLE``.
Positions are measured in **UTF-16 code units** (not Python code
points) because that's what the Signal protocol uses.
Supported styles: BOLD, ITALIC, STRIKETHROUGH, MONOSPACE.
(Signal's SPOILER style is not currently mapped — no standard
markdown syntax for it; would need ``||spoiler||`` parsing.)
Returns ``(plain_text, styles_list)`` where *styles_list* may be
empty if there's nothing to format.
"""
import re
def _utf16_len(s: str) -> int:
"""Length of *s* in UTF-16 code units."""
return len(s.encode("utf-16-le")) // 2
# Pre-process: normalize whitespace before any position tracking
# so later operations don't invalidate recorded offsets.
text = re.sub(r"\n{3,}", "\n\n", text)
text = text.strip()
styles: list = []
# --- Phase 1: fenced code blocks ```...``` → MONOSPACE ---
_CB = re.compile(r"```[a-zA-Z0-9_+-]*\n?(.*?)```", re.DOTALL)
while m := _CB.search(text):
inner = m.group(1).rstrip("\n")
start = m.start()
text = text[: m.start()] + inner + text[m.end() :]
styles.append((start, len(inner), "MONOSPACE"))
# --- Phase 2: heading markers # Foo → Foo (BOLD) ---
_HEADING = re.compile(r"^#{1,6}\s+", re.MULTILINE)
new_text = ""
last_end = 0
for m in _HEADING.finditer(text):
new_text += text[last_end : m.start()]
last_end = m.end()
eol = text.find("\n", m.end())
if eol == -1:
eol = len(text)
heading_text = text[m.end() : eol]
start = len(new_text)
new_text += heading_text
styles.append((start, len(heading_text), "BOLD"))
last_end = eol
new_text += text[last_end:]
text = new_text
# --- Phase 3: inline patterns (single-pass to avoid offset drift) ---
# The old code processed each pattern sequentially, stripping markers
# and recording positions per-pass. Later passes shifted text without
# adjusting earlier positions → bold/italic landed mid-word.
#
# Fix: collect ALL non-overlapping matches first, then strip every
# marker in one pass so positions are computed against the final text.
_PATTERNS = [
(re.compile(r"\*\*(.+?)\*\*", re.DOTALL), "BOLD"),
(re.compile(r"__(.+?)__", re.DOTALL), "BOLD"),
(re.compile(r"~~(.+?)~~", re.DOTALL), "STRIKETHROUGH"),
(re.compile(r"`(.+?)`"), "MONOSPACE"),
(re.compile(r"(?<!\*)\*(?!\*| )(.+?)(?<!\*)\*(?!\*)"), "ITALIC"),
(re.compile(r"(?<!\w)_(?!_)(.+?)(?<!_)_(?!\w)"), "ITALIC"),
]
# Collect all non-overlapping matches (earlier patterns win ties).
all_matches: list = [] # (start, end, g1_start, g1_end, style)
occupied: list = [] # (start, end) intervals already claimed
for pat, style in _PATTERNS:
for m in pat.finditer(text):
ms, me = m.start(), m.end()
if not any(ms < oe and me > os for os, oe in occupied):
all_matches.append((ms, me, m.start(1), m.end(1), style))
occupied.append((ms, me))
all_matches.sort()
# Build removal list so we can adjust Phase 1/2 styles.
# Each match removes its prefix markers (start..g1_start) and
# suffix markers (g1_end..end).
removals: list = [] # (position, length) sorted
for ms, me, g1s, g1e, _ in all_matches:
if g1s > ms:
removals.append((ms, g1s - ms))
if me > g1e:
removals.append((g1e, me - g1e))
removals.sort()
# Adjust Phase 1/2 styles for characters about to be removed.
def _adj(pos: int) -> int:
shift = 0
for rp, rl in removals:
if rp < pos:
shift += min(rl, pos - rp)
else:
break
return pos - shift
adjusted_prior: list = []
for s, l, st in styles:
ns = _adj(s)
ne = _adj(s + l)
if ne > ns:
adjusted_prior.append((ns, ne - ns, st))
# Strip all inline markers in one pass → positions are correct.
result = ""
last_end = 0
inline_styles: list = []
for ms, me, g1s, g1e, sty in all_matches:
result += text[last_end:ms]
pos = len(result)
inner = text[g1s:g1e]
result += inner
inline_styles.append((pos, len(inner), sty))
last_end = me
result += text[last_end:]
text = result
styles = adjusted_prior + inline_styles
# Convert code-point offsets → UTF-16 code-unit offsets
style_strings = []
for cp_start, cp_len, stype in sorted(styles):
# Safety: skip any out-of-bounds styles
if cp_start < 0 or cp_start + cp_len > len(text):
continue
u16_start = _utf16_len(text[:cp_start])
u16_len = _utf16_len(text[cp_start : cp_start + cp_len])
style_strings.append(f"{u16_start}:{u16_len}:{stype}")
return text, style_strings
"""Backward-compatible wrapper around shared Signal formatting helper."""
return markdown_to_signal(text)
def format_message(self, content: str) -> str:
"""Strip markdown for plain-text fallback (used by base class).

View file

@ -0,0 +1,140 @@
"""Shared Signal formatting helpers.
Keep the markdown-to-Signal native formatting conversion in one place so both
the live Signal adapter and standalone send paths emit the same bodyRanges.
"""
from __future__ import annotations
import re
def markdown_to_signal(text: str) -> tuple[str, list[str]]:
    """Convert markdown to plain text + Signal textStyles list.

    Signal doesn't render markdown. Instead it uses ``bodyRanges`` (exposed by
    signal-cli as ``textStyle`` / ``textStyles`` params) with the format
    ``start:length:STYLE``.

    Positions are measured in UTF-16 code units because that's what the Signal
    protocol uses.

    Supported styles: BOLD, ITALIC, STRIKETHROUGH, MONOSPACE.

    The conversion runs in phases. Every phase that deletes markers re-bases
    the styles recorded by earlier phases, so offsets always refer to the text
    that is finally returned:

    1. Runs of 3+ newlines are collapsed, the text is stripped, and list
       markers (``-``, ``*``, ``+``) outside fenced code become ``•``.
    2. Fenced code blocks are unwrapped and recorded as MONOSPACE.
    3. ``#`` heading markers are removed and the heading text becomes BOLD.
    4. Inline bold / italic / strikethrough / code markers are removed.

    Text inside a fenced code block is never rewritten: heading-looking lines
    and inline markers that sit inside a fence are left untouched.

    Args:
        text: Markdown source text.

    Returns:
        ``(plain_text, style_strings)`` where each style string is
        ``"start:length:STYLE"``. The list is empty when nothing is styled.
    """

    def _utf16_len(s: str) -> int:
        """Length of *s* in UTF-16 code units."""
        return len(s.encode("utf-16-le")) // 2

    def _normalize_bullet_markers(source: str) -> str:
        """Replace Markdown bullet markers with plain Unicode bullets.

        Signal does not render Markdown list syntax, so ``- item`` and
        ``* item`` otherwise arrive as literal Markdown markers. Fenced code
        blocks are preserved byte-for-byte; list-looking lines inside code
        are code, not prose bullets.
        """
        parts = re.split(r"(```.*?```)", source, flags=re.DOTALL)
        # re.split with a capturing group puts the fenced blocks at odd
        # indices, so only the even (prose) parts are rewritten.
        for idx in range(0, len(parts), 2):
            # The separator must stay on the marker's own line: a bare ``\s+``
            # would also eat the newline after a lone "-" and glue the next
            # line onto the bullet.
            parts[idx] = re.sub(
                r"(?m)^([ \t]{0,3})[-*+][^\S\r\n]+", r"\1• ", parts[idx]
            )
        return "".join(parts)

    def _overlaps(spans: list[tuple[int, int]], lo: int, hi: int) -> bool:
        """Return True when ``[lo, hi)`` intersects any ``(start, end)`` span."""
        return any(lo < end and hi > start for start, end in spans)

    def _shift(pos: int, removals: list[tuple[int, int]]) -> int:
        """Map *pos* to its offset once the sorted *removals* are deleted."""
        shift = 0
        for remove_pos, remove_len in removals:
            if remove_pos >= pos:
                break
            shift += min(remove_len, pos - remove_pos)
        return pos - shift

    def _rebase(
        spans: list[tuple[int, int, str]], removals: list[tuple[int, int]]
    ) -> list[tuple[int, int, str]]:
        """Re-express ``(start, length, style)`` spans after *removals*.

        Spans that end up empty are dropped.
        """
        rebased: list[tuple[int, int, str]] = []
        for start, length, style in spans:
            new_start = _shift(start, removals)
            new_end = _shift(start + length, removals)
            if new_end > new_start:
                rebased.append((new_start, new_end - new_start, style))
        return rebased

    def _delete(source: str, removals: list[tuple[int, int]]) -> str:
        """Return *source* without the sorted, non-overlapping *removals*."""
        pieces: list[str] = []
        cursor = 0
        for pos, length in removals:
            pieces.append(source[cursor:pos])
            cursor = pos + length
        pieces.append(source[cursor:])
        return "".join(pieces)

    # Normalize whitespace before any position tracking so later operations
    # don't invalidate recorded offsets.
    text = re.sub(r"\n{3,}", "\n\n", text).strip()
    text = _normalize_bullet_markers(text)

    # --- Phase 1: fenced code blocks ```...``` -> MONOSPACE ---
    # The language tag is only recognised when a newline follows it;
    # otherwise a single-line fence such as ```code``` would have its whole
    # content consumed as the "language" and vanish from the output.
    styles: list[tuple[int, int, str]] = []
    code_block = re.compile(r"```(?:[a-zA-Z0-9_+-]*\n)?(.*?)```", re.DOTALL)
    search_from = 0
    while match := code_block.search(text, search_from):
        inner = match.group(1).rstrip("\n")
        styles.append((match.start(), len(inner), "MONOSPACE"))
        text = text[: match.start()] + inner + text[match.end() :]
        # Resume after the unwrapped code so already-recorded spans stay valid.
        search_from = match.start() + len(inner)

    # --- Phase 2: heading markers "# Foo" -> "Foo" (BOLD) ---
    code_spans = [(start, start + length) for start, length, _ in styles if length]
    # Horizontal whitespace only: with ``\s+`` the marker could swallow a
    # newline, pull the following line into the heading and duplicate text.
    heading = re.compile(r"^#{1,6}[^\S\r\n]+", re.MULTILINE)
    removals: list[tuple[int, int]] = []
    heading_styles: list[tuple[int, int, str]] = []
    for match in heading.finditer(text):
        if _overlaps(code_spans, match.start(), match.end()):
            continue  # "# comment" inside a code block is code, not a heading
        eol = text.find("\n", match.end())
        if eol == -1:
            eol = len(text)
        removals.append((match.start(), match.end() - match.start()))
        heading_styles.append((match.end(), eol - match.end(), "BOLD"))
    # Re-base the code-block styles too: removing "# " shifts everything
    # that follows the heading.
    styles = _rebase(styles + heading_styles, removals)
    text = _delete(text, removals)

    # --- Phase 3: inline patterns, collected first and stripped in one pass ---
    code_spans = [
        (start, start + length)
        for start, length, style in styles
        if style == "MONOSPACE"
    ]
    patterns = [
        (re.compile(r"\*\*(.+?)\*\*", re.DOTALL), "BOLD"),
        (re.compile(r"__(.+?)__", re.DOTALL), "BOLD"),
        (re.compile(r"~~(.+?)~~", re.DOTALL), "STRIKETHROUGH"),
        (re.compile(r"`(.+?)`"), "MONOSPACE"),
        (re.compile(r"(?<!\*)\*(?!\*| )(.+?)(?<!\*)\*(?!\*)"), "ITALIC"),
        (re.compile(r"(?<!\w)_(?!_)(.+?)(?<!_)_(?!\w)"), "ITALIC"),
    ]
    all_matches: list[tuple[int, int, int, int, str]] = []
    occupied: list[tuple[int, int]] = []
    for pattern, style in patterns:
        for match in pattern.finditer(text):
            ms, me = match.span()
            g1s, g1e = match.span(1)
            # A marker that lies inside a fenced code block is literal code
            # (e.g. ``a ** b`` or ``__init__``) and must not be stripped.
            if _overlaps(code_spans, ms, g1s) or _overlaps(code_spans, g1e, me):
                continue
            # Earlier patterns win when two matches overlap.
            if _overlaps(occupied, ms, me):
                continue
            all_matches.append((ms, me, g1s, g1e, style))
            occupied.append((ms, me))
    all_matches.sort()

    removals = []
    inline_styles: list[tuple[int, int, str]] = []
    for ms, me, g1s, g1e, style in all_matches:
        removals.append((ms, g1s - ms))  # opening marker
        removals.append((g1e, me - g1e))  # closing marker
        inline_styles.append((g1s, g1e - g1s, style))
    styles = _rebase(styles + inline_styles, removals)
    text = _delete(text, removals)

    # Convert code-point offsets to UTF-16 code-unit offsets.
    style_strings: list[str] = []
    for cp_start, cp_len, style_type in sorted(styles):
        # Safety net: never emit a range that falls outside the final text.
        if cp_start < 0 or cp_start + cp_len > len(text):
            continue
        u16_start = _utf16_len(text[:cp_start])
        u16_len = _utf16_len(text[cp_start : cp_start + cp_len])
        style_strings.append(f"{u16_start}:{u16_len}:{style_type}")
    return text, style_strings

View file

@ -9,6 +9,7 @@ import pytest
from gateway.config import PlatformConfig
from gateway.platforms.signal import SignalAdapter
from gateway.platforms.signal_format import markdown_to_signal
# ---------------------------------------------------------------------------
@ -20,6 +21,11 @@ def _m2s(text: str):
return SignalAdapter._markdown_to_signal(text)
def test_shared_helper_matches_signal_adapter_wrapper():
    """The adapter's static method and the shared helper must agree."""
    sample = "🙂 **bold** and `code`"
    via_helper = markdown_to_signal(sample)
    via_adapter = SignalAdapter._markdown_to_signal(sample)
    assert via_helper == via_adapter
def _style_types(styles: list[str]) -> list[str]:
"""Extract just the STYLE part from '0:4:BOLD' strings."""
return [s.rsplit(":", 1)[1] for s in styles]
@ -138,8 +144,29 @@ class TestItalicFalsePositives:
"""* item lines must NOT be treated as italic delimiters."""
md = "* item one\n* item two\n* item three"
text, styles = _m2s(md)
assert text == "• item one\n• item two\n• item three"
assert _find_style(styles, "ITALIC") == []
def test_hyphen_bullet_list_uses_signal_safe_bullets(self):
    """Hyphen list markers are rewritten to bullets and add no styles."""
    plain, ranges = _m2s("- item one\n- item two")
    assert ranges == []
    assert plain == "• item one\n• item two"
def test_plus_bullet_list_uses_signal_safe_bullets(self):
    """Plus-sign list markers are rewritten to bullets and add no styles."""
    plain, ranges = _m2s("+ item one\n+ item two")
    assert ranges == []
    assert plain == "• item one\n• item two"
def test_markdown_bullets_inside_fenced_code_are_preserved(self):
    """List-looking lines inside a fenced block must stay literal code."""
    source = "before\n```\n- literal\n* literal\n```\nafter"
    plain, ranges = _m2s(source)
    # No bullet substitution may leak into the code block ...
    assert "• literal" not in plain
    assert "- literal\n* literal" in plain
    # ... and the block itself is still reported as monospace.
    assert any(entry.endswith(":MONOSPACE") for entry in ranges)
def test_bullet_list_with_content_before(self):
md = "Here are things:\n\n* first thing\n* second thing"
text, styles = _m2s(md)

View file

@ -1189,6 +1189,18 @@ class TestParseTargetRefE164:
assert thread_id is None
assert is_explicit is True
def test_signal_group_target_is_explicit(self):
    """A padded ``group:<id>`` Signal target is trimmed and treated as explicit."""
    parsed = _parse_target_ref("signal", " group:abc123 ")
    chat_id, thread_id, is_explicit = parsed
    assert is_explicit is True
    assert thread_id is None
    assert chat_id == "group:abc123"
def test_empty_signal_group_target_is_not_explicit(self):
    """A ``group:`` prefix with no identifier after it resolves to nothing."""
    parsed = _parse_target_ref("signal", " group: ")
    chat_id, thread_id, is_explicit = parsed
    assert is_explicit is False
    assert thread_id is None
    assert chat_id is None
def test_sms_e164_is_explicit(self):
chat_id, _, is_explicit = _parse_target_ref("sms", "+15551234567")
assert chat_id == "+15551234567"
@ -2230,11 +2242,68 @@ class TestSendSignalChunking:
)
)
assert result == {"success": True, "platform": "signal", "chat_id": "+15557654321"}
assert result["success"] is True
assert result["platform"] == "signal"
assert result["chat_id"].endswith("4321")
assert len(fake.calls) == 1
params = fake.calls[0]["payload"]["params"]
assert params["message"] == "hello"
assert "attachments" not in params
assert "textStyle" not in params
assert "textStyles" not in params
def test_text_only_markdown_uses_singular_text_style(self, monkeypatch):
    """Exactly one style range is sent through the singular ``textStyle`` key."""
    http = _FakeSignalHttp([{"result": {"timestamp": 1}}])
    _install_signal_http(monkeypatch, http)
    signal_config = {"http_url": "http://localhost:8080", "account": "+155****4567"}

    outcome = asyncio.run(_send_signal(signal_config, "+155****4321", "**hello**"))

    assert outcome["success"] is True
    sent = http.calls[0]["payload"]["params"]
    assert sent["message"] == "hello"
    assert sent["textStyle"] == "0:5:BOLD"
    assert "textStyles" not in sent
def test_text_only_multiple_styles_use_plural_text_styles(self, monkeypatch):
    """Two or more style ranges are sent through the plural ``textStyles`` key."""
    http = _FakeSignalHttp([{"result": {"timestamp": 1}}])
    _install_signal_http(monkeypatch, http)
    signal_config = {"http_url": "http://localhost:8080", "account": "+155****4567"}

    outcome = asyncio.run(
        _send_signal(signal_config, "+155****4321", "**bold** and *italic*")
    )

    assert outcome["success"] is True
    sent = http.calls[0]["payload"]["params"]
    assert sent["message"] == "bold and italic"
    assert "textStyle" not in sent
    assert sent["textStyles"] == ["0:4:BOLD", "9:6:ITALIC"]
def test_text_style_offsets_use_utf16_code_units(self, monkeypatch):
    """An emoji before the styled text counts as two units in the offset."""
    http = _FakeSignalHttp([{"result": {"timestamp": 1}}])
    _install_signal_http(monkeypatch, http)
    signal_config = {"http_url": "http://localhost:8080", "account": "+155****4567"}

    outcome = asyncio.run(_send_signal(signal_config, "+155****4321", "🙂 **bold**"))

    assert outcome["success"] is True
    sent = http.calls[0]["payload"]["params"]
    assert sent["message"] == "🙂 bold"
    # "🙂" is two UTF-16 code units plus one for the space, hence start 3.
    assert sent["textStyle"] == "3:4:BOLD"
def test_chunks_attachments_above_max(self, tmp_path, monkeypatch):
"""33 attachments → 2 batches; text only on first batch. Batch 1
@ -2274,10 +2343,53 @@ class TestSendSignalChunking:
first = fake.calls[0]["payload"]["params"]
assert first["message"] == "Caption goes here"
assert len(first["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG
assert "textStyle" not in first
assert "textStyles" not in first
second = fake.calls[1]["payload"]["params"]
assert second["message"] == "" # caption only on batch 0
assert len(second["attachments"]) == 33 - SIGNAL_MAX_ATTACHMENTS_PER_MSG
assert "textStyle" not in second
assert "textStyles" not in second
def test_caption_styles_only_apply_to_first_attachment_batch(self, tmp_path, monkeypatch):
    """With 33 attachments, caption text and styles go on the first batch only."""
    from gateway.platforms.signal_rate_limit import SIGNAL_MAX_ATTACHMENTS_PER_MSG

    total_files = 33
    media = []
    for index in range(total_files):
        image = tmp_path / f"img_{index}.png"
        image.write_bytes(b"\x89PNG" + b"\x00" * 16)
        media.append((str(image), False))

    # One canned response per expected batch.
    http = _FakeSignalHttp([
        {"result": {"timestamp": 1}},
        {"result": {"timestamp": 2}},
    ])
    _install_signal_http(monkeypatch, http)
    signal_config = {"http_url": "http://localhost:8080", "account": "+155****4567"}

    outcome = asyncio.run(
        _send_signal(
            signal_config,
            "group:abc123",
            "**Bold** and *italic*",
            media_files=media,
        )
    )

    assert outcome["success"] is True
    assert outcome["chat_id"] == "group:***"

    head = http.calls[0]["payload"]["params"]
    assert head["groupId"] == "abc123"
    assert head["message"] == "Bold and italic"
    assert head["textStyles"] == ["0:4:BOLD", "9:6:ITALIC"]
    assert len(head["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG

    tail = http.calls[1]["payload"]["params"]
    assert tail["groupId"] == "abc123"
    assert tail["message"] == ""
    assert len(tail["attachments"]) == total_files - SIGNAL_MAX_ATTACHMENTS_PER_MSG
    assert "textStyle" not in tail
    assert "textStyles" not in tail
def test_full_followup_batch_emits_pacing_notice(self, tmp_path, monkeypatch):
"""64 attachments → 2 full batches. Batch 1 needs 14 more tokens

View file

@ -88,6 +88,13 @@ def _error(message: str) -> dict:
return {"error": _sanitize_error_text(message)}
def _display_chat_id(platform_name: str, chat_id: str) -> str:
"""Return a result-safe chat identifier for tool transcripts/log consumers."""
if platform_name == "signal" and str(chat_id).startswith("group:"):
return "group:***"
return chat_id
def _telegram_retry_delay(exc: Exception, attempt: int) -> float | None:
retry_after = getattr(exc, "retry_after", None)
if retry_after is not None:
@ -523,6 +530,12 @@ def _parse_target_ref(platform_name: str, target_ref: str):
# through to the _PHONE_PLATFORMS handler below.
if _WHATSAPP_JID_RE.fullmatch(target_ref):
return target_ref.strip(), None, True
stripped_target = target_ref.strip()
if platform_name == "signal" and stripped_target.startswith("group:"):
group_id = stripped_target[len("group:"):].strip()
if group_id:
return f"group:{group_id}", None, True
return None, None, False
if platform_name in _PHONE_PLATFORMS:
match = _E164_TARGET_RE.fullmatch(target_ref)
if match:
@ -1258,6 +1271,7 @@ async def _send_signal(extra, chat_id, message, media_files=None):
_signal_send_timeout,
get_scheduler,
)
from gateway.platforms.signal_format import markdown_to_signal
try:
http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
@ -1284,8 +1298,15 @@ async def _send_signal(extra, chat_id, message, media_files=None):
else:
att_batches = [[]]
plain_text, text_styles = markdown_to_signal(message)
async def _post(batch_attachments, batch_message):
params = {"account": account, "message": batch_message}
if batch_message and text_styles:
if len(text_styles) == 1:
params["textStyle"] = text_styles[0]
else:
params["textStyles"] = text_styles
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
@ -1342,7 +1363,7 @@ async def _send_signal(extra, chat_id, message, media_files=None):
f"for Signal rate limit, batch {idx + 1}/{len(att_batches)}.)"
)
batch_message = message if idx == 0 else ""
batch_message = plain_text if idx == 0 else ""
for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1):
try:
@ -1407,7 +1428,7 @@ async def _send_signal(extra, chat_id, message, media_files=None):
f"no attachments delivered"
)
result = {"success": True, "platform": "signal", "chat_id": chat_id}
result = {"success": True, "platform": "signal", "chat_id": _display_chat_id("signal", chat_id)}
if warnings:
result["warnings"] = warnings
return result