def calculate_cron_schedule()

in dagify/converter/utils.py [0:0]


def calculate_cron_schedule(task):
    """Function to calculate cron schedule for a given task"""
    timefrom = task.get_attribute("TIMEFROM")

    if not timefrom:
        return None

    schedule_interval = None
    minute = timefrom[2:]
    hour = timefrom[:2]
    weekdays = task.get_attribute("WEEKDAYS")
    if weekdays:
        day_of_week = ",".join(weekdays.split(","))
    else:
        day_of_week = "*"

    month_abbreviations = ["JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"]
    # Get the list of months that are set to "1"
    months = [i + 1 for i, month in enumerate(month_abbreviations) if task.get_attribute(month) == "1"]
    if months:
        months.sort()
        # Identify consecutive month ranges
        month_ranges = []
        current_range = [months[0]]
        for i in range(1, len(months)):
            if months[i] == current_range[-1] + 1:  # Check for consecutive months
                current_range.append(months[i])
            else:
                month_ranges.append(current_range)  # Start a new range if not consecutive
                current_range = [months[i]]
        month_ranges.append(current_range)  # Add the last range

        month_parts = []
        for r in month_ranges:
            if len(r) == 1:
                month_parts.append(str(r[0]))  # Single month
            else:
                month_parts.append(f"{r[0]}-{r[-1]}")  # Month range

        month_schedule = ",".join(month_parts)
    else:
        month_schedule = "*"

    schedule_interval = [minute, hour, "*", month_schedule, day_of_week]  # Day of month to start is set to "*"
    schedule_interval = " ".join(schedule_interval)
    return schedule_interval