utils/email_handler.py (97 lines of code) (raw):
import time
import re
import imaplib
import email
import logging
from email.header import decode_header
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_unique_derived_email(email: str):
"""
Derive the email from the user email.
"""
user_name, domain = email.split('@')
return f"{user_name}+{int(time.time())}@{domain}"
def get_specific_derived_email(email: str, suffix: str):
"""
Derive the email from the user email.
Will append the suffix to the email: `user_name+suffix@domain`
"""
user_name, domain = email.split('@')
return f"{user_name}+{suffix}@{domain}"
class EmailHandler:
"""
A class to handle email operations such as cleaning the inbox,
marking all unread emails as read, and reading OTP codes.
"""
def __init__(self, user_email, password, imap_server='imap.gmail.com'):
"""
Initializes the EmailHandler with user credentials and connects to the IMAP server.
Args:
user_email (str): The email address of the user.
password (str): The password for the email account.
imap_server (str): The IMAP server address. Defaults to 'imap.gmail.com'.
"""
self.user_email = user_email
self.password = password
self.imap_server = imap_server
self.imap = None
def __enter__(self):
"""
Enters the runtime context and logs into the IMAP server.
"""
self.imap = imaplib.IMAP4_SSL(self.imap_server)
try:
self.imap.login(self.user_email, self.password)
logging.info("Logged into IMAP server.")
except Exception as e:
logging.error(f"Failed to login to IMAP server: {e}")
raise
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Exits the runtime context and logs out from the IMAP server.
"""
if self.imap:
self.imap.logout()
logging.info("Logged out from IMAP server.")
def clean_inbox(self):
"""
Deletes all emails in the inbox.
WARNING: This action is irreversible.
"""
logging.warning("Deleting all emails in the inbox.")
# Select the inbox folder
status, _ = self.imap.select("INBOX")
if status != "OK":
logging.error("Failed to select INBOX.")
return
# Search for all emails
status, messages = self.imap.search(None, 'ALL')
if status != "OK":
logging.error("Failed to retrieve emails.")
return
email_ids = messages[0].split()
if not email_ids:
logging.info("No emails to delete.")
return
# Mark all emails for deletion
for email_id in email_ids:
self.imap.store(email_id, '+FLAGS', '\\Deleted')
# Permanently delete emails marked for deletion
self.imap.expunge()
logging.info("All emails deleted from the inbox.")
def mark_all_unread_as_read(self):
"""
Marks all unread emails in the inbox as read.
"""
logging.info("Marking all unread emails as read.")
# Select the inbox folder
status, _ = self.imap.select("INBOX")
if status != "OK":
logging.error("Failed to select INBOX.")
return
# Search for unread emails
status, messages = self.imap.search(None, '(UNSEEN)')
if status != "OK":
logging.error("Failed to retrieve unread emails.")
return
email_ids = messages[0].split()
if not email_ids:
logging.info("No unread emails to mark as read.")
return
# Mark each email as read
for email_id in email_ids:
self.imap.store(email_id, '+FLAGS', '\\Seen')
logging.info("All unread emails marked as read.")
def read_otp_code(self, retries=5, delay=6):
"""
Retrieves the OTP code from unread emails.
Args:
retries (int): Number of retries to attempt fetching the OTP code.
delay (int): Delay in seconds between retries.
Returns:
str: The OTP code if found, else None.
"""
logging.info("Attempting to read OTP code from emails.")
# Loop to retry fetching the OTP for a specified number of attempts
for i in range(retries):
# Search for unread emails with the subject "Expensify magic sign-in code:"
self.imap.select("inbox")
status, messages = self.imap.search(None, '(UNSEEN SUBJECT "Expensify magic sign-in code:")')
# Check if the search was successful
if not status == "OK":
logging.error(f"Failed to search for emails. Retrying {i + 1}/{retries}...")
time.sleep(delay)
continue
# If there are any matching emails, process the latest one
email_ids = messages[0].split()
if not email_ids:
logging.info(f"Failed to retrieve emails. Retrying {i + 1}/{retries}...")
time.sleep(delay)
continue
latest_email_id = email_ids[-1]
status, msg_data = self.imap.fetch(latest_email_id, "(RFC822)")
# Iterate over the message data to retrieve the email content
for response_part in msg_data:
if isinstance(response_part, tuple):
# Parse the email content
msg = email.message_from_bytes(response_part[1])
subject, encoding = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
subject = subject.decode(encoding or "utf-8")
# Extract the OTP code from the email subject
match = re.search(r"Expensify magic sign-in code: (\d+)", subject)
if match:
code = match.group(1)
return code
logging.info(f"No matching emails found. Retrying {i + 1}/{retries}...")
time.sleep(delay)
logging.warning("Max retries reached. OTP code not found.")
return None