issues/340/user_tool.py (177 lines of code) (raw):
import sys, os
base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, base_path)
import re
import time
import email
import pytest
import random
import imaplib
from playwright._impl._page import Page
from playwright.sync_api import sync_playwright, expect
from utils.trace_cleaner import trace_cleaner
# Gmail address handed to EmailHandler by the test below; it is both the
# address typed into the sign-in form and the account used for IMAP login.
BASE_EMAIL = "testerfreelancer555+28943@gmail.com"
# Password used for the IMAP login of BASE_EMAIL.
# NOTE(review): empty in this file — presumably supplied outside version
# control; IMAP login is expected to fail while it is blank. TODO confirm.
PASSWORD = ""
class EmailHandler:
    """Gmail helper for the UI test: derives alias addresses and reads OTP codes.

    The handler logs in to ``imap.gmail.com`` with ``original_email`` and
    ``password`` whenever it needs to inspect or clear the inbox.
    """

    def __init__(self, email_address, password):
        """Store the credentials and pre-compute a unique alias address.

        Args:
            email_address: Address used for IMAP login and as the default
                recipient filter when searching for OTP mails.
            password: Password for the IMAP login.

        Raises:
            ValueError: If ``email_address`` does not look like an address.
        """
        if not self._validate_email(email_address):
            raise ValueError("Invalid email address format.")
        self.original_email = email_address
        self.password = password
        # Unix timestamp followed by a random 4-digit number, so that two
        # handlers created in the same second still get different aliases.
        unique_suffix = f"{int(time.time())}{random.randint(1000, 9999)}"
        self.new_email = self._generate_new_email(email_address, unique_suffix)

    def _validate_email(self, email_address):
        """Return True when the whole string has the shape ``x@y.z``."""
        # fullmatch (not match) so trailing garbage such as a second "@"
        # section is rejected instead of being silently ignored.
        return bool(re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email_address))

    def _generate_new_email(self, email_address, suffix):
        """Return a ``+suffix`` alias address.

        NOTE(review): the alias is built from a fixed base account and
        ``email_address`` is not used; kept as-is for backward compatibility —
        confirm this is intentional.
        """
        username, domain = "testerfreelancer@gmail.com".split('@')
        return f"{username}+{suffix}@{domain}"

    def get_new_email_address(self):
        """Return the alias address generated in ``__init__``."""
        return self.new_email

    def get_email_address(self):
        """Return the address this handler was created with."""
        return self.original_email

    def check_for_otp(self, page, target=False):
        """Poll the inbox for an unread mail to ``target`` containing an OTP.

        Up to five attempts are made, waiting one second (via
        ``page.wait_for_timeout``) after each unsuccessful attempt. Every
        attempt uses its own IMAP session, which is always logged out before
        the next one starts.

        Args:
            page: Object providing ``wait_for_timeout(ms)``; used only to
                pause between attempts.
            target: Exact value the mail's ``To`` header must equal. Falsy
                values fall back to ``original_email``.

        Returns:
            The 6-digit code as a string, or ``None`` if nothing was found or
            an IMAP error occurred.
        """
        if not target:
            target = self.original_email
        limit_tries = 5
        try:
            for _ in range(limit_tries):
                otp_code = self._poll_unseen_for_otp(target)
                if otp_code:
                    return otp_code
                page.wait_for_timeout(1000)
        except imaplib.IMAP4.error as error:
            print(error)
            print("Failed to connect to Gmail. Please check your email address or password.")
            return None
        print("No OTP found in unread emails.")
        return None

    def _poll_unseen_for_otp(self, target):
        """Run one IMAP session and return an OTP from unseen mail, or None.

        The matching mail is flagged as deleted and expunged once its code
        has been extracted.
        """
        # The context manager logs out on exit, so no session outlives a
        # polling attempt even when an exception is raised.
        with imaplib.IMAP4_SSL("imap.gmail.com") as mail:
            mail.login(self.original_email, self.password)
            mail.select("inbox")
            status, messages = mail.search(None, "UNSEEN")
            if status != "OK":
                print("No unread emails found.")
                return None
            # Reverse the id list to process the latest mail first.
            for email_id in reversed(messages[0].split()):
                status, msg_data = mail.fetch(email_id, '(RFC822)')
                if status != "OK":
                    print("Error fetching email.")
                    continue
                for response_part in msg_data:
                    if not isinstance(response_part, tuple):
                        continue
                    msg = email.message_from_bytes(response_part[1])
                    if msg["To"] != target:
                        continue
                    otp_code = self._extract_otp_from_email(msg)
                    if otp_code:
                        mail.store(email_id, '+FLAGS', '\\Deleted')
                        mail.expunge()  # Permanently removes deleted emails from the mailbox
                        return otp_code
            return None

    def _extract_otp_from_email(self, msg):
        """Return the first 6-digit code found in the message text, or None.

        For multipart messages only ``text/plain`` parts are inspected; a
        non-multipart message is inspected whatever its content type.
        """
        if msg.is_multipart():
            candidates = [
                part for part in msg.walk()
                if part.get_content_type() == "text/plain"
            ]
        else:
            candidates = [msg]
        for part in candidates:
            otp_code = self._extract_otp(self._decode_body(part))
            if otp_code:
                return otp_code
        return None

    @staticmethod
    def _decode_body(part):
        """Decode a message part to text without raising on odd encodings."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        # Honour the charset declared by the part; fall back to UTF-8 when it
        # is missing or unknown, and never fail on undecodable bytes.
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            return payload.decode("utf-8", errors="replace")

    def _extract_otp(self, text):
        """Return the first standalone run of exactly six digits, or None."""
        match = re.search(r"\b\d{6}\b", text)
        return match.group(0) if match else None

    def clear_inbox(self):
        """
        Delete all existing messages from the Inbox.
        """
        print("Deleting all existing messages from the email inbox")
        with imaplib.IMAP4_SSL(host="imap.gmail.com") as imap:
            imap.login(self.original_email, self.password)
            imap.select("inbox")
            # "1:*" addresses every message sequence number in the mailbox.
            imap.store("1:*", "+FLAGS", "\\Deleted")
            imap.expunge()
            imap.close()
@pytest.fixture
def setup_playwright():
    """Yield a fresh browser context from a headless Chromium instance.

    After the test finishes, the browser is closed and the Playwright driver
    is stopped.
    """
    chromium_flags = [
        "--ignore-certificate-errors",
        '--disable-web-security',
        '--disable-features=IsolateOrigins,site-per-process',
    ]
    driver = sync_playwright().start()
    chromium = driver.chromium.launch(headless=True, args=chromium_flags)
    # The test receives the browser context (not a page) and opens its own page.
    yield chromium.new_context()
    chromium.close()
    driver.stop()
def login_user(page: Page, account: EmailHandler):
    """Sign in (or sign up) on the dev site with the account's email address.

    The inbox is cleared after the site is opened. Once the email has been
    submitted, the function waits for a "Sign in" button and types the code
    "123456"; if any of that fails, the error is printed and the "Join"
    button is clicked instead.
    """
    address = account.get_email_address()
    page.goto("https://dev.new.expensify.com:8082/")
    account.clear_inbox()

    email_field = page.locator('input[type="email"]')
    continue_button = page.locator('button[tabindex="0"]')
    email_field.fill(address)
    continue_button.click()

    sign_in_button = page.get_by_role("button", name="Sign in")
    try:
        expect(sign_in_button).to_be_visible()
        page.get_by_test_id("validateCode").fill("123456")
    except Exception as error:
        # Sign-in form did not show up as expected; fall back to "Join".
        print(error)
        join_button = page.get_by_role("button", name="Join")
        join_button.click()
def create_a_new_workspace(page):
    """Create a workspace from the settings page and return part of its label.

    Args:
        page: Playwright page that is already signed in.

    Returns:
        The second line of the inner text of the element labelled
        "Test User's Workspace".
    """
    page.get_by_label("My settings").click()
    page.get_by_test_id("InitialSettingsPage").get_by_label("Workspaces").get_by_text("Workspaces").click()
    # Waiting the page to load because the loading page has two buttons which
    # might cause an exception because none of them work
    page.wait_for_timeout(3000)

    new_workspace_button = page.get_by_label("New workspace")
    try:
        new_workspace_button.click()
    except Exception:
        # `Exception` rather than a bare except so Ctrl-C / SystemExit still
        # propagate. In case there is no workspace yet.
        new_workspace_button.first.click()

    workspace_entry = page.get_by_label("Test User's Workspace")
    try:
        return workspace_entry.inner_text().split('\n')[1]
    except Exception:
        # Retry against the last matching element when the plain locator
        # could not be read (presumably several entries match — TODO confirm).
        return workspace_entry.last.inner_text().split('\n')[1]
def access_target_workspace_chat(page, chat_name):
    """Go back to the Inbox and open the chat whose display name is ``chat_name``.

    Args:
        page: Playwright page currently showing a screen with a "Back" button.
        chat_name: Literal chat name; it must match the display name exactly.
    """
    # Escape the name so characters such as "(" or "." in a workspace name
    # are matched literally instead of being interpreted as regex syntax.
    exact_name = re.compile(rf"^{re.escape(chat_name)}$")

    page.get_by_label("Back").first.click()
    page.get_by_label("Inbox").click()
    try:
        expect(page.locator("#root")).not_to_contain_text("Say hello!")
        expect(page.get_by_label("Chat welcome message")).not_to_contain_text("Welcome to #admins!")
        page.get_by_test_id("DisplayNamesWithTooltip").filter(has_text=exact_name).last.click()
    except Exception:
        # `Exception` rather than a bare except so Ctrl-C / SystemExit still
        # propagate; on any check or click failure fall back to the first match.
        page.get_by_test_id("DisplayNamesWithTooltip").filter(has_text=exact_name).first.click()
def submit_invalid_expense(page):
    """Submit a distance expense whose start and stop waypoints are invalid.

    The "Start" waypoint is filled with "abc" and the "Stop" waypoint with
    "def"; the flow then continues through "Next" and "Submit expense".
    """
    page.get_by_label("Create").last.click()
    page.get_by_text("Submit expense", exact=True).click()
    page.get_by_label("Distance").click()

    waypoint_input = page.get_by_test_id("IOURequestStepWaypoint").get_by_role("textbox")
    save_button = page.get_by_role("button", name="Save")
    waypoint_steps = (
        (page.get_by_test_id("IOURequestStartPage").get_by_role("button", name="Start"), "abc"),
        (page.get_by_role("button", name="Stop"), "def"),
    )
    for waypoint_button, bogus_location in waypoint_steps:
        waypoint_button.click()
        waypoint_input.fill(bogus_location)  # Inputting invalid location
        save_button.click()

    page.get_by_role("button", name="Next").nth(1).click()
    page.get_by_role("button", name="Submit expense").click()
@pytest.mark.parametrize("first_name, last_name", [("Test", "User")])
def test_selected_highlight(setup_playwright, first_name, last_name):
    """Check the background colour of the "Immediately" notification option.

    The test logs in, creates a workspace, opens its chat, navigates to the
    report settings and asserts that the option labelled "Immediately" in the
    selection list has a fully transparent background. A Playwright trace is
    recorded for the whole run and post-processed with ``trace_cleaner``.

    ``first_name`` and ``last_name`` come from the parametrization and are
    not used by the test body.
    """
    context = setup_playwright
    # Start tracing for this context
    context.tracing.start(screenshots=True, snapshots=True, sources=True)
    try:
        account = EmailHandler(email_address=BASE_EMAIL, password=PASSWORD)
        page = context.new_page()
        login_user(page, account)
        workspace_name = create_a_new_workspace(page)
        access_target_workspace_chat(page, workspace_name)
        page.get_by_label("Test User's Workspace").click()
        page.get_by_test_id("ReportDetailsPage").get_by_text("Settings").click()
        page.get_by_text("Notify me about new messages").click()
        selected_option = page.get_by_test_id("selection-list").get_by_label("Immediately")
        expect(selected_option).to_have_css("background-color", "rgba(0, 0, 0, 0)")
    finally:
        trace_path = "/app/expensify/user_tool/output_browser1.zip"
        try:
            context.tracing.stop(path=trace_path)
            trace_cleaner(trace_path)
        finally:
            # Closing the context if not already closed by the fixture; the
            # nested finally guarantees this even when saving or cleaning the
            # trace raises.
            context.close()