in automation/lambda/handlers/index.py [0:0]
def handler(event, context):
logger.info(f'helper received event: {json.dumps(event)}')
response_data = dict({
"Data": "DONE",
"Status": "SUCCESS",
"CustomResourcePhysicalID": None
})
response_data["CustomResourcePhysicalID"] = \
context.aws_request_id if context and context.aws_request_id else uuid.uuid4()
resource_properties = event.get('ResourceProperties')
resource_type = event.get("ResourceType")
if resource_type == "Custom::SendSecrets":
region_name = resource_properties.get("RegionName")
secret_name = resource_properties.get("ESSecretName")
topic = resource_properties.get("Topic")
kibana_url = resource_properties.get("KibanaURL")
if event["RequestType"] == "Create":
secrets = get_secret(secret_name, None, region_name)
if secrets:
secrets = json.loads(secrets)
message = f"AWS LogAggregator Kibana Dashboard URL: {kibana_url} and secrets are as below \n" \
f" Username: {secrets['username']} \n Password:{secrets['password']}\n "
sns_client.publish(
TopicArn=topic,
Message=message,
Subject='AWS LogAggregator Kibana Dashboard'
)
if event["RequestType"] == "Delete":
pass
elif resource_type == "Custom::GenerateConfig":
region = resource_properties.get("RegionName")
dynamodb_table = resource_properties.get("ConfigTable")
cluster_id = resource_properties.get("EMRClusterId")
if event["RequestType"] == "Create":
config = prepare_log_config(event)
dynamodb_client = boto3.resource("dynamodb", region)
table = dynamodb_client.Table(dynamodb_table)
table.put_item(Item=config)
if event["RequestType"] == "Delete":
response_data["CustomResourcePhysicalID"] = event["PhysicalResourceId"]
try:
emr_client = session.client('emr')
emr_client.set_termination_protection(
JobFlowIds=[cluster_id],
TerminationProtected=False
)
except Exception:
pass
else:
new_step_id = None
step_id = None
topic = None
# Get event name to confirm whether this is an add step workflow and cancel step workflow
for record in event['Records']:
event_name = record['eventName']
event_source = record['eventSource']
if event_source == 'aws:dynamodb':
# Add step workflow via DynamoDB insert or modify trigger
if event_name in ['INSERT', 'MODIFY']:
# Extract log source name and type
source_name = record['dynamodb']['Keys']['source_name']['S']
source_type = record['dynamodb']['NewImage']['source_type']['S']
# Get dynamodb name
dynamodb_table = record['eventSourceARN'].split('/')[1]
# Confirm if config_json has been provided
try:
config_json = record['dynamodb']['NewImage']['config_json']['M']
cluster_id = record['dynamodb']['NewImage']['cluster_id']['S']
region = record['dynamodb']['NewImage']['region']['S']
topic = record['dynamodb']['NewImage']['topic']['S']
except Exception as e:
logger.error(e)
error_message = f'Failed to validate Config JSON for {source_name}. ' \
f'config_json has to be provided in the DynamoDB item as a map. ' \
f'String will not be accepted.'
# Notify admin if failed to prepare log config json
sns_client.publish(
TopicArn=topic,
Message=error_message,
Subject='Error: Config JSON',
)
return None
emr_client = session.client('emr')
# Prepare inputs for state machine / step functions
# Validate if EMR cluster (by cluster ID) is running
try:
emr_client.describe_cluster(ClusterId=cluster_id)
except Exception as e:
logger.error(str(e))
error_message = f'Not able to find any running EMR cluster {cluster_id}'
sns_client.publish(
TopicArn=topic,
Message=error_message,
Subject='Error: EMR Cluster not found.',
)
return None
# INFO: Duplicate step check
logger.info('Confirm if an EMR step with name \'' + source_name + '\' has been added before')
step_id = get_step_id_by_name(source_name, cluster_id, emr_client)
if step_id:
error_message = f'Step {source_name} already been added as step ID {step_id}'
logger.info(error_message)
# Prepare notification message and send over to Admin
if topic:
sns_client.publish(
TopicArn=topic,
Message=error_message,
Subject='Config JSON error',
)
return None
else:
logger.info(f'EMR step with name {source_name} has not been added')
# Set termination protection for EMR cluster
logger.info('Turn on termination protection for EMR cluster')
emr_client.set_termination_protection(
JobFlowIds=[cluster_id],
TerminationProtected=True
)
# Get spark submit arguments if available
try:
spark_submit_args = ('spark-submit ' + record['dynamodb']['NewImage']['spark_submit_args']['S'])
except Exception as e:
logger.info(
'Spark submit arguement was not provided by user. Default spark submit arguement will be applied')
# Add job/step to EMR cluster
new_step_id = submit_emr_step(dynamodb_table, source_name, source_type, spark_submit_args,
cluster_id, emr_client, region)
logger.info('New step ID ' + new_step_id)
message = f'New Job by {new_step_id} is being added to Cluster {cluster_id}'
# TODO: iftik, send step ID over to step functions for monitoring
# Notify admin of newly added EMR log job.
sns_client.publish(
TopicArn=topic,
Message=message,
Subject='Info: New Job added',
)
# Terminate this Lambda execution after sending the message to Step Function
return None
# Cancel step workflow. To be defined.
elif event_name == 'REMOVE':
logger.info('REMOVED ' + record['dynamodb']['Keys']['source_name']['S'])
response_data["CustomResourcePhysicalID"] = new_step_id or step_id
return None
send(event, context, response_data["Status"], response_data,
physical_resource_id=response_data["CustomResourcePhysicalID"])