in templates/aws-cloudfront-waf/source/access_handler/access-handler.py [0:0]
def lambda_handler(event, context):
    """Add the caller's source IP to the matching bad-bot WAF IP set.

    The source IP is read from ``event['requestContext']['identity']['sourceIp']``,
    converted to CIDR notation, merged with the addresses currently returned by
    ``waflib.get_ip_set`` and written back with ``waflib.update_ip_set``. The
    IPv4 or IPv6 IP set is chosen from the address version.

    Environment variables read: ``LOG_LEVEL``, ``SCOPE``,
    ``IP_SET_NAME_BAD_BOTV4``, ``IP_SET_NAME_BAD_BOTV6``,
    ``IP_SET_ID_BAD_BOTV4``, ``IP_SET_ID_BAD_BOTV6``.

    Args:
        event: Invocation event; must contain
            ``requestContext.identity.sourceIp``.
        context: Lambda context object (unused).

    Returns:
        dict: Response with ``statusCode`` 200, a JSON content-type header and
        a text ``body`` that echoes the source IP.

    Raises:
        Exception: Any error raised while reading the inputs or updating the
            IP set is logged and re-raised unchanged.
    """
    log = logging.getLogger()

    # LOG_LEVEL may be unset: fall back to an empty string so .upper() cannot
    # fail on None, and let the validation below select the 'ERROR' default.
    log_level = str(os.getenv('LOG_LEVEL') or '').upper()
    if log_level not in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
        log_level = 'ERROR'
    log.setLevel(log_level)

    # Emitted after setLevel so the configured level decides whether it shows.
    log.info('[lambda_handler] Start')

    # ----------------------------------------------------------
    # Read inputs parameters
    # ----------------------------------------------------------
    try:
        scope = os.getenv('SCOPE')
        ipset_name_v4 = os.getenv('IP_SET_NAME_BAD_BOTV4')
        ipset_name_v6 = os.getenv('IP_SET_NAME_BAD_BOTV6')
        ipset_arn_v4 = os.getenv('IP_SET_ID_BAD_BOTV4')
        ipset_arn_v6 = os.getenv('IP_SET_ID_BAD_BOTV6')

        log.info("Event->%s<-", str(event))
        # The IP is taken from the request context; per the original note, an
        # earlier version relied on a user-supplied address, which was a
        # security exposure.
        source_ip = str(event['requestContext']['identity']['sourceIp'])

        log.info("scope = %s", scope)
        log.info("ipset_name_v4 = %s", ipset_name_v4)
        log.info("ipset_name_v6 = %s", ipset_name_v6)
        log.info("IPARNV4 = %s", ipset_arn_v4)
        log.info("IPARNV6 = %s", ipset_arn_v6)
        log.info("source_ip = %s", source_ip)
    except Exception as e:
        log.exception(e)
        raise

    # ----------------------------------------------------------
    # Merge the source IP into the matching IP set
    # ----------------------------------------------------------
    output = None
    try:
        # ip_address() validates the string; its version is either 4 or 6.
        if ip_address(source_ip).version == 4:
            new_address = IPv4Network(source_ip).with_prefixlen
            ipset_name, ipset_arn = ipset_name_v4, ipset_arn_v4
        else:
            new_address = IPv6Network(source_ip).with_prefixlen
            ipset_name, ipset_arn = ipset_name_v6, ipset_arn_v6

        ipset = waflib.get_ip_set(log, scope, ipset_name, ipset_arn)
        log.info(ipset)

        # Merge old addresses with this one; the set union removes duplicates.
        current_list = ipset["IPSet"]["Addresses"]
        log.info(current_list)
        new_list = list(set(current_list) | {new_address})
        log.info(new_list)

        output = waflib.update_ip_set(log, scope, ipset_name, ipset_arn, new_list)
    except Exception as e:
        log.exception(e)
        raise
    finally:
        # Logged on both success and failure (None when the update did not run).
        log.info("Output->%s<-", output)

    message = "message: [%s] Thanks for the visit." % source_ip
    response = {
        'statusCode': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': message
    }

    # Usage data is only reported when the update call returned something.
    if output is not None:
        send_anonymous_usage_data(log, scope, ipset_name_v4, ipset_arn_v4, ipset_name_v6, ipset_arn_v6)
    log.info('[lambda_handler] End')

    return response