def update_status()

in cookbooks/aws-parallelcluster-computefleet/files/clusterstatusmgtd/clusterstatusmgtd.py [0:0]


    def update_status(self, current_status, next_status):  # noqa: D102
        try:
            updated_attributes = self._table.update_item(
                Key={"Id": self.DB_KEY},
                UpdateExpression="set #dt.#st=:s, #dt.#lut=:t",
                ExpressionAttributeNames={
                    "#dt": self.DB_DATA,
                    "#st": self.COMPUTE_FLEET_STATUS_ATTRIBUTE,
                    "#lut": self.COMPUTE_FLEET_LAST_UPDATED_TIME_ATTRIBUTE,
                },
                ExpressionAttributeValues={
                    ":s": str(next_status),
                    ":t": str(datetime.now(tz=timezone.utc)),
                },
                ConditionExpression=Attr(f"{self.DB_DATA}.{self.COMPUTE_FLEET_STATUS_ATTRIBUTE}").eq(
                    str(current_status)
                ),
                ReturnValues="ALL_NEW",
            )

            return updated_attributes.get("Attributes").get(f"{self.DB_DATA}")
        except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException as e:
            raise ComputeFleetStatusManager.ConditionalStatusUpdateFailedError(e)