Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions tests/test_mail_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Security validation for untrusted mail inputs and the CSP host helper."""

import pytest

from tinyagentos import mail_client
from tinyagentos.middleware.security_headers import _strip_port


class TestValidateUid:
def test_accepts_plain_numeric_uid(self):
assert mail_client._validate_uid("12345") == "12345"

@pytest.mark.parametrize(
"bad",
[
"",
"1 (RFC822)",
"1\r\nA OK",
"1:5",
"abc",
"1,2,3",
"*",
],
)
def test_rejects_non_numeric_or_injection(self, bad):
with pytest.raises(mail_client.MailValidationError):
mail_client._validate_uid(bad)


class TestValidateHeader:
def test_accepts_clean_value(self):
assert mail_client._validate_header("Hello there", "subject") == "Hello there"
assert mail_client._validate_header("a@b.test, c@d.test", "to") == "a@b.test, c@d.test"

@pytest.mark.parametrize(
"bad",
[
"subject\r\nBcc: victim@evil.test",
"name\nX-Injected: 1",
"value\rwith-cr",
"has\x00null",
],
)
def test_rejects_crlf_injection(self, bad):
with pytest.raises(mail_client.MailValidationError):
mail_client._validate_header(bad, "to")


class TestStripPort:
@pytest.mark.parametrize(
"host,expected",
[
("example.com", "example.com"),
("example.com:6969", "example.com"),
("192.168.1.5:443", "192.168.1.5"),
("[::1]", "[::1]"),
("[::1]:6969", "[::1]"),
("[2001:db8::1]:8080", "[2001:db8::1]"),
],
)
def test_strips_port_without_corrupting_ipv6(self, host, expected):
assert _strip_port(host) == expected
40 changes: 36 additions & 4 deletions tinyagentos/mail_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import asyncio
import email
import imaplib
import re
import smtplib
from dataclasses import dataclass, field
from email.header import decode_header, make_header
Expand All @@ -33,7 +34,11 @@
MAX_MESSAGE_LIMIT = 200


class MailFolderError(ValueError):
class MailValidationError(ValueError):
"""Raised when an untrusted value is unsafe to use in a mail command."""


class MailFolderError(MailValidationError):
"""Raised when a folder name is unsafe to interpolate into an IMAP command."""


Expand All @@ -49,6 +54,32 @@ def _validate_folder(folder: str) -> str:
return folder


_UID_RE = re.compile(r"^[0-9]+$")


def _validate_uid(uid: str) -> str:
"""Reject message ids that are not a plain IMAP UID.

``uid`` is an untrusted path parameter passed straight to
``conn.uid("FETCH", uid, ...)``. A non-numeric value could carry extra IMAP
command tokens, so we require a bare numeric UID. The ids we hand out from
``list_messages`` are always numeric UIDs, so this rejects nothing legitimate."""
if not uid or not _UID_RE.fullmatch(uid):
raise MailValidationError(f"invalid message id: {uid!r}")
return uid


def _validate_header(value: str, field: str) -> str:
"""Reject CR/LF/NUL in a value destined for a MIME header.

``to``/``cc``/``subject`` come from the client and are assigned directly to
message headers. A newline would let a caller inject extra headers (a hidden
Bcc, a spoofed From), so we forbid the line-break characters outright."""
if any(c in value for c in ("\r", "\n", "\x00")):
raise MailValidationError(f"invalid characters in {field}")
return value


@dataclass
class MailAccountConfig:
"""Connection details for a single account. The password is resolved from
Expand Down Expand Up @@ -282,6 +313,7 @@ def _get_message_blocking(
cfg: MailAccountConfig, folder: str, uid: str
) -> MessageDetail | None:
_validate_folder(folder)
_validate_uid(uid)
conn = _imap_connect(cfg)
try:
conn.select(f'"{folder}"', readonly=True)
Expand All @@ -306,10 +338,10 @@ def _build_outgoing(
) -> MIMEMultipart:
msg = MIMEMultipart()
msg["From"] = cfg.email_address or cfg.username

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: From header is not validated for CRLF/NUL injection

cfg.email_address and cfg.username are account configuration values supplied by the user and later used in MIME/SMTP commands. Since To, Cc, and Subject are now validated, leaving From unchecked leaves the same header-injection class available if a stored account value contains CR/LF/NUL.

Suggested change
msg["From"] = cfg.email_address or cfg.username
msg["From"] = _validate_header(cfg.email_address or cfg.username, "from")

Reply with @kilocode-bot fix it to have Kilo Code address this issue.

msg["To"] = to
msg["To"] = _validate_header(to, "to")
if cc:
msg["Cc"] = cc
msg["Subject"] = subject
msg["Cc"] = _validate_header(cc, "cc")
msg["Subject"] = _validate_header(subject, "subject")
Comment on lines 340 to +344

@gitar-bot gitar-bot Bot Jun 16, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Security: From header not validated for CRLF injection

_build_outgoing now validates To, Cc, and Subject via _validate_header, but the From header (cfg.email_address or cfg.username, mail_client.py:340) is still assigned without validation. These values originate from client input at account-creation time (AccountCreate.email_address/username in routes/mail.py). If a CR/LF/NUL ever slips into the stored account record, it would reach the MIME From header and reopen the same header-injection vector this PR closes for the other fields.

This is lower severity than the addressed cases because the values are account-config rather than per-request, and the line is outside the modified region — but applying _validate_header here (or validating at account creation) would close the gap consistently.

Validate the From header value alongside To/Cc/Subject.:

msg["From"] = _validate_header(cfg.email_address or cfg.username, "from")
msg["To"] = _validate_header(to, "to")

Was this helpful? React with 👍 / 👎

msg.attach(MIMEText(body, "plain"))
for filename, content, content_type in attachments or []:
maintype, _, subtype = content_type.partition("/")
Expand Down
6 changes: 6 additions & 0 deletions tinyagentos/middleware/security_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def _build_csp(frame_src_extra: str = "") -> str:


def _strip_port(host: str) -> str:
# A bracketed IPv6 host ("[::1]" or "[::1]:6969") is full of colons, so a
# naive rsplit on ":" would corrupt it. Keep everything up to the closing
# bracket; for a normal "host:port" just drop the trailing port.
if host.startswith("["):
end = host.find("]")
return host[: end + 1] if end != -1 else host

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: Malformed bracketed Host values are accepted after port stripping

_strip_port("[::1]:6969:extra") returns "[::1]" because it stops at the first ] and ignores the trailing suffix. That malformed Host then passes _SAFE_HOST_RE, so CSP frame-src may be generated from a syntactically invalid Host header. Require no trailing characters after the closing bracket, or validate the port suffix, before returning the bracketed host.


Reply with @kilocode-bot fix it to have Kilo Code address this issue.

return host.rsplit(":", 1)[0] if ":" in host else host


Expand Down
4 changes: 3 additions & 1 deletion tinyagentos/routes/mail.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async def get_message(
return JSONResponse({"error": "account credential missing"}, status_code=400)
try:
detail = await mail_client.get_message(cfg, folder, uid)
except mail_client.MailFolderError as exc:
except mail_client.MailValidationError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
except Exception as exc:
return JSONResponse({"error": f"imap error: {exc}"}, status_code=502)
Expand Down Expand Up @@ -240,6 +240,8 @@ async def send(
# TODO(phase-2): attachment upload pass-through (multipart) -- the
# client supports it; the route only sends a text body for now.
await mail_client.send_message(cfg, body.to, body.subject, body.body, cc=body.cc)
except mail_client.MailValidationError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
except Exception as exc:
return JSONResponse({"error": f"smtp error: {exc}"}, status_code=502)
return {"status": "sent"}
Loading