in source/scheduler/common/aws_solutions/scheduler/common/base.py [0:0]
def _transact_put(self, task: Task) -> bool:
if not task.schedule or not isinstance(task.schedule.expression, str):
raise ValueError(
"to create a task, it must have a schedule (e.g. cron(* * * * ? *)"
)
if not isinstance(task.state_machine, dict):
raise ValueError("task state_machine must be a dictionary")
if (
"arn" not in task.state_machine.keys()
or "input" not in task.state_machine.keys()
):
raise ValueError("task state_machine must have an arn and input")
if not isinstance(task.state_machine["arn"], str):
raise ValueError("task state_machine.arn must be a string")
if not isinstance(task.state_machine["input"], dict):
raise ValueError("task state_machine.input must be a dictionary")
latest_task = self.read(task)
version_curr = 0 if not latest_task else latest_task.latest
version_next = version_curr + 1
if version_curr != 0 and task == latest_task:
logger.info(f"task {task.name} unchanged from version {version_curr}")
return False
if version_curr == 0:
metrics.add_metric("JobsCreated", unit=MetricUnit.Count, value=1)
self.ddb_cli.transact_write_items(
TransactItems=[
{
"Update": {
"TableName": self.table_name,
"Key": Task.key(task, 0),
# Conditional write makes the update idempotent here
# since the conditional check is on the same attribute
# that is being updated.
"ConditionExpression": "attribute_not_exists(#latest) OR #latest = :latest",
"UpdateExpression": "SET #latest = :version_next, #schedule = :schedule, #state_machine_input = :state_machine_input, #state_machine_arn = :state_machine_arn",
"ExpressionAttributeNames": {
"#latest": "latest",
"#schedule": "schedule",
"#state_machine_arn": "state_machine_arn",
"#state_machine_input": "state_machine_input",
},
"ExpressionAttributeValues": {
":latest": version_curr,
":version_next": version_next,
":schedule": task.schedule.expression,
":state_machine_input": task.state_machine.get("input"),
":state_machine_arn": task.state_machine.get("arn"),
},
}
},
{
"Put": {
"TableName": self.table_name,
"Item": {
**Task.key(task, version_next),
"schedule": task.schedule.expression,
"state_machine_input": task.state_machine.get("input"),
"state_machine_arn": task.state_machine.get("arn"),
},
}
},
]
)
logger.info(f"put scheduled task for {task.name} with version {version_next}")
return True