in templates/aws-cloudfront-waf/source/reputation_lists_parser/reputation-lists.py [0:0]
def send_anonymous_usage_data(log, scope):
    """Collect reputation-list usage figures and hand them to ``send_metrics``.

    Does nothing unless the ``SEND_ANONYMOUS_USAGE_DATA`` environment variable
    is set to ``yes`` (case-insensitive). Otherwise it gathers, on a
    best-effort basis:

    * the addresses (and their count) of the IPv4 and IPv6 reputation IP sets,
      looked up through ``waflib.get_ip_set``;
    * the ``Sum`` statistic of the ``AllowedRequests`` / ``BlockedRequests``
      metrics in the ``AWS/WAFV2`` CloudWatch namespace for the last hour.

    Each lookup is isolated: a failure is logged at debug level and the
    corresponding field keeps its default of ``0``.

    Args:
        log: Logger-like object; only ``debug`` and ``info`` are called.
        scope: Passed through unchanged to ``waflib.get_ip_set``.

    Returns:
        None. Any ``Exception`` is caught and logged at debug level, so a
        failure here never propagates to the caller.
    """
    try:
        if os.getenv('SEND_ANONYMOUS_USAGE_DATA', '').lower() != 'yes':
            return

        log.debug("[send_anonymous_usage_data] Start")

        cw = boto3.client('cloudwatch')
        usage_data = {
            "data_type": "reputation_lists",
            "ipv4_reputation_lists_size": 0,
            "ipv4_reputation_lists": 0,
            "ipv6_reputation_lists_size": 0,
            "ipv6_reputation_lists": 0,
            "allowed_requests": 0,
            "blocked_requests": 0,
            "blocked_requests_ip_reputation_lists": 0,
            "waf_type": os.getenv('LOG_TYPE')
        }

        # --------------------------------------------------------------------------------------------------------------
        log.debug("[send_anonymous_usage_data] Get size of the Reputation List IP set")
        # --------------------------------------------------------------------------------------------------------------
        # (usage_data key prefix, IP set name env var, IP set id env var, failure message)
        ip_sets = (
            ('ipv4', 'IP_SET_NAME_REPUTATIONV4', 'IP_SET_ID_REPUTATIONV4',
             "[send_anonymous_usage_data] Failed to get size of the Reputation List IPV4 set"),
            ('ipv6', 'IP_SET_NAME_REPUTATIONV6', 'IP_SET_ID_REPUTATIONV6',
             "[send_anonymous_usage_data] Failed to get size of the Reputation List IPV6 set"),
        )
        for prefix, name_env, id_env, failure_message in ip_sets:
            try:
                response = waflib.get_ip_set(log, scope, os.getenv(name_env), os.getenv(id_env))
                if response is not None:
                    addresses = response['IPSet']['Addresses']
                    usage_data[prefix + '_reputation_lists_size'] = len(addresses)
                    usage_data[prefix + '_reputation_lists'] = addresses
            except Exception as error:
                log.debug(failure_message)
                log.debug(str(error))

        # Compute the one-hour window once so that every metric below is
        # queried over exactly the same interval and the interval is exactly
        # one Period long.
        end_time = datetime.datetime.utcnow()
        start_time = end_time - datetime.timedelta(seconds=3600)

        # (usage_data key, metric name, value of the "Rule" dimension,
        #  progress message, failure message)
        metric_queries = (
            ('allowed_requests', 'AllowedRequests', "ALL",
             "[send_anonymous_usage_data] Get total number of allowed requests",
             "[send_anonymous_usage_data] Failed to get Num Allowed Requests"),
            ('blocked_requests', 'BlockedRequests', "ALL",
             "[send_anonymous_usage_data] Get total number of blocked requests",
             "[send_anonymous_usage_data] Failed to get Num Blocked Requests"),
            ('blocked_requests_ip_reputation_lists', 'BlockedRequests',
             os.getenv('IPREPUTATIONLIST_METRICNAME'),
             "[send_anonymous_usage_data] Get total number of blocked requests for Reputation Lists Rule",
             "[send_anonymous_usage_data] Failed to get Num Blocked Requests for Reputation Lists Rule"),
        )
        for key, metric_name, rule, progress_message, failure_message in metric_queries:
            # ----------------------------------------------------------------------------------------------------------
            log.debug(progress_message)
            # ----------------------------------------------------------------------------------------------------------
            try:
                response = cw.get_metric_statistics(
                    MetricName=metric_name,
                    Namespace='AWS/WAFV2',
                    Statistics=['Sum'],
                    Period=3600,
                    StartTime=start_time,
                    EndTime=end_time,
                    Dimensions=[
                        {
                            "Name": "Rule",
                            "Value": rule
                        },
                        {
                            "Name": "WebACL",
                            "Value": os.getenv('STACK_NAME')
                        },
                        {
                            "Name": "Region",
                            "Value": os.getenv('AWS_REGION')
                        }
                    ]
                )
                datapoints = response['Datapoints']
                if datapoints:
                    usage_data[key] = datapoints[0]['Sum']
            except Exception as error:
                log.debug(failure_message)
                log.debug(str(error))

        # --------------------------------------------------------------------------------------------------------------
        log.info("[send_anonymous_usage_data] Send Data")
        # --------------------------------------------------------------------------------------------------------------
        response = send_metrics(data=usage_data)
        response_code = response.status_code
        log.debug('[send_anonymous_usage_data] Response Code: {}'.format(response_code))
        log.debug("[send_anonymous_usage_data] End")

    except Exception as error:
        # Usage reporting is best-effort: never let it break the caller, but
        # do record why it failed.
        log.debug("[send_anonymous_usage_data] Failed to send data")
        log.debug(str(error))