in images/controlplane/status/app/cluster_status_check.py [0:0]
def handler(event, context):
    """Lambda entry point: report whether the EKS control-plane upgrade finished.

    Walks every update recorded for the configured cluster and inspects each
    ``VersionUpdate`` entry. When such an update targets the configured
    cluster version and its status is ``Successful``, ``send_update('SUCCESS')``
    is called; otherwise the observed status or version is only logged.

    If the event carries a ``RequestType`` of ``Create``, ``Update`` or
    ``Delete``, a SUCCESS response is sent through ``cfnresponse``. Any
    exception raised while checking the updates is logged, reported with
    ``send_update("FAILURE")`` and answered with a FAILED ``cfnresponse``.

    Args:
        event: Invocation payload; only its optional ``RequestType`` key is
            read directly here.
        context: Lambda context object, passed through to ``cfnresponse.send``.
    """
    try:
        eks = boto3.client('eks')
        # Loop-invariant: read the cluster name once rather than per update.
        cluster_name = config_dict[ConfigKey.cluster_name_key]

        for update_id in _iter_update_ids(eks, cluster_name):
            update = eks.describe_update(
                name=cluster_name, updateId=update_id)['update']
            if update['type'] != 'VersionUpdate':
                continue

            update_status = update['status']
            update_version = get_update_version({'update': update})
            if update_version != config_dict[ConfigKey.cluster_version_key]:
                logger.info(f'Control Plane EKS version found :{update_version}')
            elif update_status == 'Successful':
                send_update('SUCCESS')
            else:
                logger.info(f'Control Plane upgrade is: {update_status} update id is: {update_id}')

        # NOTE(review): the original indentation was lost; this acknowledgement
        # is assumed to run once after the loop rather than per update - confirm.
        # A missing RequestType (or a non-dict event) simply means there is
        # nothing to acknowledge, so it is read without raising.
        request_type = event.get('RequestType') if isinstance(event, dict) else None
        if request_type in ('Create', 'Update', 'Delete'):
            try:
                cfnresponse.send(event, context, cfnresponse.SUCCESS, None)
            except Exception:
                # Still best-effort (a failed acknowledgement must not flip
                # the run to FAILURE), but no longer swallowed silently.
                logger.exception('Failed to send SUCCESS response to CloudFormation')
    except Exception as ex:
        logger.exception(ex)
        send_update("FAILURE")
        cfnresponse.send(event, context, cfnresponse.FAILED, None)
    logger.info('ending run...')


def _iter_update_ids(eks, cluster_name):
    """Yield every update id of *cluster_name*, following ``nextToken`` pages."""
    request = {'name': cluster_name}
    while True:
        page = eks.list_updates(**request)
        yield from page['updateIds']
        next_token = page.get('nextToken')
        if not next_token:
            return
        request['nextToken'] = next_token