def _transact_put()

in source/scheduler/common/aws_solutions/scheduler/common/base.py [0:0]


    def _transact_put(self, task: Task) -> bool:
        if not task.schedule or not isinstance(task.schedule.expression, str):
            raise ValueError(
                "to create a task, it must have a schedule (e.g. cron(* * * * ? *)"
            )
        if not isinstance(task.state_machine, dict):
            raise ValueError("task state_machine must be a dictionary")
        if (
            "arn" not in task.state_machine.keys()
            or "input" not in task.state_machine.keys()
        ):
            raise ValueError("task state_machine must have an arn and input")
        if not isinstance(task.state_machine["arn"], str):
            raise ValueError("task state_machine.arn must be a string")
        if not isinstance(task.state_machine["input"], dict):
            raise ValueError("task state_machine.input must be a dictionary")

        latest_task = self.read(task)
        version_curr = 0 if not latest_task else latest_task.latest
        version_next = version_curr + 1

        if version_curr != 0 and task == latest_task:
            logger.info(f"task {task.name} unchanged from version {version_curr}")
            return False

        if version_curr == 0:
            metrics.add_metric("JobsCreated", unit=MetricUnit.Count, value=1)

        self.ddb_cli.transact_write_items(
            TransactItems=[
                {
                    "Update": {
                        "TableName": self.table_name,
                        "Key": Task.key(task, 0),
                        # Conditional write makes the update idempotent here
                        # since the conditional check is on the same attribute
                        # that is being updated.
                        "ConditionExpression": "attribute_not_exists(#latest) OR #latest = :latest",
                        "UpdateExpression": "SET #latest = :version_next, #schedule = :schedule, #state_machine_input = :state_machine_input, #state_machine_arn = :state_machine_arn",
                        "ExpressionAttributeNames": {
                            "#latest": "latest",
                            "#schedule": "schedule",
                            "#state_machine_arn": "state_machine_arn",
                            "#state_machine_input": "state_machine_input",
                        },
                        "ExpressionAttributeValues": {
                            ":latest": version_curr,
                            ":version_next": version_next,
                            ":schedule": task.schedule.expression,
                            ":state_machine_input": task.state_machine.get("input"),
                            ":state_machine_arn": task.state_machine.get("arn"),
                        },
                    }
                },
                {
                    "Put": {
                        "TableName": self.table_name,
                        "Item": {
                            **Task.key(task, version_next),
                            "schedule": task.schedule.expression,
                            "state_machine_input": task.state_machine.get("input"),
                            "state_machine_arn": task.state_machine.get("arn"),
                        },
                    }
                },
            ]
        )
        logger.info(f"put scheduled task for {task.name} with version {version_next}")
        return True