in src/data_load/load.py [0:0]
import json
import logging

# STATUS, FINISHED, FAILED, TIME_TAKEN, START_TIME and END_TIME are assumed to be
# module-level constants naming the keys/values used in the ingestion results JSON.
logger = logging.getLogger(__name__)


def compute_reports(reports_file):
    """Aggregate per-ingestion run results into a baseline report."""
    with open(reports_file) as file:
        ingestion_results = json.load(file)

    computed_baseline = []
    for ingestion_name, part_results in ingestion_results.items():
        success = []
        failed = []
        incomplete = []
        for result in part_results:
            if result.get(STATUS) == FINISHED:
                success.append(result)
            elif result.get(STATUS) == FAILED:
                failed.append(result)
            else:
                incomplete.append(result)
        logger.info(f"Ingestion results for {ingestion_name}: {success}")

        if not success:
            # Guard against ZeroDivisionError / min()-max() on empty sequences
            # when an ingestion has no successful runs.
            logger.warning(f"No successful runs for {ingestion_name}; skipping")
            continue

        average_time_taken = sum(s.get(TIME_TAKEN) for s in success) / len(success)
        min_start_time = min(s.get(START_TIME) for s in success)
        max_end_time = max(s.get(END_TIME) for s in success)
        computed_baseline.append({
            "data_type": ingestion_name,
            # start/end times are epoch milliseconds, so divide by 60 * 1000 for minutes
            "time_taken_in_minutes": (max_end_time - min_start_time) / (60 * 1000),
            "ingestion_runs_successful": len(success),
            "ingestion_runs_failed": len(failed),
            "ingestion_runs_incomplete": len(incomplete),
            "avg_time_taken_per_dag_run_in_seconds": average_time_taken
        })

    with open('generated_reports.json', 'w') as f:
        json.dump(computed_baseline, f, indent=4)
    logger.info(f"Reports are generated and saved to {f.name}")