def lambda_bulk_handler()

in export-neptune-to-elasticsearch/lambda/kinesis_to_elasticsearch.py

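This handler receives a batch of Kinesis records carrying Neptune Streams change records, deaggregates any KPL-aggregated records, sanity-checks that eventId ordering (commitNum/opNum) is monotonic within a commit, and passes the accumulated records to the module-level handler for bulk processing. The names base64, json, logger, deaggregate_records (presumably the aws_kinesis_agg deaggregation helper), log_commit_nums, handler, and metrics_publisher_client are module-level definitions elsewhere in kinesis_to_elasticsearch.py.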

def lambda_bulk_handler(event, context):
    """AWS Lambda handler that deaggregates KPL-aggregated Kinesis records
    and processes the resulting Neptune Streams records in bulk."""
    
    logger.info('Starting bulk loading')
    
    raw_kinesis_records = event['Records']
    
    logger.info('Aggregated Kinesis record count: {}'.format(len(raw_kinesis_records)))
    
    # Deaggregate all records in one call
    user_records = deaggregate_records(raw_kinesis_records)
    
    total_records = len(user_records)
    
    logger.info('Deaggregated record count: {}'.format(total_records))
    
    log_stream = {
        "records": [],
        "lastEventId": {
            "commitNum": -1,
            "opNum": 0
        },
        "totalRecords": total_records
    }
        
    first_commit_num = None
    first_op_num = None
    prev_commit_num = None
    prev_op_num = None
    commit_nums = set()
        
    for user_record in user_records:
        records_json = base64.b64decode(user_record['kinesis']['data'])
        try:
            records = json.loads(records_json)
        except Exception as e:
            logger.error('Error parsing JSON: \'{}\': {}'.format(records_json, str(e)))
            raise
        for record in records:
            
            commit_num = record['eventId']['commitNum']
            op_num = record['eventId']['opNum']
            
            if log_commit_nums:
                commit_nums.add(commit_num)
            
            if first_commit_num is None:
                first_commit_num = commit_num
                
            if first_op_num is None:
                first_op_num = op_num
                
            #logger.info('Stream record: (commitNum: {}, opNum: {})'.format(commit_num, op_num)) 
            
            #if prev_commit_num and commit_num < prev_commit_num:
            #    logger.warn('Current commitNum [{}] is less than previous commitNum [{}]'.format(commit_num, prev_commit_num))
                
            if prev_commit_num is not None and commit_num == prev_commit_num:
                if prev_op_num is not None and op_num < prev_op_num:
                    logger.warning('Current opNum [{}] is less than previous opNum [{}] (commitNum [{}])'.format(op_num, prev_op_num, commit_num))
                    
            log_stream['records'].append(record)
            
            prev_commit_num = commit_num
            prev_op_num = op_num
            
    log_stream['lastEventId']['commitNum'] = prev_commit_num if prev_commit_num is not None else -1
    log_stream['lastEventId']['opNum'] = prev_op_num if prev_op_num is not None else 0
        
    logger.info('Log stream record count: {}'.format(len(log_stream['records']))) 
    logger.info('First record: (commitNum: {}, opNum: {})'.format(first_commit_num, first_op_num))
    logger.info('Last record: (commitNum: {}, opNum: {})'.format(prev_commit_num, prev_op_num))  
    
    if log_commit_nums:
        logger.info('Commit nums: {}'.format(commit_nums))  
        
    for result in handler.handle_records(log_stream):
        records_processed = result.records_processed
        logger.info('{} records processed'.format(records_processed))
        #metrics_publisher_client.publish_metrics(metrics_publisher_client.generate_record_processed_metrics(records_processed))
        
    logger.info('Finished bulk loading')
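
For reference, below is a minimal sketch of the event shape this handler expects, assuming a non-aggregated payload (the aws_kinesis_agg deaggregator passes records without the KPL magic prefix through unchanged). All record values, the partition key, and the commented-out invocation are hypothetical illustrations, not taken from the repository.

import base64
import json

# Hypothetical Neptune Streams change records; only the eventId fields that
# lambda_bulk_handler reads are shown, with made-up commitNum/opNum values.
neptune_stream_records = [
    {"eventId": {"commitNum": 12, "opNum": 1}},
    {"eventId": {"commitNum": 12, "opNum": 2}},
]

sample_event = {
    "Records": [
        {
            "kinesis": {
                # Kinesis delivers payloads base64-encoded, which is why the
                # handler calls base64.b64decode before json.loads.
                "data": base64.b64encode(
                    json.dumps(neptune_stream_records).encode("utf-8")
                ).decode("utf-8"),
                "partitionKey": "example-partition-key"
            }
        }
    ]
}

# Invoking the handler requires the module-level globals (logger, handler,
# log_commit_nums) to be configured, e.g. inside the deployed Lambda:
# lambda_bulk_handler(sample_event, None)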