in source/scheduler/cdk/aws_solutions/scheduler/cdk/construct.py [0:0]
def __init__(self, scope: Construct, construct_id: str, sync: bool = True):
    """
    Create a scheduler backed by an AWS Step Functions state machine

    The state machine assembled here loads a scheduled task by name, resolves a
    trigger time when the input does not carry one, waits until that time, starts
    the task's state machine, resolves the next trigger time and finally starts an
    execution of `self.state_machine_arn` with the task name and that trigger time.

    :param scope: the scope of this construct
    :param construct_id: the ID of this construct
    :param sync: synchronously invoke the scheduled item (otherwise, set to False for async)
    """
    super().__init__(scope, construct_id)

    # Plain configuration and the supporting resources; note the function and the
    # table helpers are handed `scope` (the parent), not this construct.
    self.sync = sync
    self.scheduler_function = self._scheduler_function(scope, "GetNextTimestamp")
    self.scheduler_function_environment = Environment(self.scheduler_function)
    self.scheduler_table = self._scheduler_table(scope)
    self._scheduler_child_state_machines: List[IStateMachine] = []
    self._state_machine_namer = ResourceName(
        self,
        "SchedulerStateMachineName",
        purpose="personalize-scheduler",
        max_length=80,
    )

    # Layers shared by every AWS Lambda Function the Scheduler construct provisions
    shared_layers = [PowertoolsLayer.get_or_create(self)]

    # CRUD tasks/ states used to integrate with the Scheduler
    self.create_scheduled_task = CreateScheduledTask(
        self,
        "create_scheduled_task",
        layers=shared_layers,
        scheduler_table=self.scheduler_table,
        state_machine_arn=self.state_machine_arn,
        state_machine_executions_arn=self.state_machine_executions_arn,
    )
    self.read_scheduled_task = ReadScheduledTask(
        self,
        "read_scheduled_task",
        layers=shared_layers,
        scheduler_table=self.scheduler_table,
        state_machine_arn=self.state_machine_arn,
    )
    self.update_scheduled_task = UpdateScheduledTask(
        self,
        "update_scheduled_task",
        layers=shared_layers,
        scheduler_table=self.scheduler_table,
        state_machine_arn=self.state_machine_arn,
        state_machine_executions_arn=self.state_machine_executions_arn,
    )
    self.delete_scheduled_task = DeleteScheduledTask(
        self,
        "delete_scheduled_task",
        layers=shared_layers,
        scheduler_table=self.scheduler_table,
        state_machine_arn=self.state_machine_arn,
    )

    # Entry state: look the task up by name and store the result under $.task
    load_task_state = self.read_scheduled_task.state(
        self,
        "Load Scheduled Task",
        payload=TaskInput.from_object({"name.$": TASK_NAME_PATH}),
        result_path="$.task",
    )

    # State that resolves the trigger time after an invocation; it also serves as
    # the fallback passed to the "Invoke Step Function" state definition.
    next_trigger_state = self.get_trigger("Get Next Trigger Time")

    # Main path, built one link at a time:
    # wait -> invoke the task's state machine -> next trigger -> run next task
    wait_state = Wait(
        self,
        "Wait Until Schedule Trigger",
        time=WaitTime.timestamp_path(TRIGGER_AT_PATH),
    )
    invoke_task_json = self._start_execution_task_json(
        arn_to_invoke="$.task.state_machine.arn",
        input="$.task.state_machine.input",
        fallback=next_trigger_state,
    )
    invoke_task_state = CustomState(
        self,
        "Invoke Step Function",
        state_json=invoke_task_json,
    )
    scheduled_run = wait_state.next(invoke_task_state)
    scheduled_run = scheduled_run.next(next_trigger_state)

    # The last link starts an execution of `self.state_machine_arn`, carrying the
    # task name and trigger time forward; sync invocation is explicitly disallowed.
    rerun_json = self._start_execution_task_json(
        arn_to_invoke=self.state_machine_arn,
        input={
            "name.$": TASK_NAME_PATH,
            "trigger_at.$": TRIGGER_AT_PATH,
        },
        allow_sync=False,
    )
    rerun_state = CustomState(
        self,
        "Run Next Scheduled Task",
        state_json=rerun_json,
    )
    scheduled_run = scheduled_run.next(rerun_state)

    # When the input has no trigger time, resolve one first; both branches then
    # continue into the main path. `afterwards()` is wired before `otherwise()`,
    # matching the original statement order.
    trigger_choice = Choice(self, "Trigger Time Provided?")
    trigger_missing = Condition.is_not_present(TRIGGER_AT_PATH)
    first_trigger_state = self.get_trigger("Get Trigger Time")
    trigger_choice.when(trigger_missing, first_trigger_state)
    trigger_choice.afterwards().next(scheduled_run)
    trigger_choice.otherwise(scheduled_run)

    entry_chain = load_task_state.next(trigger_choice)
    self._scheduler_definition = Chain.start(entry_chain)

    self.state_machine = StateMachine(
        self,
        "SchedulerStateMachine",
        state_machine_name=self.state_machine_name,
        definition=self._scheduler_definition,
        tracing_enabled=True,
    )

    # permissions are applied at synthesis time based on the configuration of the scheduler
    Aspects.of(self).add(SchedulerPermissionsAspect(self))