-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathemail_parser.py
More file actions
176 lines (143 loc) · 5.74 KB
/
email_parser.py
File metadata and controls
176 lines (143 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Email parsing and extraction utilities"""
import base64
import re
from datetime import datetime
from html.parser import HTMLParser
from config import APPLICATION_PATTERNS, COMPANY_PATTERNS, DEBUG_MODE
"""Extract text from HTML emails"""
class HTMLTextExtractor(HTMLParser):
    """Collect the text content of an HTML email.

    Feed markup with ``feed()`` and read the accumulated text with
    ``get_text()``. Character data found inside ``<script>`` and ``<style>``
    elements is skipped: it is code/CSS, not message content, and would
    otherwise pollute pattern matching on the extracted body.
    """

    # Elements whose character data is never human-readable message text.
    _IGNORED_TAGS = frozenset({'script', 'style'})

    def __init__(self):
        super().__init__()
        # Text chunks in document order; joined lazily by get_text().
        self.text = []
        # > 0 while the parser is inside an ignored element.
        self._ignored_depth = 0

    def handle_starttag(self, tag, attrs):
        """Start skipping data when an ignored element opens."""
        if tag in self._IGNORED_TAGS:
            self._ignored_depth += 1

    def handle_endtag(self, tag):
        """Stop skipping data when an ignored element closes."""
        # Guard against stray closing tags so the counter never goes negative.
        if tag in self._IGNORED_TAGS and self._ignored_depth:
            self._ignored_depth -= 1

    def handle_data(self, data):
        """Record a chunk of character data unless it is being skipped."""
        if not self._ignored_depth:
            self.text.append(data)

    def get_text(self):
        """Return all recorded chunks joined by single spaces."""
        return ' '.join(self.text)
def get_email_body(msg_payload):
    """Extract the readable body from a message payload.

    The payload is a dict that either has a ``'parts'`` list (multipart,
    possibly nested) or a single ``'body'`` dict whose ``'data'`` value is
    base64url-encoded text. ``text/plain`` content is preferred; when no
    non-blank plain text exists, ``text/html`` content is converted to text
    with ``HTMLTextExtractor``.

    Args:
        msg_payload: Message payload dict with ``mimeType``, ``body`` and/or
            ``parts`` keys.

    Returns:
        The body text. Multipart text is the concatenation of every matching
        part, each followed by a single space. Returns ``""`` when nothing
        could be decoded.
    """
    plain_chunks = []
    html_chunks = []

    def decode_data(data):
        """Decode base64url *data* to text, or return None if undecodable."""
        try:
            # base64url data is sometimes sent without '=' padding, which
            # the decoder rejects; restore it before decoding.
            padding = '=' * (-len(data) % 4)
            if isinstance(data, (bytes, bytearray)):
                padding = padding.encode('ascii')
            raw = base64.urlsafe_b64decode(data + padding)
        except (ValueError, TypeError):
            # Best effort: a corrupt or missing part must not abort the
            # whole message (binascii.Error is a ValueError subclass).
            return None
        return raw.decode('utf-8', errors='ignore')

    def extract_from_parts(parts):
        """Walk *parts* depth-first, collecting plain and HTML chunks."""
        for part in parts:
            if 'parts' in part:
                extract_from_parts(part['parts'])

            mime_type = part.get('mimeType', '')
            if mime_type == 'text/plain':
                target = plain_chunks
            elif mime_type == 'text/html':
                target = html_chunks
            else:
                continue

            part_body = part.get('body') or {}
            if 'data' not in part_body:
                continue
            decoded = decode_data(part_body['data'])
            if decoded is not None:
                target.append(decoded + " ")

    if 'parts' in msg_payload:
        extract_from_parts(msg_payload['parts'])
    else:
        single_body = msg_payload.get('body') or {}
        if 'data' in single_body:
            decoded = decode_data(single_body['data'])
            if decoded is not None:
                # A single-part HTML message must go through the HTML
                # converter instead of being returned as raw markup.
                if msg_payload.get('mimeType', '') == 'text/html':
                    html_chunks.append(decoded)
                else:
                    plain_chunks.append(decoded)

    body_text = "".join(plain_chunks)
    body_html = "".join(html_chunks)

    # Prefer plain text, but only when it has real content: an empty
    # text/plain alternative should not hide a populated HTML part.
    if body_text.strip():
        return body_text
    if body_html:
        parser = HTMLTextExtractor()
        try:
            parser.feed(body_html)
            # Flush any text the parser is still buffering.
            parser.close()
            return parser.get_text()
        except Exception:
            # Conversion is best effort; raw HTML is better than nothing.
            return body_html
    return body_text
def is_job_application_email(subject, body, sender):
    """Check whether an email looks like a job application confirmation.

    The subject plus the first 2000 characters of the body are lower-cased
    and searched (case-insensitively) with every regex in
    ``APPLICATION_PATTERNS``.

    Args:
        subject: Email subject; ``None`` is treated as empty.
        body: Email body text; ``None`` is treated as empty.
        sender: Sender header, used only for debug output; ``None`` is
            treated as empty.

    Returns:
        True if at least one pattern matches, otherwise False.
    """
    # Headers can be missing; normalise so slicing/concatenation cannot fail.
    subject = subject or ""
    body = body or ""
    sender = sender or ""

    # Only the start of the body is scanned to keep matching cheap.
    combined = (subject + " " + body[:2000]).lower()

    if not DEBUG_MODE:
        # No report needed, so stop at the first matching pattern.
        return any(
            re.search(pattern, combined, re.IGNORECASE)
            for pattern in APPLICATION_PATTERNS
        )

    # Debug mode reports every matching pattern, so evaluate them all.
    matches = [
        pattern
        for pattern in APPLICATION_PATTERNS
        if re.search(pattern, combined, re.IGNORECASE)
    ]
    if matches:
        print(f" ✓ MATCH: {len(matches)} pattern(s) matched")
        print(f" Subject: {subject[:100]}")
        print(f" Sender: {sender[:60]}")
        for match in matches[:3]:
            print(f" - {match}")
        return True

    print(f" ✗ NO MATCH: {subject[:80]}")
    return False
def parse_email_date(date_str, debug=False):
    """Parse an email date string, trying several formats in turn.

    The explicit ``strptime`` formats are tried first. If none of them
    fits, ``email.utils.parsedate_to_datetime`` is used as a last resort so
    that other RFC 2822 variants (no weekday, named zones such as ``GMT``,
    single-digit days) are still understood.

    Args:
        date_str: Raw date string, e.g. the value of a ``Date`` header.
            ``None`` or a blank string is treated as unparseable.
        debug: When True, print what is being parsed and the outcome.

    Returns:
        The parsed ``datetime``. It is timezone-aware when the matched
        format carried a UTC offset and naive otherwise. If nothing can be
        parsed, the current local time (naive ``datetime.now()``) is
        returned.
    """
    if debug:
        print(f" Parsing date: {str(date_str)[:60]}")

    def report(parsed):
        if debug:
            print(f" ✓ Parsed as: {parsed.strftime('%Y-%m-%d %H:%M:%S')}")
        return parsed

    if isinstance(date_str, str) and date_str.strip():
        # (strptime format, preprocessor trimming the input to that format)
        date_formats = [
            # RFC 2822 with numeric offset; drop a trailing "(UTC)" comment.
            ('%a, %d %b %Y %H:%M:%S %z', lambda s: s.split(' (')[0]),
            # Same layout with the zone cut off (two-digit day assumed).
            ('%a, %d %b %Y %H:%M:%S', lambda s: s[:25]),
            ('%Y-%m-%d %H:%M:%S', lambda s: s[:19]),
            ('%d %b %Y %H:%M:%S', lambda s: s),
        ]
        for date_format, preprocessor in date_formats:
            try:
                parsed = datetime.strptime(preprocessor(date_str), date_format)
            except (ValueError, IndexError):
                continue
            return report(parsed)

        # Fallback for RFC 2822 variants the fixed formats above miss.
        from email.utils import parsedate_to_datetime
        try:
            parsed = parsedate_to_datetime(date_str)
        except (ValueError, TypeError, IndexError):
            # Older Python versions raise TypeError for unparseable input.
            parsed = None
        if parsed is not None:
            return report(parsed)

    if debug:
        print(" ✗ Could not parse, using current time")
    return datetime.now()
# Second-level labels that, combined with a two-letter country code, form a
# public suffix (e.g. "co.uk", "com.au"); the company label precedes them.
_COUNTRY_SECOND_LEVELS = frozenset({'ac', 'co', 'com', 'edu', 'gov', 'net', 'org'})

# Generic captures that are not company names.
_NON_COMPANY_PHRASES = (
    'the position', 'the role', 'the team', 'the company',
    'your application', 'our team', 'this position',
)

# Words that mark a multi-word sender display name as an organisation.
_SENDER_TEAM_WORDS = frozenset({'team', 'recruiting', 'careers', 'talent'})


def _company_label_from_domain(domain):
    """Return the label just before the suffix, e.g. 'acme' for 'mail.acme.co.uk'."""
    labels = [label for label in domain.lower().split('.') if label]
    if len(labels) < 2:
        return ''
    if (
        len(labels) >= 3
        and len(labels[-1]) == 2
        and labels[-2] in _COUNTRY_SECOND_LEVELS
    ):
        return labels[-3]
    return labels[-2]


def extract_company(sender, subject='', body='', debug=False):
    """Extract a company name from an email using fallback strategies.

    Strategies, in order:
      1. The sender's email domain (``jobs@mail.acme-corp.com`` ->
         ``"Acme Corp"``); subdomains are ignored.
      2. The first acceptable capture of any regex in ``COMPANY_PATTERNS``
         applied to the subject plus the first 1000 characters of the body.
      3. The sender display name, if it is a single word or contains a word
         such as "team", "recruiting", "careers" or "talent".

    Args:
        sender: Sender header (``Name <addr@domain>`` or a bare address).
        subject: Email subject.
        body: Email body text.
        debug: When True, print which strategy produced the result.
        ``None`` for sender, subject or body is treated as an empty string.

    Returns:
        The company name, or ``"Unknown Company"`` if no strategy succeeds.
    """
    sender = sender or ''
    subject = subject or ''
    body = body or ''

    # Strategy 1: sender email domain. Capture the whole domain so that a
    # mail subdomain ("mail.", "notifications.") is not mistaken for the name.
    domain_match = re.search(
        r'@([a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*\.[a-zA-Z]{2,})', sender
    )
    if domain_match:
        label = _company_label_from_domain(domain_match.group(1))
        if label:
            company = label.replace('-', ' ').title()
            if debug:
                print(f" Company (from domain): {company}")
            return company

    # Strategy 2: search in subject and body.
    combined = subject + "\n" + body[:1000]
    for pattern in COMPANY_PATTERNS:
        matches = re.findall(pattern, combined, re.MULTILINE)
        if not matches:
            continue
        candidate = matches[0]
        if isinstance(candidate, tuple):
            # Patterns with several groups yield tuples; use the first
            # non-empty group.
            candidate = next((group for group in candidate if group), '')
        company = re.sub(r'\s+', ' ', candidate.strip()).rstrip('.,!?')
        if not 3 < len(company) < 50:
            continue
        lowered = company.lower()
        if any(phrase in lowered for phrase in _NON_COMPANY_PHRASES):
            continue
        if debug:
            print(f" Company (from pattern): {company}")
        return company

    # Strategy 3: sender display name (everything before '<').
    name_match = re.search(r'^([^<]+)', sender)
    if name_match:
        name = name_match.group(1).strip().strip('"')
        if '@' not in name and len(name) > 3:
            words = name.split()
            if len(words) <= 1 or any(
                word.lower() in _SENDER_TEAM_WORDS for word in words
            ):
                if debug:
                    print(f" Company (from sender name): {name}")
                return name

    if debug:
        print(" Company: Unknown (no match found)")
    return "Unknown Company"