in dagify/converter/report_generator.py [0:0]
def check_schedules(self, dag_divider):
    """Build the "Updated Job Schedules" report table.

    Walks the tasks returned by ``self.uf.get_tasks()`` in order. Each run of
    consecutive tasks that share the same value for the ``dag_divider``
    attribute is treated as one DAG. The DAG schedule is the cron schedule
    computed for the first task of the run, falling back to ``"@daily"`` when
    that task yields no schedule. Every task is then listed together with a
    YES/NO flag telling whether its own schedule differs from the DAG one.

    Args:
        dag_divider: Name of the task attribute used to split tasks into DAGs.

    Returns:
        A ``(title, columns, rows)`` tuple. ``rows`` holds one
        ``(job_name, divider, "YES" | "NO", job_schedule, dag_schedule)``
        tuple per task, in task order.
    """
    title = "Updated Job Schedules"
    columns = [
        "JOB NAME",
        "DAG DIVIDER",
        "CHANGE IN SCHEDULE",
        "ORIGINAL JOB SCHEDULE",
        "DAG SCHEDULE",
    ]
    rows = []

    # A unique sentinel never equals a real divider value, so the first task
    # always opens a new group and falsy dividers (None, "") are compared like
    # any other value. Relying on truthiness here would skip the schedule
    # reset when moving from a falsy divider to a real one.
    prev_divider = object()
    dag_schedule = None

    for task in self.uf.get_tasks():
        current_divider = task.get_attribute(dag_divider)
        if current_divider != prev_divider:
            # New DAG: its schedule is re-derived from this first task.
            prev_divider = current_divider
            dag_schedule = None

        # Computed once per task and reused for both the DAG schedule and the
        # comparison below.
        # NOTE(review): assumes calculate_cron_schedule returns the same value
        # when called repeatedly for the same task - confirm.
        current_schedule = calculate_cron_schedule(task)

        if not dag_schedule:
            # First schedule for this DAG, applied to every job with this
            # divider; "@daily" is the fallback when the task has none.
            dag_schedule = current_schedule or "@daily"

        changed = "YES" if current_schedule != dag_schedule else "NO"
        rows.append(
            (
                task.get_attribute("JOBNAME"),
                current_divider,
                changed,
                current_schedule,
                dag_schedule,
            )
        )

    return title, columns, rows