extract_transform()

in mozetl/taar/taar_dynamo.py [0:0]


def extract_transform(spark, run_date, sample_rate=0):
    """Extract one day of main_summary client data and transform it for load.

    Reads the main_summary parquet partition for ``run_date``, keeps only the
    most recent row per client, and returns an RDD of transformed records.

    Args:
        spark: Active SparkSession used to read the parquet dataset.
        run_date: ``date``/``datetime`` whose partition is processed (formatted
            as ``%Y%m%d`` to select the S3 partition).
        sample_rate: Optional fraction in (0, 1] used to down-sample the day's
            data (without replacement). ``0`` or ``None`` disables sampling.

    Returns:
        An RDD produced by mapping ``list_transformer`` over the filtered,
        zipped ``((client_id, subsession_start_date), json)`` records.
        NOTE(review): the exact tuple shape is defined by ``list_transformer``
        elsewhere in this module — confirm there.
    """
    currentDate = run_date
    currentDateString = currentDate.strftime("%Y%m%d")
    print("Processing %s" % currentDateString)

    # Get the data for the desired date out of parquet
    template = "s3://telemetry-parquet/main_summary/v4/submission_date_s3=%s"
    datasetForDate = spark.read.parquet(template % currentDateString)

    if sample_rate is not None and sample_rate != 0:
        print("Sample rate set to %0.9f" % sample_rate)
        # Sample without replacement at the requested fraction.
        datasetForDate = datasetForDate.sample(False, sample_rate)
    else:
        print("No sampling on dataset")

    print("Parquet data loaded")

    # Get the most recent (client_id, subsession_start_date) tuple
    # for each client since the main_summary might contain
    # multiple rows per client. We will use it to filter out the
    # full table with all the columns we require.

    clientShortList = datasetForDate.select(
        "client_id",
        "subsession_start_date",
        row_number()
        .over(Window.partitionBy("client_id").orderBy(desc("subsession_start_date")))
        .alias("clientid_rank"),
    )
    print("clientShortList selected")
    # Keep only each client's newest row; the helper rank column is no
    # longer needed afterwards.
    clientShortList = clientShortList.where("clientid_rank == 1").drop("clientid_rank")
    # BUGFIX: this message previously duplicated the one above; it now
    # describes the rank-filtering step it actually follows.
    print("clientShortList filtered to most recent row per client")

    select_fields = [
        "client_id",
        "subsession_start_date",
        "subsession_length",
        "city",
        "locale",
        "os",
        "places_bookmarks_count",
        "scalar_parent_browser_engagement_tab_open_event_count",
        "scalar_parent_browser_engagement_total_uri_count",
        "scalar_parent_browser_engagement_unique_domains_count",
        "active_addons",
        "disabled_addons_ids",
    ]
    dataSubset = datasetForDate.select(*select_fields)
    print("datasetForDate select fields completed")

    # Join the two tables: only the elements in both dataframes
    # will make it through.
    clientsData = dataSubset.join(
        clientShortList, ["client_id", "subsession_start_date"]
    )

    print("clientsData join with client_id and subsession_start_date")

    # Convert the DataFrame to JSON and get an RDD out of it.
    subset = clientsData.select("client_id", "subsession_start_date")

    print("clientsData select of client_id and subsession_start_date completed")

    jsonDataRDD = clientsData.select(
        "city",
        "subsession_start_date",
        "subsession_length",
        "locale",
        "os",
        "places_bookmarks_count",
        "scalar_parent_browser_engagement_tab_open_event_count",
        "scalar_parent_browser_engagement_total_uri_count",
        "scalar_parent_browser_engagement_unique_domains_count",
        "active_addons",
        "disabled_addons_ids",
    ).toJSON()

    print("jsonDataRDD selected")
    # Pair each key row with its JSON payload; zip relies on both RDDs
    # having identical partitioning, which holds since both derive from
    # clientsData without a repartition in between.
    rdd = subset.rdd.zip(jsonDataRDD)
    print("subset rdd has been zipped")

    # Filter out any records with invalid dates or client_id
    filtered_rdd = rdd.filter(filterDateAndClientID)
    print("rdd filtered by date and client_id")

    # Transform the JSON elements into tuples via the module-level
    # list_transformer helper (defined elsewhere in this file).
    merged_filtered_rdd = filtered_rdd.map(list_transformer)
    print("rdd has been transformed into tuples")

    return merged_filtered_rdd