in cookbooks/aws-parallelcluster-computefleet/files/clusterstatusmgtd/clusterstatusmgtd.py [0:0]
def update_status(self, current_status, next_status): # noqa: D102
try:
updated_attributes = self._table.update_item(
Key={"Id": self.DB_KEY},
UpdateExpression="set #dt.#st=:s, #dt.#lut=:t",
ExpressionAttributeNames={
"#dt": self.DB_DATA,
"#st": self.COMPUTE_FLEET_STATUS_ATTRIBUTE,
"#lut": self.COMPUTE_FLEET_LAST_UPDATED_TIME_ATTRIBUTE,
},
ExpressionAttributeValues={
":s": str(next_status),
":t": str(datetime.now(tz=timezone.utc)),
},
ConditionExpression=Attr(f"{self.DB_DATA}.{self.COMPUTE_FLEET_STATUS_ATTRIBUTE}").eq(
str(current_status)
),
ReturnValues="ALL_NEW",
)
return updated_attributes.get("Attributes").get(f"{self.DB_DATA}")
except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException as e:
raise ComputeFleetStatusManager.ConditionalStatusUpdateFailedError(e)