in source/log_parser/log-parser.py [0:0]
def get_outstanding_requesters(log, bucket_name, key_name, log_type):
    """Identify client IPs whose per-minute request/error rate exceeds the
    configured threshold.

    Downloads a gzipped access-log file from S3, tallies requests per
    '<minute> <client ip>' key, then keeps only the requesters above the
    threshold defined in the module-level ``config``.

    Args:
        log: logger used for progress and error reporting.
        bucket_name: S3 bucket holding the gzipped log file.
        key_name: S3 key of the gzipped log file.
        log_type: 'waf', 'alb' or 'cloudfront'. For 'waf' every request is
            counted (against 'requestThreshold'); for the others only lines
            whose HTTP status code is in config['general']['errorCodes'] are
            counted (against 'errorThreshold'). Any other value yields the
            empty result.

    Returns:
        {'general': {ip: {'max_counter_per_min': int, 'updated_at': str}},
         'uriList': {uri: {ip: {...same shape...}}}}
    """
    log.debug('[get_outstanding_requesters] Start')

    counter = {'general': {}, 'uriList': {}}
    outstanding_requesters = {'general': {}, 'uriList': {}}

    try:
        # --------------------------------------------------------------------
        log.info("[get_outstanding_requesters] \tDownload file from S3")
        # --------------------------------------------------------------------
        local_file_path = '/tmp/' + key_name.split('/')[-1]
        s3 = create_client('s3')
        s3.download_file(bucket_name, key_name, local_file_path)

        # --------------------------------------------------------------------
        log.info("[get_outstanding_requesters] \tRead file content")
        # --------------------------------------------------------------------
        try:
            _count_requests(log, local_file_path, log_type, counter)
        finally:
            # Bug fix: the original only deleted the temp file on the fully
            # successful path, leaking it in /tmp whenever parsing raised or
            # the log type was unknown. Lambda containers are reused, so the
            # leak accumulates across invocations.
            remove(local_file_path)

        # --------------------------------------------------------------------
        log.info("[get_outstanding_requesters] \tKeep only outstanding requesters")
        # --------------------------------------------------------------------
        _filter_outstanding(log, counter, log_type, outstanding_requesters)
    except Exception as e:
        # Best-effort by design: log and fall through to return whatever was
        # accumulated (possibly the empty result).
        log.error("[get_outstanding_requesters] \tError to read input file")
        log.error(e)

    log.debug('[get_outstanding_requesters] End')
    return outstanding_requesters


def _parse_line(line, log_type):
    """Parse one decoded log line into (request_key, uri, return_code).

    request_key is '<timestamp truncated to the minute> <client ip>'
    (CloudFront uses '<date> <hh:mm> <ip>'). return_code is the HTTP status
    code field for ALB/CloudFront lines, or None for WAF logs (WAF entries
    carry no status code, so every request is counted).

    Returns None for ALB/CloudFront header/comment lines starting with '#'.
    """
    if log_type == 'waf':
        data = json.loads(line)
        # WAF timestamps are epoch milliseconds; truncate to the minute.
        request_key = datetime.datetime.fromtimestamp(
            int(data['timestamp']) / 1000.0).isoformat(sep='T', timespec='minutes')
        request_key += ' ' + data['httpRequest']['clientIp']
        return request_key, urlparse(data['httpRequest']['uri']).path, None

    if line.startswith('#'):
        return None

    if log_type == 'alb':
        fields = line.split(LINE_FORMAT_ALB['delimiter'])
        # rsplit(':', 1) drops the seconds from the timestamp and the port
        # from the 'ip:port' source field.
        request_key = fields[LINE_FORMAT_ALB['timestamp']].rsplit(':', 1)[0]
        request_key += ' ' + fields[LINE_FORMAT_ALB['source_ip']].rsplit(':', 1)[0]
        return (request_key,
                urlparse(fields[LINE_FORMAT_ALB['uri']]).path,
                fields[LINE_FORMAT_ALB['code']])

    # cloudfront: date and time are separate fields; [:-3] strips ':ss'
    # from 'hh:mm:ss' so keys have minute resolution.
    fields = line.split(LINE_FORMAT_CLOUD_FRONT['delimiter'])
    request_key = fields[LINE_FORMAT_CLOUD_FRONT['date']]
    request_key += ' ' + fields[LINE_FORMAT_CLOUD_FRONT['time']][:-3]
    request_key += ' ' + fields[LINE_FORMAT_CLOUD_FRONT['source_ip']]
    return (request_key,
            urlparse(fields[LINE_FORMAT_CLOUD_FRONT['uri']]).path,
            fields[LINE_FORMAT_CLOUD_FRONT['code']])


def _count_requests(log, local_file_path, log_type, counter):
    """Tally requests per '<minute> <ip>' key into ``counter`` (in place).

    Tolerates up to 4 malformed lines; the 5th error re-raises and aborts.
    """
    if log_type not in ('waf', 'alb', 'cloudfront'):
        # Unknown log type: nothing to count (the original returned the empty
        # result here); counters stay empty so the caller returns empty too.
        return

    error_count = 0
    with gzip.open(local_file_path, 'r') as content:
        for raw_line in content:
            try:
                line = raw_line.decode('utf8')
                parsed = _parse_line(line, log_type)
                if parsed is None:  # '#' header/comment line
                    continue
                request_key, uri, return_code = parsed

                if 'ignoredSufixes' in config['general'] and uri.endswith(
                        tuple(config['general']['ignoredSufixes'])):
                    log.debug(
                        "[get_outstanding_requesters] \t\tSkipping line %s. Included in ignoredSufixes." % line)
                    continue

                # WAF counts every request (return_code is None); ALB and
                # CloudFront only count configured error status codes.
                if return_code is None or return_code in config['general']['errorCodes']:
                    counter['general'][request_key] = counter['general'].get(request_key, 0) + 1
                    if 'uriList' in config and uri in config['uriList']:
                        uri_counter = counter['uriList'].setdefault(uri, {})
                        uri_counter[request_key] = uri_counter.get(request_key, 0) + 1
            except Exception as e:
                error_count += 1
                log.error("[get_outstanding_requesters] \t\tError to process line: %s" % raw_line)
                log.error(str(e))
                if error_count >= 5:  # allow a few bad lines before stopping
                    raise


def _filter_outstanding(log, counter, log_type, outstanding_requesters):
    """Copy counters above the configured threshold into
    ``outstanding_requesters`` (in place), keyed by client IP, keeping the
    maximum per-minute count seen for each IP."""
    # WAF thresholds total requests; ALB/CloudFront threshold error responses.
    threshold = 'requestThreshold' if log_type == 'waf' else 'errorThreshold'
    utc_now_timestamp_str = datetime.datetime.now(
        datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z%z")

    for key, num_reqs in counter['general'].items():
        ip = key.split(' ')[-1]  # request_key always ends with the client IP
        try:
            if num_reqs >= config['general'][threshold]:
                current = outstanding_requesters['general'].get(ip)
                if current is None or num_reqs > current['max_counter_per_min']:
                    outstanding_requesters['general'][ip] = {
                        'max_counter_per_min': num_reqs,
                        'updated_at': utc_now_timestamp_str
                    }
        except Exception:
            log.error(
                "[get_outstanding_requesters] \t\tError to process outstanding requester: %s" % ip)

    for uri, uri_counter in counter['uriList'].items():
        for key, num_reqs in uri_counter.items():
            ip = key.split(' ')[-1]
            try:
                # Per-URI thresholds come from config['uriList'][uri]; a
                # missing key is logged and skipped, matching the original.
                if num_reqs >= config['uriList'][uri][threshold]:
                    per_uri = outstanding_requesters['uriList'].setdefault(uri, {})
                    current = per_uri.get(ip)
                    if current is None or num_reqs > current['max_counter_per_min']:
                        per_uri[ip] = {
                            'max_counter_per_min': num_reqs,
                            'updated_at': utc_now_timestamp_str
                        }
            except Exception:
                log.error(
                    "[get_outstanding_requesters] \t\tError to process outstanding requester: (%s) %s" % (uri, ip))