# jobs/extensions/extensions/main.py
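"""Scrape extension metadata from the Chrome Web Store and load it into BigQuery.

The script crawls the Chrome Web Store starting from its main page, scrapes
per-extension details (ratings, users, version, developer info, etc.) from each
"/detail/" page it finds (up to two levels deep), writes the results to a CSV
in GCS, and loads that CSV into the target BigQuery table.
"""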
# Load libraries
import pandas as pd
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import re
from argparse import ArgumentParser
from google.cloud import bigquery
from urllib.parse import urljoin
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.webdriver import WebDriver as ChromiumDriver
from selenium import webdriver
# Main website for Chrome Webstore
CHROME_WEBSTORE_URL = "https://chromewebstore.google.com"
ADDITIONAL_LINK_TO_GRAB = "https://chromewebstore.google.com/category/extensions"
# Links to ignore from the Chrome Webstore page
LIST_OF_LINKS_TO_IGNORE = [
"https://chromewebstore.google.com/user/installed",
"https://chrome.google.com/webstore/devconsole/",
"https://www.google.com/intl/en/about/products",
"https://support.google.com/chrome_webstore/answer/1047776?hl=en-US",
"https://myaccount.google.com/privacypolicy?hl=en-US",
"https://support.google.com/chrome_webstore?hl=en-US",
"https://accounts.google.com/TOS?loc=US&hl=en-US",
"https://support.google.com/accounts?hl=en-US&p=account_iph",
"https://support.google.com/accounts?p=signin_privatebrowsing&hl=en-US",
"https://ssl.gstatic.com/chrome/webstore/intl/en-US/gallery_tos.html",
"https://support.google.com/chrome_webstore/answer"
+ "/12225786?p=cws_reviews_results&hl=en-US",
"https://www.google.com/chrome/?brand=GGRF&utm_source=google.com"
+ "&utm_medium=material-callout&utm_campaign=cws&utm_keyword=GGRF",
"./",
]
# Define where to write the data in GCS and BQ and other variables
TARGET_PROJECT = "moz-fx-data-shared-prod"
TARGET_TABLE = "moz-fx-data-shared-prod.external_derived.chrome_extensions_v1"
GCS_BUCKET = "gs://moz-fx-data-prod-external-data/"
RESULTS_FPATH = "CHROME_EXTENSIONS/chrome_extensions_%s.csv"
TIMEOUT_IN_SECONDS = 10
MAX_CLICKS = 40 # Max load more button clicks
DRIVER_TYP = "Chrome" # "Chromium"
BINARY_LOC = "/usr/bin/google-chrome-stable" # "/usr/bin/chromium"
DRIVER_PATH = "/usr/local/bin/chromedriver"  # "/usr/bin/chromedriver"
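# NOTE: the commented-out values above are the Chromium equivalents of
# DRIVER_TYP, BINARY_LOC, and DRIVER_PATH; switch all three together.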
# --------------DEFINE REUSABLE FUNCTIONS------------------------
def initialize_driver(driver_type, binary_location, driver_path):
    """Inputs: Driver type (Chrome or Chromium), binary location, driver path
    Outputs: A webdriver
    """
    if driver_type not in ("Chrome", "Chromium"):
        raise ValueError("DRIVER_TYPE needs to be either Chrome or Chromium")
    options = Options()
    options.binary_location = binary_location
    options.add_argument("--headless=new")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--window-size=1920,1080")
    # Chrome and Chromium share the same options; only the driver class differs
    driver_class = ChromiumDriver if driver_type == "Chromium" else webdriver.Chrome
    driver = driver_class(service=Service(driver_path), options=options)
    return driver
def get_unique_links_from_webpage(
url, base_url, links_to_ignore, links_to_not_process, driver
):
"""Output: List of links found on webpage"""
driver.get(url)
time.sleep(3) # Wait for JS to load content
soup = BeautifulSoup(driver.page_source, "html.parser")
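    # Quit the driver here; callers need a fresh driver for subsequent calls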
driver.quit()
links = [urljoin(base_url, a["href"]) for a in soup.find_all("a", href=True)]
unique_links = []
for link in links:
if (
link not in unique_links
and link not in links_to_ignore
and link not in links_to_not_process
):
unique_links.append(link)
return unique_links
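# Variant of get_unique_links_from_webpage that works on an already-fetched HTML string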
def get_unique_links_from_html(html, base_url, links_to_ignore, links_to_not_process):
"""Input: Raw HTML string, base URL, links to ignore, and links already processed
Output: List of unique, absolute links on that page"""
soup = BeautifulSoup(html, "html.parser")
links = [urljoin(base_url, a["href"]) for a in soup.find_all("a", href=True)]
unique_links = []
for link in links:
if (
link not in unique_links
and link not in links_to_ignore
and link not in links_to_not_process
):
unique_links.append(link)
return unique_links
# Function to get the soup returned from a given page
def get_soup_from_webpage(webpage_url, timeout_seconds):
"""Input: Webpage URL, timeout
Output: BeautifulSoup class"""
response = requests.get(webpage_url, timeout=timeout_seconds)
soup_from_webpage = BeautifulSoup(response.text, "html.parser")
return soup_from_webpage
# Function to get all paragraphs from a soup
def get_paragraphs_from_soup(webpage_soup):
"""Input: Webpage Soup
Output: List of paragraphs found in the soup"""
paragraphs = [p.text for p in webpage_soup.find_all("p")]
return paragraphs
def get_h1_headers_from_soup(webpage_soup):
"""Input: Webpage Soup
Output: List of H1 Elements Found"""
headers = [h1.text for h1 in webpage_soup.find_all("h1")]
return headers
def get_h2_headers_from_soup(webpage_soup):
"""Input: Webpage Soup
Output: List of H2 Elements Found
"""
headers = [h2.text for h2 in webpage_soup.find_all("h2")]
return headers
def get_divs_from_soup(webpage_soup):
"""Input: Webpage Soup
Output: List of H2 Elements Found
"""
divs = [div.text for div in webpage_soup.find_all("div")]
return divs
def get_spans_from_soup(webpage_soup):
"""Input: Webpage Soup
Output: List of Span Elements Found"""
spans = [span.text for span in webpage_soup.find_all("span")]
return spans
def get_website_url_from_soup(webpage_soup):
"""Input: Webpage Soup
Output: Website URL (str) if found, otherwise return None"""
website_url = None
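    # Note: the loop does not break, so the last matching "Website" link wins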
website_links = webpage_soup.find_all("a")
for website_link in website_links:
if website_link.has_attr("href") and "Website" in website_link.text:
website_url = website_link["href"]
return website_url
def get_category_from_soup(webpage_soup):
"""Input: Webpage Soup
Output: Category of the extension if found, otherwise return None"""
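    # Note: no break below, so the category comes from the last "Extension" link found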
category = None
website_links = webpage_soup.find_all("a")
for link_nbr, website_link in enumerate(website_links):
if (
website_link.has_attr("href")
and "Extension" in website_link.text
and link_nbr + 1 < len(website_links)
):
# Get the next link text
category = website_links[link_nbr + 1].text.strip()
return category
def get_verified_domain(soup):
"""Input: soup (Class bs4.BeautifulSoup)
Output: verified_domain string if found, else None"""
target_text = "Created by the owner of the listed website. The publisher has a good record with no history of violations."
# Find the tag that contains the exact text
target_container = soup.find(
lambda tag: tag.name in ["div", "span"]
and target_text in tag.get_text(strip=True)
)
if not target_container:
return None
# Look for all <a> tags after this element
for a in target_container.find_all_next("a"):
href = a.get("href", "")
# Return the first non-Google-support href
if not href.startswith("https://support.google.com/chrome_webstore"):
return href
return None
def initialize_results_df():
"""Returns a dataframe with 0 rows with the desired format"""
results_df = pd.DataFrame(
columns=[
"submission_date",
"url",
"chrome_extension_name",
"star_rating",
"number_of_ratings",
"number_of_users",
"extension_version",
"extension_size",
"extension_languages",
"developer_desc",
"developer_email",
"developer_website",
"developer_phone",
"extension_updated_date",
"category",
"trader_status",
"featured",
"verified_domain",
"manifest_json",
]
)
return results_df
def check_if_detail_or_non_detail_page(url):
    """Input: A URL (string)
    Output: Boolean, indicating if the URL contains /detail/ in it"""
    return "/detail/" in url
def check_if_load_more_button_present(webpage_soup):
"""Input: Webpage Soup
Output: Boolean indicating if load more button is on the page"""
buttons = webpage_soup.find_all("button")
for button in buttons:
if "Load more" in button.get_text(strip=True):
return True
return False
def get_links_from_non_detail_page(
url, list_of_links_already_processed, max_clicks, links_to_ignore_list, driver
):
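    """Inputs: URL of a non-detail (e.g. category) page, links already processed,
    max number of 'Load more' clicks, links to ignore, and a webdriver
    Output: Tuple of (new detail page links, new non-detail page links)"""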
# Wait for the driver to load the page
wait = WebDriverWait(driver, 10)
# Get the URL and wait 2 seconds
driver.get(url)
time.sleep(2)
# Initialize click count to 0 clicks
click_count = 0
# Click "Load more" until it's gone or max clicks reached
while click_count < max_clicks:
try:
load_more_button = wait.until(
EC.element_to_be_clickable(
(By.XPATH, '//button//span[contains(text(), "Load more")]')
)
)
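            # Click via JavaScript so the button does not need to be scrolled into view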
driver.execute_script("arguments[0].click();", load_more_button)
print(f"[{click_count+1}] Clicked 'Load more'")
time.sleep(2)
click_count += 1
except Exception:
print("No more 'Load more' button or timeout.")
break
# Scroll to bottom to trigger lazy loading
last_height = driver.execute_script("return document.body.scrollHeight")
while True:
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(4)
new_height = driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
break
last_height = new_height
# Get set of links already processed
processed_set = set(list_of_links_already_processed)
# Get all extension card links (those that have '/detail/' in href)
all_links = driver.find_elements(By.TAG_NAME, "a")
extension_links = [
link.get_attribute("href")
for link in all_links
if link.get_attribute("href") and "/detail/" in link.get_attribute("href")
]
unique_extension_links = list(set(extension_links) - processed_set)
all_href_links = [
link.get_attribute("href") for link in all_links if link.get_attribute("href")
]
unique_all_links = set(all_href_links)
unique_non_extension_links = list(
unique_all_links
- set(unique_extension_links)
- processed_set
- set(links_to_ignore_list)
)
driver.quit()
return unique_extension_links, unique_non_extension_links
def pull_data_from_detail_page(url, timeout_limit, current_date):
"""Input: URL, timeout limit (integer), and current date"""
# Initialize as empty strings
number_of_ratings = None
chrome_extension_name = None
star_rating = None
number_of_users = None
extension_version = None
extension_size = None
extension_languages = None
developer_desc = None
developer_email = None
developer_website = None
developer_phone = None
extension_updated_date = None
trader_status = None
featured = False
manifest_json = None
# Get the soup from the current link
current_link_soup = get_soup_from_webpage(
webpage_url=url, timeout_seconds=timeout_limit
)
# Get paragraphs & headers from the current link
paragraphs_from_current_link_soup = get_paragraphs_from_soup(current_link_soup)
headers_from_current_link_soup = get_h1_headers_from_soup(current_link_soup)
h2_headers_from_current_link_soup = get_h2_headers_from_soup(current_link_soup)
divs_from_current_link_soup = get_divs_from_soup(current_link_soup)
spans_from_current_link_soup = get_spans_from_soup(current_link_soup)
# Get the developer website URL
developer_website = get_website_url_from_soup(current_link_soup)
# Get the number of ratings
for paragraph in paragraphs_from_current_link_soup:
# Check if this has the ratings information
if paragraph.endswith("ratings"):
# Get portion before the space
number_of_ratings = paragraph.split(" ")[0]
# Get the extension name
for header in headers_from_current_link_soup:
chrome_extension_name = header
# Get the star rating
for h2_header in h2_headers_from_current_link_soup:
if "out of 5" in h2_header:
pattern = r"^.*?out of 5"
match = re.search(pattern, h2_header)
if match:
star_rating = match.group(0).split(" ")[0]
# Loop through the divs
for div in divs_from_current_link_soup:
# Get the number of users
if " users" in div:
pattern = r"(\d{1,3}(?:,\d{3})*|\d+) (?=users)"
match = re.search(pattern, div)
if match:
number_of_users = match.group(0).split(" ")[0].replace(",", "")
if "Non-trader" == div:
trader_status = "Non-trader"
if "Trader" == div.strip():
trader_status = "Trader"
# Loop through spans
for span in spans_from_current_link_soup:
if "Featured" == span.strip():
featured = True
# Loop through divs
for index, div in enumerate(divs_from_current_link_soup):
# If you see updated in div and it's not the last div found
if "Updated" in div and index + 1 < len(divs_from_current_link_soup):
# The next div should have the extension_updated_date
extension_updated_date = divs_from_current_link_soup[index + 1]
if "Version" in div and index + 1 < len(divs_from_current_link_soup):
# The next div should have the extension version
extension_version = divs_from_current_link_soup[index + 1]
if "Size" in div and index + 1 < len(divs_from_current_link_soup):
# The next div should have the extension size
extension_size = divs_from_current_link_soup[index + 1]
if "Languages" in div and index + 1 < len(divs_from_current_link_soup):
# The next div should have language info
extension_languages = divs_from_current_link_soup[index + 1]
# Get all divs
all_divs = current_link_soup.find_all("div")
# Loop through each div
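    # The div following "Developer" holds the description, website, email,
    # and phone as newline-separated text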
for idx, div in enumerate(all_divs):
developer_info = None
# If the div is developer and it's not the last div
if div.text.strip() == "Developer" and idx + 1 < len(all_divs):
# Get the next div after Developer
next_div = all_divs[idx + 1]
# Find the first nested tag
first_nested_tag = next_div.find()
# If there is a first nested tag
if first_nested_tag:
# Get developer info as all the text from that first nested tag
developer_info = first_nested_tag.get_text(separator="\n", strip=True)
# If website is in developer info
if "Website" in developer_info:
# Split on website, first part is developer desc
developer_desc = developer_info.split("Website")[0].replace(
"\n", " "
)
# If email is in developer info
if "Email" in developer_info:
developer_email_and_phone = (
developer_info.split("Email")[1].replace("\n", " ").strip()
)
# If phone is there, get developer email and phone
if "Phone" in developer_email_and_phone:
developer_email_and_phone_list = (
developer_email_and_phone.split("Phone")
)
developer_email = developer_email_and_phone_list[0]
developer_phone = developer_email_and_phone_list[1]
# If phone is not there, only get developer email
else:
developer_email = developer_email_and_phone
category = get_category_from_soup(current_link_soup)
verified_domain = get_verified_domain(current_link_soup)
# NOTE - Still need to add logic for manifest json
# Put the results into a dataframe
curr_link_results_df = pd.DataFrame(
{
"submission_date": [current_date],
"url": [url],
"chrome_extension_name": [chrome_extension_name],
"star_rating": [star_rating],
"number_of_ratings": [number_of_ratings],
"number_of_users": [number_of_users],
"extension_version": [extension_version],
"extension_size": [extension_size],
"extension_languages": [extension_languages],
"developer_desc": [developer_desc],
"developer_email": [developer_email],
"developer_website": [developer_website],
"developer_phone": [developer_phone],
"extension_updated_date": [extension_updated_date],
"category": [category],
"trader_status": [trader_status],
"featured": [featured],
"verified_domain": [verified_domain],
"manifest_json": [manifest_json],
}
)
return curr_link_results_df
def main():
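    """Crawl the Chrome Web Store, scrape extension detail pages,
    write the results to GCS as a CSV, and load them into BigQuery."""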
parser = ArgumentParser(description=__doc__)
parser.add_argument("--date", required=True)
args = parser.parse_args()
# Get DAG logical date
logical_dag_date = datetime.strptime(args.date, "%Y-%m-%d").date()
logical_dag_date_string = logical_dag_date.strftime("%Y-%m-%d")
# Initialize an empty set to hold links that have already been processed
links_already_processed = set()
# Get all unique links found on the CHROME_WEBSTORE_URL (excluding links to ignore)
driver = initialize_driver(DRIVER_TYP, BINARY_LOC, DRIVER_PATH)
unique_links_on_chrome_webstore_page = get_unique_links_from_webpage(
url=CHROME_WEBSTORE_URL,
base_url=CHROME_WEBSTORE_URL,
links_to_ignore=LIST_OF_LINKS_TO_IGNORE,
links_to_not_process=links_already_processed,
driver=driver,
)
# Add on the additional link to grab
unique_links_on_chrome_webstore_page.append(ADDITIONAL_LINK_TO_GRAB)
# Add CHROME_WEBSTORE_URL to list of already processed so we don't process it again
links_already_processed.add(CHROME_WEBSTORE_URL)
links_already_processed.add(CHROME_WEBSTORE_URL + "/")
    # If the chrome webstore URL is in there again with a slash on the end,
    # then remove it so it is not processed twice
    if CHROME_WEBSTORE_URL + "/" in unique_links_on_chrome_webstore_page:
        unique_links_on_chrome_webstore_page.remove(CHROME_WEBSTORE_URL + "/")
    print(
        "Unique links found on the main page: ",
        len(unique_links_on_chrome_webstore_page),
    )
# Get the final cleaned up list of links on the main page we want to process
# excluding anything that starts with accounts
main_page_links_to_process = []
for link in unique_links_on_chrome_webstore_page:
if not link.startswith("https://accounts.google.com"):
main_page_links_to_process.append(link)
del unique_links_on_chrome_webstore_page
# Initialize a dataframe to store extension level results
results_df = initialize_results_df()
# Loop through the links found on the main page of the Chrome Webstore
for idx, current_link in enumerate(main_page_links_to_process):
print("Currently processing link: ", current_link)
percent_done = (idx + 1) / len(main_page_links_to_process) * 100
if idx % 5 == 0 or idx == len(main_page_links_to_process) - 1:
            print(
                f"Progress: {percent_done:.1f}% "
                f"({idx + 1} of {len(main_page_links_to_process)})"
            )
# Check if the link is a "detail page" or a "non detail page"
is_detail_page = check_if_detail_or_non_detail_page(current_link)
# If the link is a detail page and not already processed
if is_detail_page:
print("link is a detail page")
if current_link not in links_already_processed:
print("link is not yet processed, pulling data...")
# Get the data from that page
detail_page_results_df = pull_data_from_detail_page(
url=current_link,
timeout_limit=TIMEOUT_IN_SECONDS,
current_date=logical_dag_date_string,
)
# Append the data scraped to results_df
results_df = pd.concat([results_df, detail_page_results_df])
# Add the detail page link to links already processed
links_already_processed.add(current_link)
else:
print("link is already processed")
# If this link is not a detail page
else:
print("Link is not a detail page.")
# Get the HTML from the non detail page after clicking
# Load more button, plus update links already processed
# Initialize a driver
driver = initialize_driver(DRIVER_TYP, BINARY_LOC, DRIVER_PATH)
print("Getting links from the non detail page...")
detail_links_found, non_detail_links_found = get_links_from_non_detail_page(
current_link,
links_already_processed,
MAX_CLICKS,
LIST_OF_LINKS_TO_IGNORE,
driver,
)
# Print # of detail and non detail links found
print("# detail links found on page: ", str(len(detail_links_found)))
print("# non detail links found: ", str(len(non_detail_links_found)))
# Loop through each link on this page
print("Looping through detail links found...")
for detail_link in detail_links_found:
print("Processing detail link: ", detail_link)
try:
# Get the data from that page
detail_page_results_df = pull_data_from_detail_page(
url=detail_link,
timeout_limit=TIMEOUT_IN_SECONDS,
current_date=logical_dag_date_string,
)
# Append the data scraped to results_df
results_df = pd.concat([results_df, detail_page_results_df])
                except Exception as exc:
                    print(f"Failed to process detail page: {detail_link} ({exc})")
links_already_processed.add(detail_link)
print("Done looping through detail links found.")
# Loop through all the non detail links found
print("Looping through non detail links found...")
for non_detail_link in non_detail_links_found:
print("Current non detail link: ", non_detail_link)
if non_detail_link in links_already_processed:
print("Already processed, not processing again")
else:
print("Processing non_detail_link: ", non_detail_link)
# Initialize driver below
driver = initialize_driver(DRIVER_TYP, BINARY_LOC, DRIVER_PATH)
# Try again to get the detail links
(
next_level_detail_links_found,
next_level_non_detail_links_found,
) = get_links_from_non_detail_page(
non_detail_link,
links_already_processed,
MAX_CLICKS,
LIST_OF_LINKS_TO_IGNORE,
driver,
)
for next_level_detail_link_found in next_level_detail_links_found:
try:
# Get the data from that page
detail_page_results_df = pull_data_from_detail_page(
url=next_level_detail_link_found,
timeout_limit=TIMEOUT_IN_SECONDS,
current_date=logical_dag_date_string,
)
# Append the data scraped to results_df
results_df = pd.concat([results_df, detail_page_results_df])
                        except Exception as exc:
                            print(
                                f"Failed to process: {next_level_detail_link_found} ({exc})"
                            )
links_already_processed.add(next_level_detail_link_found)
for (
next_level_non_detail_link_found
) in next_level_non_detail_links_found:
print("Not processing: ", next_level_non_detail_link_found)
print("Currently only scrape 2 levels deep")
# Remove duplicates
results_df = results_df.drop_duplicates()
# Output summary
print(f"Scraped {len(results_df)} rows from {len(links_already_processed)} pages.")
# Write data to CSV in GCS
final_results_fpath = GCS_BUCKET + RESULTS_FPATH % (logical_dag_date_string)
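    # pandas writes straight to the gs:// path via fsspec/gcsfs (assumed installed)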
results_df.to_csv(final_results_fpath, index=False)
print("Results written to: ", str(final_results_fpath))
# Write data to BQ table
# Open a connection to BQ
client = bigquery.Client(TARGET_PROJECT)
# If data already ran for this date, delete out
delete_query = f"""DELETE FROM
`moz-fx-data-shared-prod.external_derived.chrome_extensions_v1`
WHERE submission_date = '{logical_dag_date_string}'"""
del_job = client.query(delete_query)
del_job.result()
# Load data from GCS to BQ table - appending to what is already there
load_csv_to_gcp_job = client.load_table_from_uri(
final_results_fpath,
TARGET_TABLE,
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_NEVER",
write_disposition="WRITE_APPEND",
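            # CREATE_NEVER: the target table must already exist;
            # WRITE_APPEND adds this run's rows to it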
schema=[
{"name": "submission_date", "type": "DATE", "mode": "NULLABLE"},
{"name": "url", "type": "STRING", "mode": "NULLABLE"},
{"name": "chrome_extension_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "star_rating", "type": "NUMERIC", "mode": "NULLABLE"},
{"name": "number_of_ratings", "type": "STRING", "mode": "NULLABLE"},
{"name": "number_of_users", "type": "STRING", "mode": "NULLABLE"},
{"name": "extension_version", "type": "STRING", "mode": "NULLABLE"},
{"name": "extension_size", "type": "STRING", "mode": "NULLABLE"},
{"name": "extension_languages", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_desc", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_email", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_website", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_phone", "type": "STRING", "mode": "NULLABLE"},
{
"name": "extension_updated_date",
"type": "STRING",
"mode": "NULLABLE",
},
{
"name": "category",
"type": "STRING",
"mode": "NULLABLE",
},
{
"name": "trader_status",
"type": "STRING",
"mode": "NULLABLE",
},
{
"name": "featured",
"type": "BOOLEAN",
"mode": "NULLABLE",
},
{
"name": "verified_domain",
"type": "STRING",
"mode": "NULLABLE",
},
{
"name": "manifest_json",
"type": "JSON",
"mode": "NULLABLE",
},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_csv_to_gcp_job.result()
if __name__ == "__main__":
main()