Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions imap_tools/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,28 @@
from functools import cached_property
from email.header import decode_header
from email.message import _parseparam, _unquotevalue # noqa
from typing import Tuple, Dict, Optional, List
from typing import Tuple, Dict, Optional, List, Type, TypeVar

from .utils import decode_value, parse_email_addresses, parse_email_date, EmailAddress, replace_html_ct_charset
from .consts import UID_PATTERN, CODECS_OFFICIAL_REPLACEMENT_CHAR

Self = TypeVar("Self", bound="MailMessage")


class MailMessage:
"""The email message"""

def __init__(self, fetch_data: list) -> None:
def __init__(self, fetch_data: list, headers_errors: str = 'ignore') -> None:
raw_message_data, raw_uid_data, raw_flag_data = self._get_message_data_parts(fetch_data)
self._raw_uid_data = raw_uid_data
self._raw_flag_data = raw_flag_data
self._headers_errors = headers_errors
self.obj = email.message_from_bytes(raw_message_data)

@classmethod
def from_bytes(cls, raw_message_data: bytes):
def from_bytes(cls: Type[Self], raw_message_data: bytes, headers_errors: str = 'ignore') -> Type[Self]:
"""Alternative constructor"""
return cls([(b'', raw_message_data)])
return cls([(b'', raw_message_data)], headers_errors)

def __str__(self):
repl = '[\t\n\r\f\v]'
Expand Down Expand Up @@ -180,16 +183,24 @@ def html(self) -> str:
results.append(replace_html_ct_charset(html, 'utf-8'))
return ''.join(results)

def headers_bytes(self, lower: bool = True) -> Dict[str, Tuple[bytes, ...]]:
"""
Message headers as bytes
"""
result = {}
for key, val in getattr(self.obj, '_headers', ()):
result.setdefault(key.lower() if lower else key, []).append(val.encode('ascii', errors='surrogateescape'))
return {k: tuple(v) for k, v in result.items()}

@cached_property
def headers(self) -> Dict[str, Tuple[str, ...]]:
"""
Message headers
Keys in result dict are in lower register (email headers are not case-sensitive)
"""
result = {}
for key, val in getattr(self.obj, '_headers', ()):
result.setdefault(key.lower(), []).append(val)
return {k: tuple(v) for k, v in result.items()}
return {
key: tuple(val.decode(errors=self._headers_errors) for val in tup) for key, tup in self.headers_bytes().items()
}

@cached_property
def attachments(self) -> List['MailAttachment']:
Expand Down
31 changes: 31 additions & 0 deletions tests/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,37 @@ def test_attributes(self):
for att_attr in test_att_attr_set:
self.assertEqual(getattr(att, att_attr), expected_data['attachments'][att_i][att_attr])

def test_headers_bytes(self):
headers = (
b'X-Header: =?utf8?b?w6TDtsO8?=\r\n' +
'x-header: äöü\r\n'.encode() +
'X-HEADER: äöü\r\n'.encode('latin1')
)
assert MailMessage.from_bytes(headers).headers_bytes() == {
'x-header': (
b'=?utf8?b?w6TDtsO8?=',
'äöü'.encode(),
'äöü'.encode('latin1')
),
}
assert MailMessage.from_bytes(headers).headers_bytes(lower=False) == {
'X-Header': (b'=?utf8?b?w6TDtsO8?=',),
'x-header': ('äöü'.encode(),),
'X-HEADER': ('äöü'.encode('latin1'),),
}

def test_headers(self):
headers = (
b'X-Header: =?utf8?b?w6TDtsO8?=\r\n' +
'x-header: äöü\r\n'.encode() +
'X-HEADER: äöü\r\n'.encode('latin1')
)
assert MailMessage.from_bytes(headers).headers == {
'x-header': ('=?utf8?b?w6TDtsO8?=', 'äöü', '')
}
assert MailMessage.from_bytes(headers, 'backslashreplace').headers == {
'x-header': ('=?utf8?b?w6TDtsO8?=', 'äöü', '\\xe4\\xf6\\xfc')
}

if __name__ == "__main__":
unittest.main()
Expand Down