in dagify/converter/utils.py [0:0]
def calculate_cron_schedule(task):
    """Build a five-field cron expression from a task's scheduling attributes.

    The task is queried through ``task.get_attribute(name)`` for ``TIMEFROM``,
    ``WEEKDAYS`` and the twelve month flags ``JAN`` .. ``DEC``.

    Args:
        task: Object exposing ``get_attribute(name)``.

    Returns:
        A string ``"<minute> <hour> * <months> <weekdays>"``, or ``None`` when
        the task has no (or an empty) ``TIMEFROM``. The day-of-month field is
        always ``*``. Months flagged ``"1"`` are emitted as numbers with
        consecutive runs collapsed into ranges (e.g. ``1-3,6``); the month and
        weekday fields default to ``*`` when nothing is selected.
    """
    timefrom = task.get_attribute("TIMEFROM")
    if not timefrom:
        return None

    # NOTE(review): assumes TIMEFROM is an "HHMM" string - confirm with caller.
    hour, minute = timefrom[:2], timefrom[2:]

    # Normalise the weekday list: stray whitespace or empty entries would
    # otherwise leak into the space-separated cron string and corrupt it.
    weekdays = task.get_attribute("WEEKDAYS")
    day_tokens = [token.strip() for token in weekdays.split(",")] if weekdays else []
    day_of_week = ",".join(token for token in day_tokens if token) or "*"

    month_abbreviations = (
        "JAN", "FEB", "MAR", "APR", "MAY", "JUN",
        "JUL", "AUG", "SEP", "OCT", "NOV", "DEC",
    )
    # Month numbers (1-12) whose flag is "1"; already in ascending order
    # because the abbreviations are enumerated in calendar order.
    months = [
        number
        for number, name in enumerate(month_abbreviations, start=1)
        if task.get_attribute(name) == "1"
    ]

    # Collapse consecutive months into [first, last] spans.
    spans = []
    for number in months:
        if spans and number == spans[-1][1] + 1:
            spans[-1][1] = number
        else:
            spans.append([number, number])

    month_schedule = ",".join(
        str(first) if first == last else f"{first}-{last}"
        for first, last in spans
    ) or "*"

    # Day of month is always "*".
    return " ".join([minute, hour, "*", month_schedule, day_of_week])