in mozetl/bhr_collection/bhr_collection.py [0:0]
def etl_job_daily(sc, sql_context, config=None):
    """Build and write one hang profile for each day of the configured range.

    The effective configuration is ``default_config`` overlaid with
    ``config``. When ``hang_profile_out_filename`` is ``None`` it falls back
    to ``hang_profile_in_filename``. Every day from ``start_date`` through
    ``end_date`` (inclusive) is then handled on its own: the day's data is
    fetched, transformed, processed into a profile, and written twice --
    once as ``<out>_<YYYYMMDD>`` and once as ``<out>_current``.

    A day for which ``get_data`` returns ``None`` is skipped entirely: only
    "No data" is printed, and neither the garbage collection nor the
    progress report runs for that day.

    Args:
        sc: Forwarded to ``get_data`` and ``transform_pings`` (presumably
            the Spark context -- confirm against callers).
        sql_context: Forwarded to ``get_data``.
        config: Optional overrides applied on top of ``default_config``.

    Returns:
        None.
    """
    final_config = dict(default_config)
    if config is not None:
        final_config.update(config)

    # With no explicit output name, results are written under the input name.
    if final_config["hang_profile_out_filename"] is None:
        fallback_name = final_config["hang_profile_in_filename"]
        final_config["hang_profile_out_filename"] = fallback_name

    def emit_profile(filepath, profile):
        # Announce the destination, then hand the profile to write_file.
        print("writing file %s" % filepath)
        write_file(filepath, profile, final_config)

    date_span = final_config["end_date"] - final_config["start_date"]
    total_days = date_span.days + 1  # both endpoints are processed
    job_started_at = time.time()

    for day_index in range(total_days):
        day_started_at = time.time()
        day = final_config["start_date"] + timedelta(days=day_index)
        day_label = day.strftime("%Y%m%d")

        day_data = time_code(
            "Getting data",
            lambda: get_data(sc, sql_context, final_config, day),
        )
        if day_data is None:
            print("No data")
            continue

        pings, hours = transform_pings(sc, day_data, final_config)
        processor = ProfileProcessor(final_config)
        processor.ingest(pings, hours)
        profile = processor.process_into_profile()

        # Dated snapshot first, then the rolling "current" copy.
        emit_profile(
            "%s_%s" % (final_config["hang_profile_out_filename"], day_label),
            profile,
        )
        emit_profile(
            "%s_current" % final_config["hang_profile_out_filename"],
            profile,
        )

        gc.collect()
        print_progress(
            job_started_at, total_days, day_index, day_started_at, day_label
        )