def get_outstanding_requesters()

in source/log_parser/log-parser.py [0:0]


def get_outstanding_requesters(log, bucket_name, key_name, log_type):
    """Find clients whose per-minute request count in one log file reaches a threshold.

    Downloads the gzip-compressed log object ``key_name`` from ``bucket_name`` into
    ``/tmp``, counts requests per "minute + client IP" key, and keeps the clients whose
    count reaches the configured threshold. The downloaded file is removed once the
    read stage ends, whether it succeeded, failed, or returned early.

    For ``log_type == 'waf'`` every request is counted and the threshold key is
    ``requestThreshold``. For ``'alb'`` and ``'cloudfront'`` only lines whose status
    code is listed in ``config['general']['errorCodes']`` are counted and the threshold
    key is ``errorThreshold``.

    Relies on the module-level names ``config``, ``LINE_FORMAT_ALB``,
    ``LINE_FORMAT_CLOUD_FRONT`` and ``create_client``, which are defined outside this
    function.

    Args:
        log: Logger-like object providing ``debug``, ``info`` and ``error``.
        bucket_name: Name of the S3 bucket holding the log file.
        key_name: Key of the log object; its last path segment is used as the local
            file name.
        log_type: One of ``'waf'``, ``'alb'`` or ``'cloudfront'``. Any other value makes
            the function return the empty result as soon as the first line is read.

    Returns:
        A dict of the form
        ``{'general': {ip: {...}}, 'uriList': {uri: {ip: {...}}}}`` where each inner
        dict holds ``max_counter_per_min`` (highest per-minute count seen for that
        client) and ``updated_at`` (UTC timestamp string). Failures are logged rather
        than raised; in that case whatever was collected so far (possibly nothing) is
        returned.
    """
    log.debug('[get_outstanding_requesters] Start')

    counter = {
        'general': {},
        'uriList': {}
    }
    outstanding_requesters = {
        'general': {},
        'uriList': {}
    }

    try:
        # --------------------------------------------------------------------------------------------------------------
        log.info("[get_outstanding_requesters] \tDownload file from S3")
        # --------------------------------------------------------------------------------------------------------------
        local_file_path = '/tmp/' + key_name.split('/')[-1]
        s3 = create_client('s3')
        s3.download_file(bucket_name, key_name, local_file_path)

        try:
            # ----------------------------------------------------------------------------------------------------------
            log.info("[get_outstanding_requesters] \tRead file content")
            # ----------------------------------------------------------------------------------------------------------
            error_count = 0
            with gzip.open(local_file_path, 'r') as content:
                for line in content:
                    try:
                        request_key = ""
                        uri = ""
                        # Stays None for WAF logs, where every request counts regardless of status code.
                        return_code_index = None

                        if log_type == 'waf':
                            line = line.decode()  # Remove the b in front of each field
                            line_data = json.loads(line)

                            # Key is "<timestamp truncated to the minute> <client ip>".
                            request_key = datetime.datetime.fromtimestamp(
                                int(line_data['timestamp']) / 1000.0).isoformat(sep='T', timespec='minutes')
                            request_key += ' ' + line_data['httpRequest']['clientIp']
                            uri = urlparse(line_data['httpRequest']['uri']).path

                        elif log_type == 'alb':
                            line = line.decode('utf8')
                            if line.startswith('#'):
                                continue

                            line_data = line.split(LINE_FORMAT_ALB['delimiter'])
                            # Drop everything after the last ':' of the timestamp and of the source field.
                            request_key = line_data[LINE_FORMAT_ALB['timestamp']].rsplit(':', 1)[0]
                            request_key += ' ' + line_data[LINE_FORMAT_ALB['source_ip']].rsplit(':', 1)[0]
                            return_code_index = LINE_FORMAT_ALB['code']
                            uri = urlparse(line_data[LINE_FORMAT_ALB['uri']]).path

                        elif log_type == 'cloudfront':
                            line = line.decode('utf8')
                            if line.startswith('#'):
                                continue

                            line_data = line.split(LINE_FORMAT_CLOUD_FRONT['delimiter'])
                            request_key = line_data[LINE_FORMAT_CLOUD_FRONT['date']]
                            # Strip the last three characters of the time field.
                            request_key += ' ' + line_data[LINE_FORMAT_CLOUD_FRONT['time']][:-3]
                            request_key += ' ' + line_data[LINE_FORMAT_CLOUD_FRONT['source_ip']]
                            return_code_index = LINE_FORMAT_CLOUD_FRONT['code']
                            uri = urlparse(line_data[LINE_FORMAT_CLOUD_FRONT['uri']]).path

                        else:
                            # Unknown log type: nothing to count. The enclosing finally still removes the file.
                            return outstanding_requesters

                        if 'ignoredSufixes' in config['general'] and uri.endswith(
                                tuple(config['general']['ignoredSufixes'])):
                            log.debug(
                                "[get_outstanding_requesters] \t\tSkipping line %s. Included in ignoredSufixes." % line)
                            continue

                        if return_code_index is None or line_data[return_code_index] in config['general']['errorCodes']:
                            general_counter = counter['general']
                            general_counter[request_key] = general_counter.get(request_key, 0) + 1

                            if 'uriList' in config and uri in config['uriList']:
                                uri_counter = counter['uriList'].setdefault(uri, {})
                                uri_counter[request_key] = uri_counter.get(request_key, 0) + 1

                    except Exception as e:
                        error_count += 1
                        log.error("[get_outstanding_requesters] \t\tError to process line: %s" % line)
                        log.error(str(e))
                        if error_count == 5:  # Tolerate four bad lines; the fifth aborts the whole file
                            raise
        finally:
            # Always clean up the downloaded copy, including on errors and on the early return above.
            remove(local_file_path)

        # --------------------------------------------------------------------------------------------------------------
        log.info("[get_outstanding_requesters] \tKeep only outstanding requesters")
        # --------------------------------------------------------------------------------------------------------------
        threshold = 'requestThreshold' if log_type == 'waf' else "errorThreshold"
        utc_now_timestamp_str = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z%z")
        for k, num_reqs in counter['general'].items():
            try:
                # The client IP is the last space-separated token of the counter key.
                k = k.split(' ')[-1]
                if num_reqs >= config['general'][threshold]:
                    # A client may appear under several minutes; keep its highest per-minute count.
                    if k not in outstanding_requesters['general'] or num_reqs > \
                            outstanding_requesters['general'][k]['max_counter_per_min']:
                        outstanding_requesters['general'][k] = {
                            'max_counter_per_min': num_reqs,
                            'updated_at': utc_now_timestamp_str
                        }
            except Exception as e:
                log.error(
                    "[get_outstanding_requesters] \t\tError to process outstanding requester: %s" % k)
                log.error(str(e))

        for uri in counter['uriList']:
            for k, num_reqs in counter['uriList'][uri].items():
                try:
                    k = k.split(' ')[-1]
                    if num_reqs >= config['uriList'][uri][threshold]:
                        uri_requesters = outstanding_requesters['uriList'].setdefault(uri, {})

                        if k not in uri_requesters or num_reqs > uri_requesters[k]['max_counter_per_min']:
                            uri_requesters[k] = {
                                'max_counter_per_min': num_reqs,
                                'updated_at': utc_now_timestamp_str
                            }
                except Exception as e:
                    log.error(
                        "[get_outstanding_requesters] \t\tError to process outstanding requester: (%s) %s" % (uri, k))
                    log.error(str(e))

    except Exception as e:
        log.error("[get_outstanding_requesters] \tError to read input file")
        log.error(e)

    log.debug('[get_outstanding_requesters] End')
    return outstanding_requesters