in source/Orchestrator/send_notifications.py [0:0]
def lambda_handler(event, context):
# Expected input:
# Notification:
# Message: string
# State: string
# Details?: string
# updateSecHub: yes|NO
# Payload: contains the Step Function Input to the previous state and consists of:
# Finding?: json
# ControlId?: string
# SecurityStandard?: string
# EventType?: string
message_prefix = event['Notification'].get('ExecId','')
if message_prefix:
message_prefix += ': '
message_suffix = event['Notification'].get('AffectedObject', '')
if message_suffix:
message_suffix = f' ({message_suffix})'
# Get finding status
finding_status = 'FAILED' # default state
if event['Notification']['State'].upper == 'SUCCESS':
finding_status = 'RESOLVED'
elif event['Notification']['State'].upper == 'QUEUED':
finding_status = 'PENDING'
# elif event['Notification']['State'].upper == 'FAILED':
# finding_status = 'FAILED'
finding = None
finding_info = ''
if 'Finding' in event:
finding = sechub_findings.Finding(event['Finding'])
finding_info = {
'finding_id': finding.uuid,
'finding_description': finding.description,
'standard_name': finding.standard_name,
'standard_version': finding.standard_version,
'standard_control': finding.standard_control,
'title': finding.title,
'region': finding.region,
'account': finding.account_id,
'finding_arn': finding.arn
}
# Send anonymous metrics
if 'EventType' in event and 'Finding' in event:
metrics = Metrics(event['EventType'])
metrics_data = metrics.get_metrics_from_finding(event['Finding'])
metrics_data['status'] = finding_status
metrics.send_metrics(metrics_data)
if event['Notification']['State'].upper() in ('SUCCESS', 'QUEUED'):
notification = sechub_findings.SHARRNotification(
event.get('SecurityStandard', 'SHARR'),
AWS_REGION,
event.get('ControlId', None)
)
notification.severity = 'INFO'
notification.send_to_sns = True
elif event['Notification']['State'].upper() == 'FAILED':
notification = sechub_findings.SHARRNotification(
event.get('SecurityStandard', 'SHARR'),
AWS_REGION,
event.get('ControlId', None)
)
notification.severity = 'ERROR'
notification.send_to_sns = True
elif event['Notification']['State'].upper() == 'WRONGSTANDARD':
notification = sechub_findings.SHARRNotification('SHARR',AWS_REGION, None)
notification.severity = 'ERROR'
elif event['Notification']['State'].upper() == 'LAMBDAERROR':
notification = sechub_findings.SHARRNotification('SHARR',AWS_REGION, None)
notification.severity = 'ERROR'
else:
notification = sechub_findings.SHARRNotification(
event.get('SecurityStandard', 'SHARR'),
AWS_REGION,
event.get('ControlId', None)
)
notification.severity = 'ERROR'
if finding:
finding.flag(event['Notification']['Message'])
notification.message = message_prefix + event['Notification']['Message'] + message_suffix
if 'Details' in event['Notification'] and event['Notification']['Details'] != 'MISSING':
notification.logdata = format_details_for_output(event['Notification']['Details'])
notification.finding_info = finding_info
notification.notify()