in enrichment_function/import_findings/app.py [0:0]
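def lambda_handler(event, context):
    """Enrich the Security Hub finding carried by an EventBridge event.

    Reads the role name from the ``ORG_ROLE`` environment variable, looks up
    the note text and user-defined fields for the finding's account through
    ``enrich_finding``, then writes both onto the finding with
    ``BatchUpdateFindings``.

    Args:
        event: Raw EventBridge event, unmarshalled into ``AWSEvent``. Only
            the first entry of ``detail.findings`` is processed.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` (200 on success, 500 on any failure) and a
        JSON-encoded ``body`` message.

    Raises:
        KeyError: if ``ORG_ROLE`` is not set in the environment.
    """
    status_code = 200
    message = 'function complete'
    assume_role_name = os.environ["ORG_ROLE"]
    enrichment_author = "Security Hub - Enrichment Automation"

    # Deserialize event into a strongly typed object
    aws_event: AWSEvent = Marshaller.unmarshall(event, AWSEvent)
    # NOTE(review): this writes the entire event (the full finding) to the
    # log at DEBUG; keep DEBUG off outside development if findings may carry
    # sensitive detail.
    logger.debug(aws_event)

    # Store this finding's ID, product ARN and account ID.
    finding = aws_event.detail.findings[0]
    enrichment_finding_id = finding["Id"]
    enrichment_finding_arn = finding["ProductArn"]
    account_id = str(finding['AwsAccountId'])
    logger.debug("Finding ID: %s and product arn %s", enrichment_finding_id, enrichment_finding_arn)

    try:
        # Look up and build the note text and user-defined fields for this account.
        enrichment_text, tags_dict = enrich_finding(account_id, assume_role_name)
        logger.debug("Text to post: %s", enrichment_text)
        logger.debug("User defined Fields %s", json.dumps(tags_dict))
        # Add the note and the user-defined fields; the fields are what the
        # EventBridge rule matches on to prevent repeat lookups of the same finding.
        response = secHubClient.batch_update_findings(
            FindingIdentifiers=[
                {'Id': enrichment_finding_id, 'ProductArn': enrichment_finding_arn}
            ],
            Note={'Text': enrichment_text, 'UpdatedBy': enrichment_author},
            UserDefinedFields=tags_dict,
        )
    except ClientError as error:
        # logger.warn is a deprecated alias; use logger.warning.
        logger.warning(error.response['Error']['Message'])
        status_code = 500
        message = error.response['Error']['Message']
    except Exception:
        # Previously swallowed without a trace; record it so failures are
        # diagnosable while keeping the generic response unchanged.
        logger.exception("Unexpected error while enriching finding %s", enrichment_finding_id)
        status_code = 500
        message = "Unexpected Error occured"
    else:
        # A non-empty UnprocessedFindings list means the update did not apply.
        if response["UnprocessedFindings"]:
            status_code = 500
            message = 'Failed to update finding'
            logger.warning("Failed to update finding %s", response["UnprocessedFindings"])
        else:
            logger.info("successfully posted note to finding: %s API response: %s", enrichment_finding_id, response)

    return {
        'statusCode': status_code,
        'body': json.dumps(message),
    }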