def get_data()

in mozetl/bhr_collection/bhr_collection.py
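
Pulls a sampled set of Background Hang Reporter (BHR) pings from the
moz-fx-data-shared-prod.telemetry_stable.bhr_v4 BigQuery table for builds in
the given date range, extracts the configured ping properties, and returns
them as an RDD (or None if the build-id filter raises a ValueError).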


def get_data(sc, sql_context, config, date, end_date=None):
    # match the shuffle partition count to the cluster's default parallelism
    sql_context.sql(
        "set spark.sql.shuffle.partitions={}".format(sc.defaultParallelism)
    )

    if end_date is None:
        end_date = date

    # pad the submission window by five days on each side, since pings for a
    # given build can arrive days after the build date
    submission_start_str = (date - timedelta(days=5)).isoformat()
    submission_end_str = (end_date + timedelta(days=5)).isoformat()

    date_str = date.strftime("%Y%m%d")
    end_date_str = end_date.strftime("%Y%m%d")

    # document_id is hashed into max_sample_slices buckets; keeping pings whose
    # bucket falls below sample_slices samples roughly config["sample_size"] of
    # the population (e.g. sample_size=0.01 keeps 100 of 10000 slices), clamped
    # to at least one slice
    max_sample_slices = 10000
    sample_slices = int(
        max(min(max_sample_slices * config["sample_size"], max_sample_slices), 1)
    )

    sql = f"""
    SELECT
      environment,
      application,
      payload
    FROM
      `moz-fx-data-shared-prod.telemetry_stable.bhr_v4`
    WHERE
      -- Use document_id to sample
      ABS(MOD(FARM_FINGERPRINT(document_id), {max_sample_slices})) < {sample_slices}
      AND submission_timestamp BETWEEN '{submission_start_str}' AND '{submission_end_str}'
    """

    pings_df = (
        # read via sql_context so the snippet does not depend on a module-level
        # `spark` session; requires the spark-bigquery connector
        sql_context.read.format("bigquery")
        .option("query", sql)
        .option("viewsEnabled", True)
        # query results must first be materialized to a temporary table
        .option("materializationProject", "moz-fx-data-shared-prod")
        .option("materializationDataset", "tmp")
        .load()
    )

    properties = [
        "environment/system/os/name",
        "environment/system/os/version",
        "application/architecture",
        "application/build_id",
    ]
    # extract payload/modules only when module data is not excluded
    if not config["exclude_modules"]:
        properties.append("payload/modules")
    properties += [
        "payload/hangs",
        "payload/time_since_last_ping",
    ]

    print("%d results total" % pings_df.rdd.count())
    # extract the requested properties from each ping, dropping pings where
    # any property is missing
    mapped = pings_df.rdd.map(lambda p: get_ping_properties(p, properties)).filter(
        lambda p: properties_are_not_none(p, properties)
    )

    try:
        # keep pings whose build date (first 8 chars of build_id, YYYYMMDD)
        # falls inside the requested range; the filter is lazy, so any
        # ValueError surfaces at count() below
        result = mapped.filter(
            lambda p: date_str <= p["application/build_id"][:8] <= end_date_str
        )
        print("%d results after first filter" % result.count())
        return result
    except ValueError:
        return None
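
A minimal usage sketch, assuming a local SparkSession with the spark-bigquery
connector available, BigQuery credentials that can read
moz-fx-data-shared-prod, and a hypothetical config dict; only the sample_size
and exclude_modules keys are read by get_data above:

from datetime import date, timedelta

from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.appName("bhr_collection_example").getOrCreate()
sc = spark.sparkContext
sql_context = SQLContext(sc)

# hypothetical config: keep ~1% of pings and include module data
config = {"sample_size": 0.01, "exclude_modules": False}

# collect pings for builds from yesterday only (end_date defaults to date)
yesterday = date.today() - timedelta(days=1)
pings = get_data(sc, sql_context, config, yesterday)
if pings is not None:
    print(pings.take(1))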