fix(email): reject spoofed From: header for authorization (GHSA-rxqh-5572-8m77)

The email adapter authorized senders entirely off the From: header, which is
attacker-controlled and unauthenticated by IMAP. An attacker could forge
From: an-allowlisted-address and pass both the adapter's EMAIL_ALLOWED_USERS
pre-filter and the gateway's allowlist authz (both key on the same spoofable
sender_addr), getting unauthorized commands executed by the agent.

Verify the From: domain against the trusted Authentication-Results header the
receiving mail server stamps (SPF/DKIM/DMARC) before trusting it for
authorization. Enforced only when an allowlist is in effect and allow-all is
off — fail-closed. Operators whose server does not stamp the header can opt
out via platforms.email.require_authenticated_sender: false (or
EMAIL_TRUST_FROM_HEADER=true).
This commit is contained in:
teknium1 2026-06-25 19:02:42 -07:00 committed by Teknium
parent dedf5643d8
commit 85e084d60d
2 changed files with 450 additions and 1 deletions

View file

@ -43,7 +43,7 @@ from gateway.platforms.base import (
cache_image_from_bytes,
)
from gateway.config import Platform, PlatformConfig
from utils import env_int
from utils import env_int, env_bool
logger = logging.getLogger(__name__)
# Automated sender patterns — emails from these are silently ignored
@ -243,6 +243,122 @@ def _extract_email_address(raw: str) -> str:
return raw.strip().lower()
def _domain_of(address: str) -> str:
"""Return the lowercased domain part of an email address, or ''."""
_, _, domain = address.rpartition("@")
return domain.strip().lower()
def _domains_aligned(a: str, b: str) -> bool:
"""Return True if two domains are equal or in an organizational
parent/subdomain relationship (relaxed DMARC alignment).
DMARC relaxed alignment treats ``mail.example.com`` as aligned with
``example.com``. We approximate organizational alignment by checking
exact equality or that one domain is a dot-suffix of the other.
"""
a = (a or "").strip().lower().rstrip(".")
b = (b or "").strip().lower().rstrip(".")
if not a or not b:
return False
if a == b:
return True
return a.endswith("." + b) or b.endswith("." + a)
# Match a single "method=result" token in an Authentication-Results header,
# e.g. ``dmarc=pass`` or ``spf=fail``.
_AUTH_METHOD_RE = re.compile(
r"\b(dmarc|dkim|spf)\s*=\s*([a-z]+)", re.IGNORECASE
)
# Match a property value like ``header.from=example.com`` or
# ``smtp.mailfrom=user@example.com``.
_AUTH_PROP_RE = re.compile(
r"\b(header\.from|header\.d|smtp\.mailfrom|smtp\.from|envelope-from)\s*=\s*([^\s;]+)",
re.IGNORECASE,
)
def _verify_sender_authentication(
msg: email_lib.message.Message,
from_addr: str,
*,
authserv_id: str = "",
) -> Tuple[bool, str]:
"""Verify that the message's ``From:`` domain is authenticated.
The ``From:`` header is attacker-controlled and is never authenticated by
IMAP delivery, so an allowlist keyed on ``From:`` alone is trivially
spoofable (GHSA-rxqh-5572-8m77). The only trustworthy signal is the
``Authentication-Results`` header that the *receiving* mail server (the one
we IMAP into) stamps after running SPF/DKIM/DMARC. That header is prepended
by our own server, so the topmost instance is the one we trust; any
``Authentication-Results`` an attacker injected into the body of their
message sorts below it.
Returns ``(authenticated, reason)``. ``authenticated`` is True when:
* a DMARC pass is recorded for the From domain, OR
* an SPF pass aligned with the From domain, OR
* a DKIM pass aligned (``header.d``) with the From domain.
When no ``Authentication-Results`` header is present at all, we return
``(False, "no Authentication-Results header")`` fail-closed. Operators
whose mail server does not stamp this header can opt out of the check
(see ``EmailAdapter._require_authenticated_sender``).
"""
from_domain = _domain_of(from_addr)
if not from_domain:
return False, "missing From domain"
# get_all preserves header order; the receiving server prepends its result,
# so the FIRST Authentication-Results is the trusted one. We pin to the
# configured authserv-id when provided to defend against an injected header
# that happens to sort first.
headers = msg.get_all("Authentication-Results") or []
if not headers:
return False, "no Authentication-Results header"
trusted = None
for raw in headers:
value = " ".join(str(raw).split())
if authserv_id:
# authserv-id is the first token before the first ';'
serv = value.split(";", 1)[0].strip().lower()
if not _domains_aligned(serv, authserv_id) and serv != authserv_id.lower():
continue
trusted = value
break
if trusted is None:
return False, "no Authentication-Results from trusted authserv-id"
methods = {m.lower(): r.lower() for m, r in _AUTH_METHOD_RE.findall(trusted)}
props = {p.lower(): v.strip().strip('"') for p, v in _AUTH_PROP_RE.findall(trusted)}
# 1) DMARC pass is the strongest signal — DMARC already enforces From
# alignment, so a pass means the From domain is authenticated.
if methods.get("dmarc") == "pass":
return True, "dmarc=pass"
# 2) SPF pass aligned with the From domain (the envelope/MAIL FROM domain
# must match the From domain).
if methods.get("spf") == "pass":
spf_domain = _domain_of(props.get("smtp.mailfrom", "")) or props.get(
"smtp.from", ""
) or props.get("envelope-from", "")
spf_domain = _domain_of(spf_domain) if "@" in spf_domain else spf_domain
if _domains_aligned(spf_domain, from_domain):
return True, "spf=pass aligned"
# 3) DKIM pass aligned with the From domain (the signing domain header.d
# must align with the From domain).
if methods.get("dkim") == "pass":
dkim_domain = props.get("header.d", "") or _domain_of(props.get("header.from", ""))
if _domains_aligned(dkim_domain, from_domain):
return True, "dkim=pass aligned"
return False, f"authentication failed ({trusted[:120]})"
def _extract_attachments(
msg: email_lib.message.Message,
skip_attachments: bool = False,
@ -332,6 +448,34 @@ class EmailAdapter(BasePlatformAdapter):
# skip_attachments: true
self._skip_attachments = extra.get("skip_attachments", False)
# Require the sender's From: domain to be authenticated (SPF/DKIM/DMARC)
# before trusting it for authorization. The From: header is
# attacker-controlled and unauthenticated by IMAP, so an allowlist keyed
# on it alone is spoofable (GHSA-rxqh-5572-8m77). Default ON (fail-closed).
#
# Operators whose receiving mail server does not stamp an
# Authentication-Results header can opt out via config.yaml:
# platforms:
# email:
# require_authenticated_sender: false
# or the EMAIL_TRUST_FROM_HEADER=true env mirror (parity with the other
# EMAIL_* access-control vars). When allow-all is in effect the operator
# has already chosen to accept any sender, so the check is moot and the
# gate below is skipped.
if "require_authenticated_sender" in extra:
self._require_authenticated_sender = bool(extra["require_authenticated_sender"])
elif env_bool("EMAIL_TRUST_FROM_HEADER", False):
self._require_authenticated_sender = False
else:
self._require_authenticated_sender = True
# Optional authserv-id to pin Authentication-Results to the operator's
# own receiving server (defends against an injected header that sorts
# first). Defaults to the From-domain of the agent's own address.
self._authserv_id = (
extra.get("authserv_id", "") or os.getenv("EMAIL_AUTHSERV_ID", "")
).strip().lower()
# Track message IDs we've already processed to avoid duplicates
self._seen_uids: set = set()
self._seen_uids_max: int = 2000 # cap to prevent unbounded memory growth
@ -547,6 +691,17 @@ class EmailAdapter(BasePlatformAdapter):
if _is_automated_sender(sender_addr, msg_headers):
logger.debug("[Email] Skipping automated sender: %s", sender_addr)
continue
# Verify the From: domain is authenticated (SPF/DKIM/DMARC)
# while the raw message — and its trusted
# Authentication-Results header — is still in scope. The
# verdict is consumed at dispatch where authorization is
# decided. From: is attacker-controlled, so this is the only
# place a spoof can be caught (GHSA-rxqh-5572-8m77).
sender_authenticated, auth_reason = _verify_sender_authentication(
msg, sender_addr, authserv_id=self._authserv_id
)
body = _extract_text_body(msg)
attachments = _extract_attachments(msg, skip_attachments=self._skip_attachments)
@ -560,6 +715,8 @@ class EmailAdapter(BasePlatformAdapter):
"body": body,
"attachments": attachments,
"date": msg.get("Date", ""),
"sender_authenticated": sender_authenticated,
"auth_reason": auth_reason,
})
finally:
try:
@ -570,6 +727,36 @@ class EmailAdapter(BasePlatformAdapter):
logger.error("[Email] IMAP fetch error: %s", e)
return results
@staticmethod
def _allow_all_senders() -> bool:
"""Return True when the operator opted into accepting any sender.
Mirrors the gateway authz allow-all resolution: the per-platform
EMAIL_ALLOW_ALL_USERS flag or the global GATEWAY_ALLOW_ALL_USERS flag.
When either is set, sender identity is moot, so the From: authentication
gate is skipped.
"""
truthy = {"true", "1", "yes"}
return (
os.getenv("EMAIL_ALLOW_ALL_USERS", "").strip().lower() in truthy
or os.getenv("GATEWAY_ALLOW_ALL_USERS", "").strip().lower() in truthy
)
@staticmethod
def _allowlist_in_effect() -> bool:
"""Return True when a sender allowlist gates email access.
Authorization keys on the From: address only when an allowlist is
configured the per-platform EMAIL_ALLOWED_USERS or the global
GATEWAY_ALLOWED_USERS. When neither is set the gateway default-denies
every sender regardless, so the spoofable From: identity grants nothing
and the authentication gate is unnecessary.
"""
return bool(
os.getenv("EMAIL_ALLOWED_USERS", "").strip()
or os.getenv("GATEWAY_ALLOWED_USERS", "").strip()
)
async def _dispatch_message(self, msg_data: Dict[str, Any]) -> None:
"""Convert a fetched email into a MessageEvent and dispatch it."""
sender_addr = msg_data["sender_addr"]
@ -595,6 +782,32 @@ class EmailAdapter(BasePlatformAdapter):
logger.debug("[Email] Dropping non-allowlisted sender at dispatch: %s", sender_addr)
return
# Reject spoofed senders. The allowlist (and the gateway's own authz)
# key on sender_addr, which comes straight from the attacker-controlled
# From: header — so an attacker can forge From: an-allowlisted@addr to
# get authorized (GHSA-rxqh-5572-8m77). This only matters when an
# allowlist is actually being used to GRANT access: if no allowlist is
# configured the gateway default-denies everyone anyway, and if allow-all
# is on the operator already accepts any sender. So enforce From:
# authentication exactly when an allowlist is in effect and allow-all is
# off. Fail-closed: an unauthenticated From: is dropped before it can be
# matched against the allowlist.
if (
self._require_authenticated_sender
and self._allowlist_in_effect()
and not self._allow_all_senders()
and not msg_data.get("sender_authenticated", False)
):
logger.warning(
"[Email] Dropping sender with unauthenticated From: %s (%s). "
"If your mail server does not stamp Authentication-Results, set "
"platforms.email.require_authenticated_sender: false (or "
"EMAIL_TRUST_FROM_HEADER=true) to accept the risk.",
sender_addr,
msg_data.get("auth_reason", "no verdict"),
)
return
subject = msg_data["subject"]
body = msg_data["body"].strip()
attachments = msg_data["attachments"]

View file

@ -537,6 +537,10 @@ class TestDispatchMessage(unittest.TestCase):
"body": "Hello",
"attachments": [],
"date": "",
# Authenticated From: (SPF/DKIM/DMARC passed at the receiving
# server). Allowlisted senders must be authenticated to proceed.
"sender_authenticated": True,
"auth_reason": "dmarc=pass",
}
asyncio.run(adapter._dispatch_message(msg_data))
@ -570,6 +574,134 @@ class TestDispatchMessage(unittest.TestCase):
# Handler should be called when no allowlist is configured
adapter._message_handler.assert_called()
def test_spoofed_from_rejected_when_allowlisted(self):
"""A forged From: matching the allowlist is dropped when unauthenticated.
Core of GHSA-rxqh-5572-8m77: an attacker forges From: an-allowlisted
address. With an allowlist in effect and no allow-all, an unauthenticated
From: must be rejected before it can be matched against the allowlist.
"""
import asyncio
with patch.dict(os.environ, {
"EMAIL_ALLOWED_USERS": "admin@test.com",
}):
adapter = self._make_adapter()
adapter._message_handler = MagicMock()
msg_data = {
"uid": b"200",
"sender_addr": "admin@test.com", # forged From: matching allowlist
"sender_name": "Admin",
"subject": "Spoofed",
"message_id": "<spoof@evil.com>",
"in_reply_to": "",
"body": "rm -rf /",
"attachments": [],
"date": "",
"sender_authenticated": False, # SPF/DKIM/DMARC did not pass
"auth_reason": "authentication failed (spf=fail)",
}
asyncio.run(adapter._dispatch_message(msg_data))
adapter._message_handler.assert_not_called()
self.assertNotIn("admin@test.com", adapter._thread_context)
def test_unauthenticated_allowed_without_allowlist(self):
"""No allowlist → no From: auth gate (gateway default-denies anyway)."""
import asyncio
with patch.dict(os.environ, {}, clear=False):
for k in ("EMAIL_ALLOWED_USERS", "GATEWAY_ALLOWED_USERS"):
os.environ.pop(k, None)
adapter = self._make_adapter()
adapter._message_handler = MagicMock()
msg_data = {
"uid": b"201",
"sender_addr": "anyone@test.com",
"sender_name": "Anyone",
"subject": "Hi",
"message_id": "<any@test.com>",
"in_reply_to": "",
"body": "Hi",
"attachments": [],
"date": "",
"sender_authenticated": False,
"auth_reason": "no Authentication-Results header",
}
asyncio.run(adapter._dispatch_message(msg_data))
# Adapter forwards; the gateway's own default-deny handles authz.
adapter._message_handler.assert_called()
def test_unauthenticated_allowed_with_trust_from_header(self):
"""EMAIL_TRUST_FROM_HEADER=true disables the gate even with an allowlist."""
import asyncio
with patch.dict(os.environ, {
"EMAIL_ALLOWED_USERS": "admin@test.com",
"EMAIL_TRUST_FROM_HEADER": "true",
}):
adapter = self._make_adapter()
captured = []
async def capture_handle(event):
captured.append(event)
adapter.handle_message = capture_handle
msg_data = {
"uid": b"202",
"sender_addr": "admin@test.com",
"sender_name": "Admin",
"subject": "Trusted",
"message_id": "<t@test.com>",
"in_reply_to": "",
"body": "Hello",
"attachments": [],
"date": "",
"sender_authenticated": False,
"auth_reason": "no Authentication-Results header",
}
asyncio.run(adapter._dispatch_message(msg_data))
self.assertEqual(len(captured), 1)
def test_unauthenticated_allowed_with_allow_all(self):
"""EMAIL_ALLOW_ALL_USERS=true makes sender identity moot — gate skipped.
With allow-all and no restrictive allowlist, an unauthenticated sender
is forwarded: the operator has explicitly chosen to accept anyone.
"""
import asyncio
with patch.dict(os.environ, {
"EMAIL_ALLOW_ALL_USERS": "true",
}):
os.environ.pop("EMAIL_ALLOWED_USERS", None)
os.environ.pop("GATEWAY_ALLOWED_USERS", None)
adapter = self._make_adapter()
captured = []
async def capture_handle(event):
captured.append(event)
adapter.handle_message = capture_handle
msg_data = {
"uid": b"203",
"sender_addr": "stranger@elsewhere.com",
"sender_name": "Stranger",
"subject": "Hi",
"message_id": "<s@elsewhere.com>",
"in_reply_to": "",
"body": "Hello",
"attachments": [],
"date": "",
"sender_authenticated": False,
"auth_reason": "no Authentication-Results header",
}
asyncio.run(adapter._dispatch_message(msg_data))
self.assertEqual(len(captured), 1)
class TestThreadContext(unittest.TestCase):
"""Test email reply threading logic."""
@ -1482,5 +1614,109 @@ class TestConnectionConfigResolution(unittest.TestCase):
self.assertTrue(check_email_requirements())
class TestSenderAuthentication(unittest.TestCase):
"""Verify _verify_sender_authentication parses Authentication-Results
correctly and resists From: spoofing (GHSA-rxqh-5572-8m77)."""
def _msg(self, from_addr, auth_results=None):
"""Build an email.message.Message with the given From: and
zero or more Authentication-Results headers (first = topmost/trusted)."""
msg = MIMEText("body")
msg["From"] = from_addr
for ar in auth_results or []:
msg["Authentication-Results"] = ar
return msg
def _verify(self, from_addr, auth_results=None, authserv_id=""):
from plugins.platforms.email.adapter import (
_verify_sender_authentication,
_extract_email_address,
)
msg = self._msg(from_addr, auth_results)
addr = _extract_email_address(from_addr)
return _verify_sender_authentication(msg, addr, authserv_id=authserv_id)
def test_dmarc_pass_authenticates(self):
ok, reason = self._verify(
"Admin <admin@example.com>",
["mx.google.com; dmarc=pass header.from=example.com; spf=pass"],
)
self.assertTrue(ok, reason)
def test_spf_pass_aligned_authenticates(self):
ok, reason = self._verify(
"admin@example.com",
["mx.google.com; spf=pass smtp.mailfrom=admin@example.com"],
)
self.assertTrue(ok, reason)
def test_dkim_pass_aligned_authenticates(self):
ok, reason = self._verify(
"admin@example.com",
["mx.google.com; dkim=pass header.d=example.com"],
)
self.assertTrue(ok, reason)
def test_spf_pass_misaligned_rejected(self):
# SPF passes for the envelope domain, but it doesn't match From: domain.
ok, reason = self._verify(
"admin@example.com",
["mx.google.com; spf=pass smtp.mailfrom=bounce@evil.com"],
)
self.assertFalse(ok, reason)
def test_dkim_pass_misaligned_rejected(self):
ok, reason = self._verify(
"admin@example.com",
["mx.google.com; dkim=pass header.d=evil.com"],
)
self.assertFalse(ok, reason)
def test_all_fail_rejected(self):
ok, reason = self._verify(
"admin@example.com",
["mx.google.com; dmarc=fail; spf=fail; dkim=fail"],
)
self.assertFalse(ok, reason)
def test_no_authentication_results_rejected(self):
ok, reason = self._verify("admin@example.com", [])
self.assertFalse(ok)
self.assertIn("no Authentication-Results", reason)
def test_relaxed_alignment_subdomain(self):
# mail.example.com (DKIM signer) aligns with example.com (From).
ok, reason = self._verify(
"admin@example.com",
["mx.google.com; dkim=pass header.d=mail.example.com"],
)
self.assertTrue(ok, reason)
def test_injected_header_below_trusted_does_not_authenticate(self):
"""An attacker-injected Authentication-Results sorts BELOW the receiving
server's. With authserv-id pinning, only the trusted (first) header is
consulted, so a forged 'dmarc=pass' lower in the stack is ignored."""
ok, reason = self._verify(
"admin@example.com",
[
# Trusted: stamped by our server, real verdict = fail
"mx.ourserver.com; dmarc=fail header.from=example.com",
# Forged by attacker, claims pass
"mx.ourserver.com; dmarc=pass header.from=example.com",
],
authserv_id="mx.ourserver.com",
)
self.assertFalse(ok, reason)
def test_authserv_id_mismatch_skips_untrusted_header(self):
"""A header from an authserv-id we don't trust is skipped entirely."""
ok, reason = self._verify(
"admin@example.com",
["attacker.relay.com; dmarc=pass header.from=example.com"],
authserv_id="mx.ourserver.com",
)
self.assertFalse(ok, reason)
if __name__ == "__main__":
unittest.main()