issues/292/user_tool.py (159 lines of code) (raw):

import sys, os base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, base_path) import asyncio from playwright.async_api import async_playwright, Page, expect import os import time from email.header import decode_header import time import re import imaplib import email import logging from email.header import decode_header from utils.trace_cleaner import trace_cleaner NEW_DOT_URL = "https://dev.new.expensify.com:8082" EXPENSIFY_EMAIL = os.getenv("USER_EMAIL", "testingq186+41783@gmail.com") EXPENSIFY_PASSWORD = os.getenv("PASSWORD", "") FIRST_NAME = "Testing" LAST_NAME = "41783" logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') async def main(): async with async_playwright() as playwright: browser = await playwright.chromium.launch(headless=True, args=["--ignore-certificate-errors", "--disable-web-security", "--disable-features=IsolateOrigins,site-per-process"], proxy={"server": "http://localhost:8080"}, slow_mo=500) context = await browser.new_context(ignore_https_errors=True) await context.tracing.start(screenshots=True, snapshots=True) page = await context.new_page() try: class EmailHandler: """ A class to handle email operations such as cleaning the inbox, marking all unread emails as read, and reading OTP codes. """ def __init__(self, user_email, password, imap_server='imap.gmail.com'): """ Initializes the EmailHandler with user credentials and connects to the IMAP server. Args: user_email (str): The email address of the user. password (str): The password for the email account. imap_server (str): The IMAP server address. Defaults to 'imap.gmail.com'. """ self.user_email = user_email self.password = password self.imap_server = imap_server self.imap = None def __enter__(self): """ Enters the runtime context and logs into the IMAP server. """ self.imap = imaplib.IMAP4_SSL(self.imap_server) try: self.imap.login(self.user_email, self.password) logging.info("Logged into IMAP server.") except Exception as e: logging.error(f"Failed to login to IMAP server: {e}") raise return self def __exit__(self, exc_type, exc_value, traceback): """ Exits the runtime context and logs out from the IMAP server. """ if self.imap: self.imap.logout() logging.info("Logged out from IMAP server.") def clean_inbox(self): """ Deletes all emails in the inbox. WARNING: This action is irreversible. """ logging.warning("Deleting all emails in the inbox.") status, _ = self.imap.select("INBOX") if status != "OK": logging.error("Failed to select INBOX.") return status, messages = self.imap.search(None, 'ALL') if status != "OK": logging.error("Failed to retrieve emails.") return email_ids = messages[0].split() if not email_ids: logging.info("No emails to delete.") return for email_id in email_ids: self.imap.store(email_id, '+FLAGS', '\\Deleted') self.imap.expunge() logging.info("All emails deleted from the inbox.") def mark_all_unread_as_read(self): """ Marks all unread emails in the inbox as read. """ logging.info("Marking all unread emails as read.") status, _ = self.imap.select("INBOX") if status != "OK": logging.error("Failed to select INBOX.") return status, messages = self.imap.search(None, '(UNSEEN)') if status != "OK": logging.error("Failed to retrieve unread emails.") return email_ids = messages[0].split() if not email_ids: logging.info("No unread emails to mark as read.") return for email_id in email_ids: self.imap.store(email_id, '+FLAGS', '\\Seen') logging.info("All unread emails marked as read.") def read_otp_code(self, retries=5, delay=6): """ Retrieves the OTP code from unread emails. Args: retries (int): Number of retries to attempt fetching the OTP code. delay (int): Delay in seconds between retries. Returns: str: The OTP code if found, else None. """ logging.info("Attempting to read OTP code from emails.") for i in range(retries): self.imap.select("inbox") status, messages = self.imap.search(None, '(UNSEEN SUBJECT "Expensify magic sign-in code:")') if not status == "OK": logging.error(f"Failed to search for emails. Retrying {i + 1}/{retries}...") time.sleep(delay) continue email_ids = messages[0].split() if not email_ids: logging.info(f"Failed to retrieve emails. Retrying {i + 1}/{retries}...") time.sleep(delay) continue latest_email_id = email_ids[-1] status, msg_data = self.imap.fetch(latest_email_id, "(RFC822)") for response_part in msg_data: if isinstance(response_part, tuple): msg = email.message_from_bytes(response_part[1]) subject, encoding = decode_header(msg["Subject"])[0] if isinstance(subject, bytes): subject = subject.decode(encoding or "utf-8") match = re.search(r"Expensify magic sign-in code: (\d+)", subject) if match: code = match.group(1) return code logging.info(f"No matching emails found. Retrying {i + 1}/{retries}...") time.sleep(delay) logging.warning("Max retries reached. OTP code not found.") return None def read_otp_from_email(email, password): with EmailHandler(email, password) as email_handler: email_handler.clean_inbox() # Clean inbox otp = email_handler.read_otp_code() return otp async def complete_onboarding(): await page.locator("text=Track and budget expenses").click() await page.locator('input[name="fname"]').fill(FIRST_NAME) await page.locator('input[name="lname"]').fill(LAST_NAME) await page.get_by_role("button", name="Continue").last.click() async def sign_in_new_dot(page: Page, email: str, password: str): """ Sign in into the new Expensify dot. """ await page.goto(NEW_DOT_URL) await page.get_by_test_id("username").fill(email) await page.get_by_role("button", name="Continue").click() try: await(expect(page.get_by_test_id("validateCode")).to_be_visible()) await page.get_by_test_id("validateCode").fill('111111') except: await page.get_by_role("button", name="Join").click() try: await expect(page.get_by_text("What do you want to do today?")).to_be_visible() await complete_onboarding() except: pass async def test_app_returns_to_confirmation_page_when_tapping_back_button_on_category_page(page: Page): await page.get_by_label("Start chat (Floating action)").click() await page.get_by_label("Track expense").first.click() await page.get_by_role("button", name="Got it").click() await page.get_by_label("Manual").click() await page.get_by_placeholder("0").fill("1000") await page.locator("#numPadContainerView").get_by_role("button", name="Next").click() await page.get_by_role("button", name="Track expense").click() await page.get_by_role("button", name="Categorize it").first.click() await page.get_by_label("Back").click() await expect(page.get_by_test_id("IOURequestStepConfirmation").get_by_text("Categorize")).not_to_be_visible() await sign_in_new_dot(page, EXPENSIFY_EMAIL, EXPENSIFY_PASSWORD) await test_app_returns_to_confirmation_page_when_tapping_back_button_on_category_page(page) except Exception as e: raise e finally: await context.tracing.stop(path="/app/expensify/user_tool/output_browser1.zip") trace_cleaner("/app/expensify/user_tool/output_browser1.zip") await browser.close() def test_41783(): asyncio.run(main())