in templates/aws-cloudfront-waf/source/access_handler/access-handler.py [0:0]
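def _sum_waf_metric(cw, metric_name, rule_name):
    """Return the ``Sum`` of an AWS/WAFV2 metric over the last 12 hours.

    Args:
        cw: CloudWatch client exposing ``get_metric_statistics``.
        metric_name: CloudWatch metric name, e.g. ``'BlockedRequests'``.
        rule_name: Value for the ``Rule`` dimension.

    Returns:
        The ``Sum`` of the first datapoint, or ``None`` when CloudWatch
        returns no datapoints for the window.
    """
    window_seconds = 12 * 3600
    # Take the clock once so the window is exactly one period wide.
    end_time = datetime.datetime.utcnow()
    response = cw.get_metric_statistics(
        MetricName=metric_name,
        Namespace='AWS/WAFV2',
        Statistics=['Sum'],
        Period=window_seconds,
        StartTime=end_time - datetime.timedelta(seconds=window_seconds),
        EndTime=end_time,
        Dimensions=[
            {"Name": "Rule", "Value": rule_name},
            {"Name": "WebACL", "Value": os.getenv('STACK_NAME')},
            {"Name": "Region", "Value": os.getenv('AWS_REGION')},
        ],
    )
    datapoints = response['Datapoints']
    return datapoints[0]['Sum'] if datapoints else None


def _ip_set_size(log, scope, ipset_name, ipset_arn):
    """Return the number of addresses in an IP set, or 0 if the lookup returns None."""
    response = waflib.get_ip_set(log, scope, ipset_name, ipset_arn)
    # The response is deliberately not logged: it carries every address held
    # in the bad-bot IP set, and only the size is needed for the usage record.
    if response is None:
        return 0
    return len(response['IPSet']['Addresses'])


def send_anonymous_usage_data(log, scope, ipset_name_v4, ipset_arn_v4, ipset_name_v6, ipset_arn_v6):
    """Collect aggregate WAF counters and submit them via ``send_metrics``.

    Nothing is collected or sent unless the ``SEND_ANONYMOUS_USAGE_DATA``
    environment variable is set to ``yes`` (case-insensitive). The payload
    holds only counters (allowed/blocked request sums and the bad-bot IP set
    size) plus the ``LOG_TYPE`` value; IP addresses are never added to it.

    Collection is best-effort: each section logs its own failure and leaves
    the corresponding counter at its default, and any other exception is
    logged rather than raised.

    Args:
        log: Logger-like object providing ``info`` and ``error``.
        scope: Passed through to ``waflib.get_ip_set``.
        ipset_name_v4: Name of the IPv4 bad-bot IP set.
        ipset_arn_v4: ARN of the IPv4 bad-bot IP set.
        ipset_name_v6: Name of the IPv6 bad-bot IP set.
        ipset_arn_v6: ARN of the IPv6 bad-bot IP set.
    """
    try:
        # Opt-in gate: anything other than an explicit "yes" disables reporting.
        if 'SEND_ANONYMOUS_USAGE_DATA' not in environ or os.getenv('SEND_ANONYMOUS_USAGE_DATA').lower() != 'yes':
            return

        log.info("[send_anonymous_usage_data] Start")

        metric_prefix = os.getenv('METRIC_NAME_PREFIX')
        cw = boto3.client('cloudwatch')
        usage_data = {
            "data_type": "bad_bot",
            "bad_bot_ip_set_size": 0,
            "allowed_requests": 0,
            "blocked_requests_all": 0,
            "blocked_requests_bad_bot": 0,
            "waf_type": os.getenv('LOG_TYPE')
        }

        # --------------------------------------------------------------------------------------------------------------
        log.info("[send_anonymous_usage_data] Get num allowed requests")
        # --------------------------------------------------------------------------------------------------------------
        try:
            allowed = _sum_waf_metric(cw, 'AllowedRequests', 'ALL')
            if allowed is not None:
                usage_data['allowed_requests'] = allowed
        except Exception as error:
            log.info("[send_anonymous_usage_data] Failed to get Num Allowed Requests")
            log.error(str(error))

        # --------------------------------------------------------------------------------------------------------------
        log.info("[send_anonymous_usage_data] Get num blocked requests - all rules")
        # --------------------------------------------------------------------------------------------------------------
        try:
            blocked = _sum_waf_metric(cw, 'BlockedRequests', 'ALL')
            if blocked is not None:
                usage_data['blocked_requests_all'] = blocked
        except Exception as error:
            log.info("[send_anonymous_usage_data] Failed to get num blocked requests - all rules")
            log.error(str(error))

        # --------------------------------------------------------------------------------------------------------------
        log.info("[send_anonymous_usage_data] Get bad bot data")
        # --------------------------------------------------------------------------------------------------------------
        if 'IP_SET_ID_BAD_BOTV4' in environ or 'IP_SET_ID_BAD_BOTV6' in environ:
            try:
                countv4 = _ip_set_size(log, scope, ipset_name_v4, ipset_arn_v4)
                log.info("Bad Bot CountV4 %s", countv4)

                countv6 = _ip_set_size(log, scope, ipset_name_v6, ipset_arn_v6)
                log.info("Bad Bot CountV6 %s", countv6)

                # Sent as a string (the default above is the int 0); kept so
                # the submitted payload format does not change.
                usage_data['bad_bot_ip_set_size'] = str(countv4 + countv6)

                # If METRIC_NAME_PREFIX is unset this concatenation raises and
                # the handler below records the failure.
                blocked_bad_bot = _sum_waf_metric(cw, 'BlockedRequests', metric_prefix + 'BadBotRule')
                if blocked_bad_bot is not None:
                    usage_data['blocked_requests_bad_bot'] = blocked_bad_bot
            except Exception as error:
                log.info("[send_anonymous_usage_data] Failed to get bad bot data")
                log.error(str(error))

        # --------------------------------------------------------------------------------------------------------------
        log.info("[send_anonymous_usage_data] Send Data")
        # --------------------------------------------------------------------------------------------------------------
        response = send_metrics(data=usage_data)
        response_code = response.status_code
        log.info('[send_anonymous_usage_data] Response Code: {}'.format(response_code))
        log.info("[send_anonymous_usage_data] End")

    except Exception as error:
        # Usage reporting must never break the calling handler.
        log.info("[send_anonymous_usage_data] Failed to Send Data")
        log.error(str(error))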