in source/code/handlers/schedule_handler.py [0:0]
def handle_scheduler_tasks(self, task_config):
    """
    Run one scheduling pass over the configured tasks and start the ones that are due.

    The pass reads the previously stored run time, stores a new one, and compares
    the two. When both are equal and the invocation is neither a configuration
    update nor an explicit execute-task request, nothing else is done.

    Otherwise every task from ``task_config.get_tasks()`` that has an interval and
    is not disabled is examined:

    * the first execution within the next 24 hours is computed, and the earliest
      one over all tasks is remembered for ``_set_next_schedule_event``;
    * unless this time slot was already handled, the task's cron expression is
      checked against the window between the previous and the current run, and
      the task is started through ``_execute_task`` when it yields an execution
      time.

    :param task_config: object providing ``get_tasks()``, which yields the task
        items (accessed with ``get`` and by key)
    :return: ``safe_dict`` summary with the keys ``datetime``, ``running-time``,
        ``event-datetime``, ``enabled_tasks`` and ``started-tasks``; ``None`` when
        the pass was skipped or when a ``ValueError`` was caught and logged
    """
    began_at = datetime.now()
    launched = {}

    previous_run = self._get_last_run()
    self._logger.info(INFO_LAST_SAVED, previous_run.isoformat())

    if self.configuration_update:
        self._logger.info(INFO_CONFIG_RUN, self.updated_task)

    this_run = self._set_last_run()
    same_slot = previous_run == this_run

    # nothing to do if this slot was handled before, unless the handler was
    # triggered by a configuration update or an explicit execute request
    if same_slot and not self.configuration_update and not self.execute_task_request:
        self._logger.info(INFO_TASK_SCHEDULER_ALREADY_RAN)
        return None

    self._logger.info(INFO_CURRENT_SCHEDULING_DT, this_run)

    # kept outside the loop so the error handler can report the task being processed
    active_task = None
    earliest_upcoming = None
    utc_zone = pytz.timezone("UTC")

    # only tasks that have an interval and are not explicitly disabled take part
    candidates = []
    for entry in task_config.get_tasks():
        if entry.get(handlers.TASK_INTERVAL) is None:
            continue
        if entry.get(handlers.TASK_ENABLED, True):
            candidates.append(entry)
    enabled_count = len(candidates)

    try:
        for active_task in candidates:
            self._logger.debug_enabled = active_task[handlers.TASK_DEBUG]
            name = active_task[handlers.TASK_NAME]

            # each task is evaluated in its own timezone
            zone = pytz.timezone(active_task[handlers.TASK_TIMEZONE])

            # cron expression used to test whether the task needs to be executed
            cron = CronExpression(expression=active_task[handlers.TASK_INTERVAL])

            window_start = previous_run.astimezone(zone)
            window_end = this_run.astimezone(zone)

            # remember the earliest upcoming execution (in UTC) over all tasks
            upcoming = cron.first_within_next(timedelta(hours=24), window_end)
            if upcoming is not None:
                upcoming_utc = upcoming.astimezone(utc_zone).replace(microsecond=0)
                if earliest_upcoming is None or upcoming_utc < earliest_upcoming[0]:
                    earliest_upcoming = (upcoming_utc, active_task)

            # slot already handled: only the next schedule event is recalculated
            if same_slot:
                continue

            # test whether the task needs to be executed since the previous run
            due_at = cron.last_since(window_start, window_end)
            if due_at is None:
                if upcoming is None:
                    self._logger.info(INFO_NO_NEXT_WITHIN, name)
                else:
                    self._logger.info(INFO_NEXT_EXECUTION, name,
                                      upcoming.astimezone(zone).isoformat(), zone)
                continue

            self._logger.info(INFO_SCHEDULED_TASK, name, due_at, zone,
                              str(safe_json(active_task, indent=2)))

            # start execution of the task for the time it was due
            group, subs = self._execute_task(active_task, due_at)
            launched[name] = {"task-group": group, "sub-tasks": subs}

        if not launched:
            self._logger.info(INFO_NO_TASKS_STARTED, enabled_count)
        else:
            self._logger.info(INFO_STARTED_TASKS, enabled_count, ",".join(launched))

        self._set_next_schedule_event(this_run, earliest_upcoming)

        elapsed = datetime.now() - began_at
        running_time = float(elapsed.total_seconds())

        summary = {
            "datetime": datetime.now().isoformat(),
            "running-time": running_time,
            "event-datetime": this_run.isoformat(),
            "enabled_tasks": enabled_count,
            "started-tasks": launched
        }
        return safe_dict(summary)

    except ValueError as ex:
        # logged together with the task that was being processed; the method then returns None
        self._logger.error(ERR_SCHEDULE_HANDLER, ex, safe_json(active_task, indent=2))