in export-neptune-to-elasticsearch/lambda/kinesis_to_elasticsearch.py
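
import base64
import json
import logging

# deaggregate_records is provided by the Kinesis Record Aggregation library
# for Python (the aws-kinesis-agg package).
from aws_kinesis_agg.deaggregator import deaggregate_records

# NOTE: `handler`, `log_commit_nums`, and (in commented-out code below)
# `metrics_publisher_client` are module-level names created elsewhere in this
# file; only a minimal logger setup is sketched here.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
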
def lambda_bulk_handler(event, context):
    """A Python AWS Lambda function to process Kinesis aggregated
    records in a bulk fashion."""
    logger.info('Starting bulk loading')
    raw_kinesis_records = event['Records']
    logger.info('Aggregated Kinesis record count: {}'.format(len(raw_kinesis_records)))
    # Deaggregate all records in one call
    user_records = deaggregate_records(raw_kinesis_records)
    total_records = len(user_records)
    logger.info('Deaggregated record count: {}'.format(total_records))
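    # Collect every deaggregated record into a single batch, recording the
    # last event ID seen (commitNum -1 / opNum 0 until a record arrives).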
    log_stream = {
        "records": [],
        "lastEventId": {
            "commitNum": -1,
            "opNum": 0
        },
        "totalRecords": total_records
    }
    # Track the first and last event IDs seen and, when enabled, the set of
    # distinct commitNums for diagnostic logging.
    first_commit_num = None
    first_op_num = None
    prev_commit_num = None
    prev_op_num = None
    commit_nums = set()
    for user_record in user_records:
        records_json = base64.b64decode(user_record['kinesis']['data'])
        try:
            records = json.loads(records_json)
        except Exception as e:
            logger.error('Error parsing JSON: \'{}\': {}'.format(records_json, str(e)))
            raise
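        # Each decoded payload is a list of change records whose eventId is a
        # (commitNum, opNum) pair. Track the first and last IDs seen, and warn
        # if opNums within a single commit arrive out of order.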
        for record in records:
            commit_num = record['eventId']['commitNum']
            op_num = record['eventId']['opNum']
            if log_commit_nums:
                commit_nums.add(commit_num)
            if first_commit_num is None:
                first_commit_num = commit_num
            if first_op_num is None:
                first_op_num = op_num
            #logger.info('Stream record: (commitNum: {}, opNum: {})'.format(commit_num, op_num))
            #if prev_commit_num is not None and commit_num < prev_commit_num:
            #    logger.warning('Current commitNum [{}] is less than previous commitNum [{}]'.format(commit_num, prev_commit_num))
            if prev_commit_num is not None and commit_num == prev_commit_num:
                if prev_op_num is not None and op_num < prev_op_num:
                    logger.warning('Current opNum [{}] is less than previous opNum [{}] (commitNum [{}])'.format(op_num, prev_op_num, commit_num))
            log_stream['records'].append(record)
            prev_commit_num = commit_num
            prev_op_num = op_num
    log_stream['lastEventId']['commitNum'] = prev_commit_num if prev_commit_num is not None else -1
    log_stream['lastEventId']['opNum'] = prev_op_num if prev_op_num is not None else 0
    logger.info('Log stream record count: {}'.format(len(log_stream['records'])))
    logger.info('First record: (commitNum: {}, opNum: {})'.format(first_commit_num, first_op_num))
    logger.info('Last record: (commitNum: {}, opNum: {})'.format(prev_commit_num, prev_op_num))
    if log_commit_nums:
        logger.info('Commit nums: {}'.format(commit_nums))
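    # Hand the assembled batch to the stream handler, which yields a result
    # per processed batch reporting how many records were handled.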
    for result in handler.handle_records(log_stream):
        records_processed = result.records_processed
        logger.info('{} records processed'.format(records_processed))
        #metrics_publisher_client.publish_metrics(metrics_publisher_client.generate_record_processed_metrics(records_processed))
    logger.info('Finished bulk loading')
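
# A minimal local-invocation sketch (an assumption for illustration, not part
# of the deployed function). It relies on the module-level `handler` and
# `log_commit_nums` being configured as in the real file, and uses a skeletal
# record carrying only the eventId fields this function reads.
# deaggregate_records passes records that are not KPL-aggregated through
# unchanged, so a plain base64-encoded JSON payload is enough to exercise the
# handler end to end.
if __name__ == '__main__':
    sample_records = [{'eventId': {'commitNum': 1, 'opNum': 1}}]
    test_event = {
        'Records': [{
            'kinesis': {
                'partitionKey': 'test-partition-key',
                'data': base64.b64encode(json.dumps(sample_records).encode('utf-8')).decode('utf-8')
            }
        }]
    }
    lambda_bulk_handler(test_event, None)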