def __init__()

in source/scheduler/cdk/aws_solutions/scheduler/cdk/construct.py [0:0]


    def __init__(self, scope: Construct, construct_id: str, sync: bool = True):
        """
        Assemble the Step Functions state machine that drives the scheduler,
        together with the task constructs used to manage scheduled items.

        The resulting definition loads the scheduled task by name, resolves a
        trigger time when the input does not carry one, waits until that time,
        starts the task's own state machine, and finally starts a new execution
        of this scheduler state machine for the following trigger time.

        :param scope: the scope of this construct
        :param construct_id: the ID of this construct
        :param sync: synchronously invoke the scheduled item (otherwise, set to False for async)
        """
        super().__init__(scope, construct_id)

        self.sync = sync
        # NOTE(review): the function and table helpers receive the parent `scope`, not `self`
        self.scheduler_function = self._scheduler_function(scope, "GetNextTimestamp")
        self.scheduler_function_environment = Environment(self.scheduler_function)
        self.scheduler_table = self._scheduler_table(scope)

        self._scheduler_child_state_machines: List[IStateMachine] = []
        self._state_machine_namer = ResourceName(
            self, "SchedulerStateMachineName", purpose="personalize-scheduler", max_length=80
        )

        # Every AWS Lambda Function provisioned by this construct shares the same layer list
        powertools = PowertoolsLayer.get_or_create(self)
        shared_layers = [powertools]

        # Create / read / update / delete tasks that integrate with the Scheduler
        self.create_scheduled_task = CreateScheduledTask(
            self,
            "create_scheduled_task",
            layers=shared_layers,
            scheduler_table=self.scheduler_table,
            state_machine_arn=self.state_machine_arn,
            state_machine_executions_arn=self.state_machine_executions_arn,
        )
        self.read_scheduled_task = ReadScheduledTask(
            self,
            "read_scheduled_task",
            layers=shared_layers,
            scheduler_table=self.scheduler_table,
            state_machine_arn=self.state_machine_arn,
        )
        self.update_scheduled_task = UpdateScheduledTask(
            self,
            "update_scheduled_task",
            layers=shared_layers,
            scheduler_table=self.scheduler_table,
            state_machine_arn=self.state_machine_arn,
            state_machine_executions_arn=self.state_machine_executions_arn,
        )
        self.delete_scheduled_task = DeleteScheduledTask(
            self,
            "delete_scheduled_task",
            layers=shared_layers,
            scheduler_table=self.scheduler_table,
            state_machine_arn=self.state_machine_arn,
        )

        # First state: look the task up by name and store the result under $.task
        build_load_state = self.read_scheduled_task.state
        lookup_payload = TaskInput.from_object({"name.$": TASK_NAME_PATH})
        load_task_state = build_load_state(
            self,
            "Load Scheduled Task",
            payload=lookup_payload,
            result_path="$.task",
        )

        next_trigger_state = self.get_trigger("Get Next Trigger Time")

        # Wait -> invoke the task's state machine -> compute next trigger -> re-run the scheduler.
        # The steps are built one at a time, in the same order the constructs were created before.
        wait_for_trigger = Wait(
            self,
            "Wait Until Schedule Trigger",
            time=WaitTime.timestamp_path(TRIGGER_AT_PATH),
        )

        invoke_task_json = self._start_execution_task_json(
            arn_to_invoke="$.task.state_machine.arn",
            input="$.task.state_machine.input",
            fallback=next_trigger_state,
        )
        invoke_task_state = CustomState(
            self,
            "Invoke Step Function",
            state_json=invoke_task_json,
        )
        run_chain = wait_for_trigger.next(invoke_task_state)
        run_chain = run_chain.next(next_trigger_state)

        # The scheduler starts a fresh execution of itself, explicitly with allow_sync=False
        reschedule_input = {
            "name.$": TASK_NAME_PATH,
            "trigger_at.$": TRIGGER_AT_PATH,
        }
        reschedule_json = self._start_execution_task_json(
            arn_to_invoke=self.state_machine_arn,
            input=reschedule_input,
            allow_sync=False,
        )
        reschedule_state = CustomState(
            self,
            "Run Next Scheduled Task",
            state_json=reschedule_json,
        )
        run_chain = run_chain.next(reschedule_state)

        # When the input carries no trigger time, resolve one before entering the run chain.
        # `afterwards()` must be wired before `otherwise()` so that only the branch added by
        # `when()` is routed into the run chain by the `.next()` call.
        trigger_choice = Choice(self, "Trigger Time Provided?")
        trigger_missing = Condition.is_not_present(TRIGGER_AT_PATH)
        initial_trigger_state = self.get_trigger("Get Trigger Time")
        trigger_choice.when(trigger_missing, initial_trigger_state)
        after_trigger_resolved = trigger_choice.afterwards()
        after_trigger_resolved.next(run_chain)
        trigger_choice.otherwise(run_chain)

        load_then_choose = load_task_state.next(trigger_choice)
        self._scheduler_definition = Chain.start(load_then_choose)

        self.state_machine = StateMachine(
            self,
            "SchedulerStateMachine",
            state_machine_name=self.state_machine_name,
            definition=self._scheduler_definition,
            tracing_enabled=True,
        )

        # permissions are applied at synthesis time based on the configuration of the scheduler
        construct_aspects = Aspects.of(self)
        construct_aspects.add(SchedulerPermissionsAspect(self))