in log-processor/lambda_function.py
import base64

def lambda_handler(event, context):
    records = []
    record_counter = 0
    for record in event['Records']:
        # Extract the record data in bytes and base64-decode it
        payload_in_bytes = base64.b64decode(record['kinesis']['data'])
        # Decode the bytes payload into a string
        payload = payload_in_bytes.decode('utf-8')
        # Dictionary where all the field/value pairings for this record end up
        payload_dict = {}
        # Generate a list from the tab-delimited log entry
        payload_list = payload.strip().split('\t')
        # Use the field mappings configuration to perform data type conversion
        # as needed; a '-' value means the field is empty, so keep it a string
        for counter, (field, data_type) in enumerate(FIELD_DATA_MAPPINGS.items()):
            value = payload_list[counter].strip()
            if value == '-':
                data_type = 'str'
            if data_type == 'int':
                payload_dict[field] = int(value)
            elif data_type == 'float':
                payload_dict[field] = float(value)
            else:
                payload_dict[field] = value
        # Parse the headers and return them as lists. This is useful if you
        # want to log the header information as well.
        if 'cs-headers' in payload_dict:
            del payload_dict['cs-headers']  # remove this line and uncomment below to include cs-headers as a list in the record
            # payload_dict['cs-headers'] = parse_headers(payload_dict['cs-headers'], 'cs-headers')
        if 'cs-header-names' in payload_dict:
            del payload_dict['cs-header-names']  # remove this line and uncomment below to include cs-header-names as a list
            # payload_dict['cs-header-names'] = parse_headers(payload_dict['cs-header-names'], 'cs-header-names')
        dimensions_list = []
        for field, value in payload_dict.items():
            # Replace dashes in field names with underscores to adhere to Timestream naming requirements
            field_name = field.replace('-', '_')
            dimensions_list.append(
                {'Name': field_name, 'Value': str(value)}
            )
        timestream_record = {
            'Dimensions': dimensions_list,
            'MeasureName': 'sc_bytes',
            'MeasureValue': str(payload_dict['sc-bytes']),
            'MeasureValueType': 'BIGINT',
            'Time': str(int(payload_dict['timestamp'])),
            'TimeUnit': 'SECONDS'
        }
        records.append(timestream_record)
        record_counter += 1
        # Timestream accepts at most 100 records per write call, so flush in batches
        if len(records) == 100:
            write_batch_timestream(records, record_counter)
            records = []
    # Flush any remaining records
    if records:
        write_batch_timestream(records, record_counter)
    print('Successfully processed {} records.'.format(len(event['Records'])))
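
# --- Hypothetical supporting definitions (not part of the original listing) ---
# The handler references FIELD_DATA_MAPPINGS without showing it. A minimal
# sketch, assuming a CloudFront real-time log configuration whose tab-delimited
# fields arrive in this order; the field names, order, and types here are
# illustrative and must match your actual log configuration.
FIELD_DATA_MAPPINGS = {
    'timestamp': 'float',        # epoch seconds, possibly fractional
    'c-ip': 'str',
    'time-to-first-byte': 'float',
    'sc-status': 'int',
    'sc-bytes': 'int',
    'cs-method': 'str',
    'cs-uri-stem': 'str',
    'cs-headers': 'str',         # dropped (or parsed) by the handler above
    'cs-header-names': 'str',
}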
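
# parse_headers is referenced (in the commented-out lines above) but not shown.
# A minimal sketch, assuming header entries within the field are URL-encoded
# and separated by an encoded newline ('%0A'); verify the delimiter against
# your own log records before relying on this.
from urllib.parse import unquote

def parse_headers(raw_value, field_name):
    # field_name is kept only to match the call sites above; each entry is
    # split out and URL-decoded into a plain string.
    return [unquote(entry) for entry in raw_value.split('%0A') if entry]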
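
# write_batch_timestream is called above but not shown. A minimal sketch using
# boto3's Timestream Write client; the TIMESTREAM_DATABASE and TIMESTREAM_TABLE
# environment variable names are assumptions, error handling is reduced to
# logging rejected records, and imports are shown inline for self-containment.
import os
import boto3

timestream_client = boto3.client('timestream-write')

def write_batch_timestream(records, record_counter):
    try:
        timestream_client.write_records(
            DatabaseName=os.environ['TIMESTREAM_DATABASE'],
            TableName=os.environ['TIMESTREAM_TABLE'],
            Records=records,
            CommonAttributes={}
        )
        print('Wrote batch; {} records processed so far.'.format(record_counter))
    except timestream_client.exceptions.RejectedRecordsException as err:
        # Timestream reports per-record rejections rather than failing the whole call
        print('Rejected records: {}'.format(err.response.get('RejectedRecords')))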