issues/216/user_tool.py (210 lines of code) (raw):
import sys, os
base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, base_path)
import email
import imaplib
import os
import re
import time
from email.header import decode_header
import pytest
from playwright.async_api import (
Page,
Playwright,
TimeoutError,
async_playwright,
expect,
)
from utils.trace_cleaner import trace_cleaner
NOW = int(time.time())
USER_EMAIL = "xecuycismfsga+k7gse6h@gmail.com"
PARTICIPANT_EMAILS = [
"xecuycismfsga+eawdvh@gmail.com",
"xecuycismfsga+nzfgxzu@gmail.com",
]
PASSWORD = "uwgb zzoc cofa kaaq"
SLOW_MO = 1000
CACHED: bool = True
PROXY_URL = os.getenv("PROXY_URL")
def get_magic_code(
user_email: str,
password: str,
current_timestamp: int,
retries=5,
delay=3,
slow_mo: int | None = None, # Add slow_mo parameter
cached: bool = False,
):
if cached:
return "123456"
imap = imaplib.IMAP4_SSL("imap.gmail.com")
imap.login(user_email, password)
additional_delay = (slow_mo / 1000) if slow_mo else 0
total_delay = delay + additional_delay
for _attempt in range(retries):
print(f"Attempt {_attempt}")
if _attempt > 0:
print(f"Waiting {total_delay} seconds before next attempt...")
time.sleep(total_delay)
imap.select("inbox")
status, messages = imap.search(
None, '(UNSEEN SUBJECT "Expensify magic sign-in code:")'
)
if status == "OK":
email_ids = messages[0].split()
if email_ids:
latest_email_id = email_ids[-1]
status, msg_data = imap.fetch(latest_email_id, "(RFC822)")
for response_part in msg_data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])
email_date = msg.get("Date")
email_timestamp = email.utils.mktime_tz(
email.utils.parsedate_tz(email_date)
)
current_utc = time.time()
imap.store(latest_email_id, "+FLAGS", "\\Seen")
print(
f"Email time: {email_timestamp}, Current time: {current_utc}"
)
if email_timestamp < current_timestamp:
print(
f"Found old email from {email_date}, waiting for new one..."
)
break # Break the response_part loop
subject, encoding = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
subject = subject.decode(encoding or "utf-8")
match = re.search(
r"Expensify magic sign-in code: (\d+)", subject
)
if match:
code = match.group(1)
imap.logout()
return code
imap.logout()
print("Max retries reached. Email not found.")
return None
async def create_user(page: Page, firstname: str, lastname: str):
await page.get_by_role("button", name="Join").click()
try:
onboarding = page.get_by_text("Track and budget expenses")
await onboarding.wait_for(state="visible", timeout=2000)
await onboarding.click()
await page.get_by_role("textbox", name="First name").fill(firstname)
await page.get_by_role("textbox", name="Last name").fill(lastname)
await page.get_by_role("button", name="Continue").click()
except TimeoutError:
pass
async def login(
page: Page,
user_email: str,
password: str,
slow_mo: int | None = None,
cached: bool = False,
):
magic_code = get_magic_code(
user_email, password, NOW, retries=3, delay=5, slow_mo=slow_mo, cached=cached
)
if magic_code is None:
raise ValueError("Failed to retrieve magic code")
await page.get_by_role("textbox").fill(magic_code)
async def login_or_create_user(
page: Page,
first_name: str = "John",
last_name: str = "Doe",
user_email: str = USER_EMAIL,
password: str = PASSWORD,
slow_mo: int | None = None,
cached: bool = False,
):
await page.get_by_test_id("username").fill(user_email)
await page.get_by_role("button", name="Continue").click()
try:
await page.get_by_role("button", name="Join").wait_for(
state="visible", timeout=2000
)
await create_user(page, first_name, last_name)
except TimeoutError:
await login(page, user_email, password, slow_mo, cached=cached)
async def run(playwright: Playwright) -> None:
browser = await playwright.chromium.launch(
headless=True,
slow_mo=SLOW_MO,
proxy={"server": PROXY_URL} if PROXY_URL else None,
args=[
"--disable-web-security",
"--disable-features=IsolateOrigins,site-per-process",
],
)
context = await browser.new_context()
await context.tracing.start(screenshots=True, snapshots=True, title="test_welcome_group_message")
try:
page = await context.new_page()
await page.goto("https://dev.new.expensify.com:8082/")
await login_or_create_user(page, slow_mo=SLOW_MO, cached=CACHED)
has_text = (
f':has-text("{PARTICIPANT_EMAILS[0]}"):has-text("{PARTICIPANT_EMAILS[1]}")'
)
group_chat = (
page.get_by_test_id("BaseSidebarScreen")
.locator(f'button[aria-label="Navigates to a chat"]{has_text}')
.first
)
group_details = page.locator(
f'div[data-testid="DisplayNamesWithTooltip"]{has_text}'
).last
try:
await group_chat.wait_for(state="visible", timeout=3000)
await group_chat.click()
await group_details.click()
await page.get_by_label("Leave").click()
await page.get_by_label("Back").last.click() # Go back home
except TimeoutError:
pass
await page.get_by_label("Start chat (Floating action)").click()
await page.get_by_text("Start chat").click()
for user_email in PARTICIPANT_EMAILS:
await page.get_by_test_id("selection-list-text-input").fill(user_email)
await (
page.get_by_label(user_email)
.get_by_role("button", name="Add to group")
.click()
)
await page.locator("button").filter(has_text="Next").click()
await page.get_by_role("button", name="Start group").click()
await group_details.click()
await page.get_by_text("Members").click()
await page.reload()
await (
page.get_by_test_id("selection-list")
.get_by_label(PARTICIPANT_EMAILS[1])
.first.click()
)
await page.get_by_role("button", name="Remove from group").click()
await page.get_by_role("button", name="Remove", exact=True).click()
await page.get_by_test_id("ReportParticipantsPage").get_by_label("Back").click()
await page.get_by_test_id("ReportDetailsPage").get_by_label("Back").click()
welcome_message = (
page.get_by_test_id("report-actions-list")
.locator('div:has-text("This chat is with")')
.last
)
await expect(welcome_message).not_to_have_text(PARTICIPANT_EMAILS[1])
group_details = page.locator(
f'div[data-testid="DisplayNamesWithTooltip"]:has-text("{PARTICIPANT_EMAILS[0]}")'
).last
await group_details.last.click()
await page.get_by_label("Leave").click()
except Exception as e:
raise e
finally:
trace_path = "/app/expensify/user_tool/output_browser1.zip"
await context.tracing.stop(path=trace_path)
trace_cleaner(trace_path)
await context.close()
await browser.close()
@pytest.mark.asyncio
async def test_welcome_group_message():
async with async_playwright() as playwright:
await run(playwright)