in jobs/extensions/extensions/main.py [0:0]
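# NOTE: main() relies on imports like the ones below, plus module-level
# constants (e.g. CHROME_WEBSTORE_URL, DRIVER_TYP, TARGET_TABLE) and scraping
# helpers (e.g. initialize_driver, pull_data_from_detail_page) assumed to be
# defined elsewhere in this file or package; this import block is a sketch of
# the likely dependencies, not necessarily the file's original header.
from argparse import ArgumentParser
from datetime import datetime

import pandas as pd
from google.cloud import bigquery

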
def main():
parser = ArgumentParser(description=__doc__)
parser.add_argument("--date", required=True)
args = parser.parse_args()
# Get DAG logical date
logical_dag_date = datetime.strptime(args.date, "%Y-%m-%d").date()
logical_dag_date_string = logical_dag_date.strftime("%Y-%m-%d")
# Initialize an empty set to hold links that have already been processed
links_already_processed = set()
# Get all unique links found on the CHROME_WEBSTORE_URL (excluding links to ignore)
driver = initialize_driver(DRIVER_TYP, BINARY_LOC, DRIVER_PATH)
unique_links_on_chrome_webstore_page = get_unique_links_from_webpage(
url=CHROME_WEBSTORE_URL,
base_url=CHROME_WEBSTORE_URL,
links_to_ignore=LIST_OF_LINKS_TO_IGNORE,
links_to_not_process=links_already_processed,
driver=driver,
)
# Add on the additional link to grab
unique_links_on_chrome_webstore_page.append(ADDITIONAL_LINK_TO_GRAB)
# Add CHROME_WEBSTORE_URL to list of already processed so we don't process it again
links_already_processed.add(CHROME_WEBSTORE_URL)
links_already_processed.add(CHROME_WEBSTORE_URL + "/")
    # If the Chrome Webstore URL appears again with a trailing slash,
    # remove it so it isn't processed twice
    if CHROME_WEBSTORE_URL + "/" in unique_links_on_chrome_webstore_page:
        unique_links_on_chrome_webstore_page.remove(CHROME_WEBSTORE_URL + "/")
    print("# unique links found: ", str(len(unique_links_on_chrome_webstore_page)))
# Get the final cleaned up list of links on the main page we want to process
# excluding anything that starts with accounts
main_page_links_to_process = []
for link in unique_links_on_chrome_webstore_page:
if not link.startswith("https://accounts.google.com"):
main_page_links_to_process.append(link)
del unique_links_on_chrome_webstore_page
# Initialize a dataframe to store extension level results
results_df = initialize_results_df()
# Loop through the links found on the main page of the Chrome Webstore
for idx, current_link in enumerate(main_page_links_to_process):
print("Currently processing link: ", current_link)
percent_done = (idx + 1) / len(main_page_links_to_process) * 100
if idx % 5 == 0 or idx == len(main_page_links_to_process) - 1:
            print(
                f"Progress: {percent_done:.1f}% "
                f"({idx + 1} of {len(main_page_links_to_process)})"
            )
# Check if the link is a "detail page" or a "non detail page"
is_detail_page = check_if_detail_or_non_detail_page(current_link)
# If the link is a detail page and not already processed
if is_detail_page:
print("link is a detail page")
if current_link not in links_already_processed:
print("link is not yet processed, pulling data...")
# Get the data from that page
detail_page_results_df = pull_data_from_detail_page(
url=current_link,
timeout_limit=TIMEOUT_IN_SECONDS,
current_date=logical_dag_date_string,
)
# Append the data scraped to results_df
results_df = pd.concat([results_df, detail_page_results_df])
# Add the detail page link to links already processed
links_already_processed.add(current_link)
else:
print("link is already processed")
# If this link is not a detail page
else:
print("Link is not a detail page.")
# Get the HTML from the non detail page after clicking
# Load more button, plus update links already processed
# Initialize a driver
driver = initialize_driver(DRIVER_TYP, BINARY_LOC, DRIVER_PATH)
print("Getting links from the non detail page...")
detail_links_found, non_detail_links_found = get_links_from_non_detail_page(
current_link,
links_already_processed,
MAX_CLICKS,
LIST_OF_LINKS_TO_IGNORE,
driver,
)
# Print # of detail and non detail links found
print("# detail links found on page: ", str(len(detail_links_found)))
print("# non detail links found: ", str(len(non_detail_links_found)))
# Loop through each link on this page
print("Looping through detail links found...")
for detail_link in detail_links_found:
print("Processing detail link: ", detail_link)
try:
# Get the data from that page
detail_page_results_df = pull_data_from_detail_page(
url=detail_link,
timeout_limit=TIMEOUT_IN_SECONDS,
current_date=logical_dag_date_string,
)
# Append the data scraped to results_df
results_df = pd.concat([results_df, detail_page_results_df])
                except Exception as exc:
                    print("Failed to process detail page: ", detail_link)
                    print("Error: ", str(exc))
links_already_processed.add(detail_link)
print("Done looping through detail links found.")
# Loop through all the non detail links found
print("Looping through non detail links found...")
for non_detail_link in non_detail_links_found:
print("Current non detail link: ", non_detail_link)
if non_detail_link in links_already_processed:
print("Already processed, not processing again")
else:
print("Processing non_detail_link: ", non_detail_link)
# Initialize driver below
driver = initialize_driver(DRIVER_TYP, BINARY_LOC, DRIVER_PATH)
# Try again to get the detail links
(
next_level_detail_links_found,
next_level_non_detail_links_found,
) = get_links_from_non_detail_page(
non_detail_link,
links_already_processed,
MAX_CLICKS,
LIST_OF_LINKS_TO_IGNORE,
driver,
)
for next_level_detail_link_found in next_level_detail_links_found:
try:
# Get the data from that page
detail_page_results_df = pull_data_from_detail_page(
url=next_level_detail_link_found,
timeout_limit=TIMEOUT_IN_SECONDS,
current_date=logical_dag_date_string,
)
# Append the data scraped to results_df
results_df = pd.concat([results_df, detail_page_results_df])
                        except Exception as exc:
                            print("Failed to process: ", next_level_detail_link_found)
                            print("Error: ", str(exc))
links_already_processed.add(next_level_detail_link_found)
for (
next_level_non_detail_link_found
) in next_level_non_detail_links_found:
print("Not processing: ", next_level_non_detail_link_found)
print("Currently only scrape 2 levels deep")
# Remove duplicates
results_df = results_df.drop_duplicates()
# Output summary
print(f"Scraped {len(results_df)} rows from {len(links_already_processed)} pages.")
# Write data to CSV in GCS
final_results_fpath = GCS_BUCKET + RESULTS_FPATH % (logical_dag_date_string)
results_df.to_csv(final_results_fpath, index=False)
print("Results written to: ", str(final_results_fpath))
# Write data to BQ table
# Open a connection to BQ
client = bigquery.Client(TARGET_PROJECT)
    # If data already exists for this date, delete it so reruns don't duplicate rows
delete_query = f"""DELETE FROM
`moz-fx-data-shared-prod.external_derived.chrome_extensions_v1`
WHERE submission_date = '{logical_dag_date_string}'"""
del_job = client.query(delete_query)
del_job.result()
# Load data from GCS to BQ table - appending to what is already there
load_csv_to_gcp_job = client.load_table_from_uri(
final_results_fpath,
TARGET_TABLE,
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_NEVER",
write_disposition="WRITE_APPEND",
schema=[
{"name": "submission_date", "type": "DATE", "mode": "NULLABLE"},
{"name": "url", "type": "STRING", "mode": "NULLABLE"},
{"name": "chrome_extension_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "star_rating", "type": "NUMERIC", "mode": "NULLABLE"},
{"name": "number_of_ratings", "type": "STRING", "mode": "NULLABLE"},
{"name": "number_of_users", "type": "STRING", "mode": "NULLABLE"},
{"name": "extension_version", "type": "STRING", "mode": "NULLABLE"},
{"name": "extension_size", "type": "STRING", "mode": "NULLABLE"},
{"name": "extension_languages", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_desc", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_email", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_website", "type": "STRING", "mode": "NULLABLE"},
{"name": "developer_phone", "type": "STRING", "mode": "NULLABLE"},
{
"name": "extension_updated_date",
"type": "STRING",
"mode": "NULLABLE",
},
                {"name": "category", "type": "STRING", "mode": "NULLABLE"},
                {"name": "trader_status", "type": "STRING", "mode": "NULLABLE"},
                {"name": "featured", "type": "BOOLEAN", "mode": "NULLABLE"},
                {"name": "verified_domain", "type": "STRING", "mode": "NULLABLE"},
                {"name": "manifest_json", "type": "JSON", "mode": "NULLABLE"},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_csv_to_gcp_job.result()
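

# Entry-point guard (an assumption: the excerpt ends at main(), so the original
# file may already define this). Allows running the job as a script, e.g.
# `python main.py --date 2024-01-01`.
if __name__ == "__main__":
    main()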