in lambda-functions/import_findings_security_hub.py [0:0]
def process_message(event):
""" Process Lambda Event """
if event['messageType'] == 'CodeScanReport':
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = os.environ['AWS_REGION']
created_at = event['createdAt']
source_repository = event['source_repository']
source_branch = event['source_branch']
source_commitid = event['source_commitid']
build_id = event['build_id']
report_type = event['reportType']
finding_type = FINDING_TYPE_TEMPLATE.format(report_type)
generator_id = f"{report_type.lower()}-{source_repository}-{source_branch}"
### upload to S3 bucket
s3 = boto3.client('s3')
s3bucket = "pipeline-artifact-bucket-" + account_id
key = f"reports/{event['reportType']}/{build_id}-{created_at}.json"
s3.put_object(Bucket=s3bucket, Body=json.dumps(event), Key=key, ServerSideEncryption='aws:kms')
report_url = f"https://s3.console.aws.amazon.com/s3/object/{s3bucket}/{key}?region={region}"
### OWASP SCA scanning report parsing
if event['reportType'] == 'OWASP-Dependency-Check':
severity = 50
FINDING_TITLE = "OWASP Dependecy Check Analysis"
dep_pkgs = len(event['report']['dependencies'])
for i in range(dep_pkgs):
if "packages" in event['report']['dependencies'][i]:
confidence = event['report']['dependencies'][i]['packages'][0]['confidence']
url = event['report']['dependencies'][i]['packages'][0]['url']
finding_id = f"{i}-{report_type.lower()}-{build_id}"
finding_description = f"Package: {event['report']['dependencies'][i]['packages'][0]['id']}, Confidence: {confidence}, URL: {url}"
created_at = datetime.now(timezone.utc).isoformat()
### find the vulnerability severity level
if confidence == "HIGHEST":
normalized_severity = 80
else:
normalized_severity = 50
securityhub.import_finding_to_sh(i, account_id, region, created_at, source_repository, source_branch, source_commitid, build_id, report_url, finding_id, generator_id, normalized_severity, severity, finding_type, FINDING_TITLE, finding_description, BEST_PRACTICES_OWASP)
### PHPStan SAST scanning report parsing
if event['reportType'] == 'PHPStan':
severity = 50
FINDING_TITLE = "PHPStan StaticCode Analysis"
report_count = event['report']['totals']['file_errors']
for i in range(report_count):
for filename in event['report']['files']:
finding_id = f"{i}-{report_type.lower()}-{build_id}"
finding_description = f"Message: {event['report']['files'][filename]['messages'][0]['message']}, file: {filename}, line: {event['report']['files'][filename]['messages'][0]['line']}"
created_at = datetime.now(timezone.utc).isoformat()
normalized_severity = 60
### find the vulnerability severity level
is_ignorable = f"{event['report']['files'][filename]['messages'][0]['ignorable']}"
if is_ignorable == "true":
normalized_severity = 30
else:
normalized_severity = 60
### Calling Securityhub function to post the findings
securityhub.import_finding_to_sh(i, account_id, region, created_at, source_repository, source_branch, source_commitid, build_id, report_url, finding_id, generator_id, normalized_severity, severity, finding_type, FINDING_TITLE, finding_description, BEST_PRACTICES_OWASP)
### SonarQube SAST scanning report parsing
elif event['reportType'] == 'SONAR-QUBE':
severity = 50
FINDING_TITLE = "SonarQube StaticCode Analysis"
report_count = event['report']['total']
for i in range(report_count):
finding_id = f"{i}-{report_type.lower()}-{source_repository}-{source_branch}-{build_id}"
finding_description = f"{event['report']['issues'][i]['type']}-{event['report']['issues'][i]['message']}-{i}, component: {event['report']['issues'][i]['component']}"
created_at = datetime.now(timezone.utc).isoformat()
report_severity = event['report']['issues'][i]['severity']
### find the vulnerability severity level
if report_severity == 'MAJOR':
normalized_severity = 70
elif report_severity == 'BLOCKER':
normalized_severity = 90
elif report_severity == 'CRITICAL':
normalized_severity = 90
else:
normalized_severity= 20
### Calling Securityhub function to post the findings
securityhub.import_finding_to_sh(i, account_id, region, created_at, source_repository, source_branch, source_commitid, build_id, report_url, finding_id, generator_id, normalized_severity, severity, finding_type, FINDING_TITLE, finding_description, BEST_PRACTICES_OWASP)
### OWASP Zap SAST scanning report parsing
elif event['reportType'] == 'OWASP-Zap':
severity = 50
FINDING_TITLE = "OWASP ZAP DynamicCode Analysis"
alert_ct = event['report']['site'][0]['alerts']
alert_count = len(alert_ct)
for alertno in range(alert_count):
risk_desc = event['report']['site'][0]['alerts'][alertno]['riskdesc']
riskletters = risk_desc[0:3]
### find the vulnerability severity level
if riskletters == 'Hig':
normalized_severity = 70
elif riskletters == 'Med':
normalized_severity = 60
elif riskletters == 'Low' or riskletters == 'Inf':
normalized_severity = 30
else:
normalized_severity = 90
instances = len(event['report']['site'][0]['alerts'][alertno]['instances'])
finding_description = f"{alertno}-Vulerability:{event['report']['site'][0]['alerts'][alertno]['alert']}-Total occurances of this issue:{instances}"
finding_id = f"{alertno}-{report_type.lower()}-{build_id}"
created_at = datetime.now(timezone.utc).isoformat()
### Calling Securityhub function to post the findings
securityhub.import_finding_to_sh(alertno, account_id, region, created_at, source_repository, source_branch, source_commitid, build_id, report_url, finding_id, generator_id, normalized_severity, severity, finding_type, FINDING_TITLE, finding_description, BEST_PRACTICES_OWASP)
else:
print("Invalid report type was provided")
else:
logger.error("Report type not supported:")