mozetl/system_check.py
""""A system check for testing integration of various libraries with mozetl.
This sub-module will print out relevant version info. It will also read data
from `main_summary` and print basic statistics to verify that the system is
correctly set-up.
"""
import logging
import sys
from datetime import datetime, timedelta

import click
from pyspark.sql import SparkSession

from mozetl.utils import (
    format_as_submission_date,
    format_spark_path,
    stop_session_safely,
)

logging.basicConfig(level=logging.DEBUG)


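# Command-line interface for the check. With --local, only version information is
# printed; otherwise main_summary is read for the given submission date, which
# defaults to two days before the current date.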
@click.command()
@click.option("--local/--no-local", default=False)
@click.option(
    "--submission-date-s3",
    type=str,
    default=format_as_submission_date(datetime.now() - timedelta(2)),
)
@click.option("--input-bucket", type=str, default="telemetry-parquet")
@click.option("--input-prefix", type=str, default="main_summary/v4")
@click.option("--output-bucket", type=str, default="telemetry-test-bucket")
@click.option("--output-prefix", type=str, default="mozetl_system_check")
def main(
    local, submission_date_s3, input_bucket, input_prefix, output_bucket, output_prefix
):
    # print argument information
    for k, v in locals().items():
        print("{}: {}".format(k, v))
    print("Python version: {}".format(sys.version_info))

    spark = SparkSession.builder.getOrCreate()
    print("Spark version: {}".format(spark.version))

    # run a basic count over a sample of `main_summary` from 2 days ago
    if not local:
        ds_nodash = submission_date_s3
        input_path = format_spark_path(input_bucket, input_prefix)
        output_path = format_spark_path(output_bucket, output_prefix)
        print(
            "Reading data for {ds_nodash} from {input_path} and writing to {output_path}".format(
                ds_nodash=ds_nodash, input_path=input_path, output_path=output_path
            )
        )
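
        # Read only a single sample_id partition for that day so the check stays small.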
path = "{}/submission_date_s3={}/sample_id={}".format(input_path, ds_nodash, 1)
subset = spark.read.parquet(path)
print("Saw {} documents".format(subset.count()))
        summary = subset.select(
            "memory_mb", "cpu_cores", "subsession_length"
        ).describe()
        summary.show()
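
        # Persist the summary under a submission_date_s3 directory, overwriting any previous run.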
        summary.write.parquet(
            output_path + "/submission_date_s3={}/".format(ds_nodash), mode="overwrite"
        )

    stop_session_safely(spark)
    print("Done!")