in source/log_parser/log-parser.py [0:0]
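# NOTE: this excerpt assumes the module's top-level imports (datetime, json, os,
# and `remove` from os) plus the `create_client` boto3 helper and the
# module-level `config` dict defined elsewhere in log-parser.py.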
def merge_outstanding_requesters(log, bucket_name, key_name, log_type, output_key_name, outstanding_requesters):
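    """Merge a previously persisted outstanding-requesters file from S3 into the current results.

    Args:
        log: logging.Logger used for progress messages.
        bucket_name: S3 bucket holding the persisted JSON file.
        key_name: key of the log file being processed (only used to name the /tmp scratch file).
        log_type: 'waf' selects the request-count threshold; any other value selects the error-count threshold.
        output_key_name: S3 key of the persisted outstanding-requesters JSON.
        outstanding_requesters: {'general': {...}, 'uriList': {...}} built from the log file currently being processed.

    Returns:
        Tuple (outstanding_requesters, need_update): the merged dict and a flag
        indicating whether the S3 file should be rewritten.
    """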
log.debug('[merge_outstanding_requesters] Start')
force_update = False
need_update = False
s3 = create_client('s3')
# --------------------------------------------------------------------------------------------------------------
log.info("[merge_outstanding_requesters] \tCalculate Last Update Age")
# --------------------------------------------------------------------------------------------------------------
response = None
try:
response = s3.head_object(Bucket=bucket_name, Key=output_key_name)
except Exception:
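        # head_object raises when the output object does not exist yet (and on
        # any other S3 error); either way there is nothing to merge.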
log.info('[merge_outstanding_requesters] No file to be merged.')
need_update = True
return outstanding_requesters, need_update
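    # Age of the persisted file in whole minutes; compared against
    # MAX_AGE_TO_UPDATE at the end of this function.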
utc_last_modified = response['LastModified'].astimezone(datetime.timezone.utc)
utc_now_timestamp = datetime.datetime.now(datetime.timezone.utc)
utc_now_timestamp_str = utc_now_timestamp.strftime("%Y-%m-%d %H:%M:%S %Z%z")
last_update_age = int(((utc_now_timestamp - utc_last_modified).total_seconds()) / 60)
# --------------------------------------------------------------------------------------------------------------
log.info("[merge_outstanding_requesters] \tDownload current blocked IPs")
# --------------------------------------------------------------------------------------------------------------
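    # Stage the remote JSON in /tmp (the writable scratch space in Lambda) before parsing.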
local_file_path = '/tmp/' + key_name.split('/')[-1] + '_REMOTE.json'
s3.download_file(bucket_name, output_key_name, local_file_path)
# ----------------------------------------------------------------------------------------------------------
log.info("[merge_outstanding_requesters] \tProcess outstanding requesters files")
# ----------------------------------------------------------------------------------------------------------
remote_outstanding_requesters = {
'general': {},
'uriList': {}
}
with open(local_file_path, 'r') as file_content:
remote_outstanding_requesters = json.loads(file_content.read())
remove(local_file_path)
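    # WAF logs are evaluated against a request-count threshold; other log types
    # (access logs) are evaluated against an error-count threshold.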
    threshold = 'requestThreshold' if log_type == 'waf' else 'errorThreshold'
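    # Merge the 'general' group: requesters present in both files keep the
    # highest max_counter_per_min; requesters only in the remote file are kept
    # while still above the threshold and within blockPeriod, otherwise dropped
    # (forcing a rewrite of the S3 file).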
try:
if 'general' in remote_outstanding_requesters:
for k, v in remote_outstanding_requesters['general'].items():
try:
if k in outstanding_requesters['general'].keys():
log.info(
"[merge_outstanding_requesters] \t\tUpdating general data of BLOCK %s rule" % k)
outstanding_requesters['general'][k]['updated_at'] = utc_now_timestamp_str
if v['max_counter_per_min'] > outstanding_requesters['general'][k]['max_counter_per_min']:
outstanding_requesters['general'][k]['max_counter_per_min'] = v['max_counter_per_min']
else:
utc_prev_updated_at = datetime.datetime.strptime(v['updated_at'],
"%Y-%m-%d %H:%M:%S %Z%z").astimezone(
datetime.timezone.utc)
total_diff_min = ((utc_now_timestamp - utc_prev_updated_at).total_seconds()) / 60
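                        # Present only in the remote file: keep it while it still
                        # exceeds the threshold and its block period has not expired.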
if v['max_counter_per_min'] < config['general'][threshold]:
force_update = True
log.info(
"[merge_outstanding_requesters] \t\t%s is bellow the current general threshold" % k)
elif total_diff_min < config['general']['blockPeriod']:
log.debug("[merge_outstanding_requesters] \t\tKeeping %s in general" % k)
outstanding_requesters['general'][k] = v
else:
force_update = True
log.info("[merge_outstanding_requesters] \t\t%s expired in general" % k)
except Exception:
log.error("[merge_outstanding_requesters] \tError merging general %s rule" % k)
except Exception:
log.error('[merge_outstanding_requesters] Failed to process general group.')
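    # Merge the 'uriList' group: same expiry rules, but thresholds are defined
    # per URI; URIs removed from config or matching an ignored suffix are
    # discarded and force a rewrite of the S3 file.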
try:
if 'uriList' in remote_outstanding_requesters:
if 'uriList' not in config or len(config['uriList']) == 0:
force_update = True
log.info(
"[merge_outstanding_requesters] \t\tCurrent config file does not contain uriList anymore")
else:
for uri in remote_outstanding_requesters['uriList'].keys():
if 'ignoredSufixes' in config['general'] and uri.endswith(
tuple(config['general']['ignoredSufixes'])):
force_update = True
log.info(
"[merge_outstanding_requesters] \t\t%s is in current ignored sufixes list." % uri)
continue
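                    # Merge this URI's entries using its own threshold from
                    # config['uriList'][uri].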
for k, v in remote_outstanding_requesters['uriList'][uri].items():
try:
if uri in outstanding_requesters['uriList'].keys() and k in \
outstanding_requesters['uriList'][uri].keys():
log.info(
"[merge_outstanding_requesters] \t\tUpdating uriList (%s) data of BLOCK %s rule" % (
uri, k))
outstanding_requesters['uriList'][uri][k]['updated_at'] = utc_now_timestamp_str
if v['max_counter_per_min'] > outstanding_requesters['uriList'][uri][k][
'max_counter_per_min']:
outstanding_requesters['uriList'][uri][k]['max_counter_per_min'] = v[
'max_counter_per_min']
else:
utc_prev_updated_at = datetime.datetime.strptime(v['updated_at'],
"%Y-%m-%d %H:%M:%S %Z%z").astimezone(
datetime.timezone.utc)
total_diff_min = ((utc_now_timestamp - utc_prev_updated_at).total_seconds()) / 60
if v['max_counter_per_min'] < config['uriList'][uri][threshold]:
force_update = True
log.info(
"[merge_outstanding_requesters] \t\t%s is bellow the current uriList (%s) threshold" % (
k, uri))
elif total_diff_min < config['general']['blockPeriod']:
log.debug(
"[merge_outstanding_requesters] \t\tKeeping %s in uriList (%s)" % (k, uri))
if uri not in outstanding_requesters['uriList'].keys():
outstanding_requesters['uriList'][uri] = {}
outstanding_requesters['uriList'][uri][k] = v
else:
force_update = True
log.info(
"[merge_outstanding_requesters] \t\t%s expired in uriList (%s)" % (k, uri))
except Exception:
log.error(
"[merge_outstanding_requesters] \tError merging uriList (%s) %s rule" % (uri, k))
except Exception:
log.error('[merge_outstanding_requesters] Failed to process uriList group.')
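    # Rewrite the S3 file when anything was dropped (force_update), when it is
    # older than MAX_AGE_TO_UPDATE minutes (assumes that environment variable is
    # set), or when there are any requesters to keep.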
need_update = (force_update or
last_update_age > int(os.getenv('MAX_AGE_TO_UPDATE')) or
len(outstanding_requesters['general']) > 0 or
len(outstanding_requesters['uriList']) > 0)
log.debug('[merge_outstanding_requesters] End')
return outstanding_requesters, need_update
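

# --------------------------------------------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). The bucket and key
# names below are hypothetical; running this for real requires AWS credentials,
# the MAX_AGE_TO_UPDATE environment variable, and the module-level `config`
# dict and `create_client` helper this file already relies on.
# --------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':
    import logging

    logging.basicConfig(level=logging.DEBUG)
    example_log = logging.getLogger('log_parser')

    # Requesters extracted from the log file currently being processed.
    current_requesters = {'general': {}, 'uriList': {}}

    merged, need_update = merge_outstanding_requesters(
        example_log,
        'my-logs-bucket',                        # hypothetical bucket
        'AWSLogs/2024/01/01/example.log.gz',     # hypothetical log key
        'waf',                                   # evaluate requestThreshold
        'current_outstanding_requesters.json',   # hypothetical output key
        current_requesters)

    if need_update:
        example_log.info('Merged results should be written back to S3.')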