cookbooks/aws-parallelcluster-computefleet/files/compute_fleet_status/compute_fleet_status.py (129 lines of code) (raw):
# pylint: disable=W0719
import argparse
import datetime
import json
import sys
import boto3
import dateutil
from boto3.dynamodb.conditions import Attr
DB_KEY = "COMPUTE_FLEET"
DB_DATA = "Data"
COMPUTE_FLEET_STATUS_ATTRIBUTE = "status"
COMPUTE_FLEET_LAST_UPDATED_TIME_ATTRIBUTE = "lastStatusUpdatedTime"
def to_utc_datetime(time_in, default_timezone=datetime.timezone.utc) -> datetime.datetime:
"""
Convert a given string, datetime or int into utc datetime.
:param time_in: Time in a format that may be parsed, integers are assumed to
be timestamps in UTC timezone.
:param default_timezone: Timezone to assum in the event that the time is
unspecified in the input parameter. This applies only for datetime and str inputs
:return time as a datetime in UTC timezone
"""
if isinstance(time_in, int):
if time_in > 1e12:
time_in /= 1000
time_ = datetime.datetime.utcfromtimestamp(time_in)
time_ = time_.replace(tzinfo=datetime.timezone.utc)
elif isinstance(time_in, str):
time_ = dateutil.parser.parse(time_in)
elif isinstance(time_in, datetime.date):
time_ = time_in
else:
raise TypeError("to_utc_datetime object must be 'str', 'int' or 'datetime'.")
if time_.tzinfo is None:
time_ = time_.replace(tzinfo=default_timezone)
return time_.astimezone(datetime.timezone.utc)
def to_iso_timestr(time_in: datetime.datetime) -> str:
"""
Convert a given datetime ISO 8601 format with milliseconds.
:param time_in: datetime to be converted
:return time in ISO 8601 UTC format with ms (e.g. 2021-07-15T01:22:02.655Z)
"""
if time_in.tzinfo is None:
time_ = time_in.replace(tzinfo=datetime.timezone.utc)
else:
time_ = time_in.astimezone(datetime.timezone.utc)
return to_utc_datetime(time_).isoformat(timespec="milliseconds")[:-6] + "Z"
def update_item(table, status, current_status):
table.update_item(
Key={"Id": DB_KEY},
UpdateExpression="set #dt.#st=:s, #dt.#lut=:t",
ExpressionAttributeNames={
"#dt": DB_DATA,
"#st": COMPUTE_FLEET_STATUS_ATTRIBUTE,
"#lut": COMPUTE_FLEET_LAST_UPDATED_TIME_ATTRIBUTE,
},
ExpressionAttributeValues={
":s": str(status),
":t": str(datetime.datetime.now(tz=datetime.timezone.utc)),
},
ConditionExpression=Attr(f"{DB_DATA}.{COMPUTE_FLEET_STATUS_ATTRIBUTE}").eq(str(current_status)),
)
def update_status_with_last_updated_time(table_name, region, status):
"""Get compute fleet status and the last compute fleet status updated time."""
try:
table = boto3.resource("dynamodb", region_name=region).Table(table_name)
current_status = get_dynamo_db_data(table).get(COMPUTE_FLEET_STATUS_ATTRIBUTE)
if current_status == status:
return
elif current_status == "RUNNING":
update_item(table, status, current_status)
else:
raise Exception(f"Could not update compute fleet status from '{current_status}' to {status}.")
except Exception as e:
raise Exception(f"Failed when updating fleet status with error: {e}")
def get_dynamo_db_data(table):
try:
compute_fleet_item = table.get_item(ConsistentRead=True, Key={"Id": DB_KEY})
if not compute_fleet_item or "Item" not in compute_fleet_item:
raise Exception("COMPUTE_FLEET data not found in db table")
db_data = compute_fleet_item["Item"].get(DB_DATA)
return db_data
except Exception as e:
raise Exception(f"Failed when retrieving data from DynamoDB with error {e}.")
def get_status_with_last_updated_time(table_name, region):
"""Get compute fleet status and the last compute fleet status updated time."""
try:
table = boto3.resource("dynamodb", region_name=region).Table(table_name)
dynamo_db_data = get_dynamo_db_data(table)
print(
json.dumps(
{
COMPUTE_FLEET_STATUS_ATTRIBUTE: dynamo_db_data.get(COMPUTE_FLEET_STATUS_ATTRIBUTE),
COMPUTE_FLEET_LAST_UPDATED_TIME_ATTRIBUTE: to_iso_timestr(
dateutil.parser.parse(dynamo_db_data.get(COMPUTE_FLEET_LAST_UPDATED_TIME_ATTRIBUTE))
),
},
sort_keys=True,
indent=4,
)
)
except Exception as e:
raise Exception(f"Failed when retrieving fleet status from DynamoDB with error {e}.")
def main():
try:
parser = argparse.ArgumentParser(description="Get or update compute fleet status of scheduler plugin.")
parser.add_argument(
"--table-name",
type=str,
required=True,
help="DynamoDB table name",
)
parser.add_argument(
"--region",
type=str,
required=True,
help="Region of cluster",
)
parser.add_argument(
"--status",
type=str,
required=False,
help="Specify the compute fleet status to set, can be PROTECTED",
choices={"PROTECTED"},
)
parser.add_argument(
"--action",
type=str,
required=True,
help="Get or update compute-fleet-status",
choices={"update", "get"},
)
args = parser.parse_args()
if args.action == "update" and not args.status:
parser.error("ERROR: --status is required when 'action' is specified to 'update'.")
elif args.action == "get" and args.status:
parser.error("ERROR: --status can not be specified when 'action' is 'get'.")
if args.action == "update":
update_status_with_last_updated_time(args.table_name, args.region, args.status)
else:
get_status_with_last_updated_time(args.table_name, args.region)
except Exception as e:
print(f"ERROR: Failed to {args.action} compute fleet status, exception: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()