def lambda_handler()

in edge/python/rt_log_transformer/rt_log_transformer/app.py [0:0]


def lambda_handler(firehose_records_input, context):
    log.info("Received records for processing from DeliveryStream: " +
             firehose_records_input['deliveryStreamArn'] + ", Region: " +
             firehose_records_input['region'] + ", and InvocationId: " +
             firehose_records_input['invocationId'])

    # Create return value.
    firehose_records_output = {'records': []}

    # Create result object.
    # Go through records and process them

    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload_bytes = base64.b64decode(firehose_record_input['data'])
        payload = "".join(map(chr, payload_bytes))

        payload_list = payload.strip().split('\t')
        cs_host = payload_list[7].strip()
        c_ip = payload_list[1].strip()
        version = validate_ip_version(c_ip)
        isp = isp_from_ip(c_ip, version)
        if version == "invalid":
            log.info("The ip address " + c_ip + " is invalid")
            continue
        country_name = country_code_from_geo_name(
            geo_name_from_ip(c_ip, version))
        content = payload.strip().split('\n')[0]
        payload = content + '\t' + isp + '\t' + country_name + '\n'
        log.info("New payload: " + payload)
        payload_encoded_ascii = payload.encode('ascii')
        payload_base64 = base64.b64encode(payload_encoded_ascii).decode(
            "utf-8")
        log.info(payload_base64)

        log.info("Record that was received")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(
            int(float(payload_list[0])))
        partition_keys = {
            "domain": cs_host,
            "year": event_timestamp.strftime('%Y'),
            "month": event_timestamp.strftime('%m'),
            "day": event_timestamp.strftime('%d'),
            "hour": event_timestamp.strftime('%H'),
            "minute": event_timestamp.strftime('%M')
        }

        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {
            'recordId': firehose_record_input['recordId'],
            'data': payload_base64,
            'result': 'Ok',
            'metadata': {
                'partitionKeys': partition_keys
            }
        }
        firehose_records_output['records'].append(firehose_record_output)

    log.info(json.dumps(firehose_records_output))

    # At the end return processed records
    return firehose_records_output