def _lambda_handler()

in packages/amplify-graphql-searchable-transformer/streaming-lambda/python_streaming_function.py [0:0]


def _lambda_handler(event, context):
    logger.debug('Event: %s', event)
    records = event['Records']

    ddb_deserializer = StreamTypeDeserializer()
    opensearch_actions = []  # Items to be added/updated/removed from OpenSearch - for bulk API
    cnt_insert = cnt_modify = cnt_remove = 0

    for record in records:
        # Handle both native DynamoDB Streams or Streams data from Kinesis (for manual replay)
        logger.debug('Record: %s', record)
        if record.get('eventSource') == 'aws:dynamodb':
            ddb = record['dynamodb']
            ddb_table_name = get_table_name_from_arn(record['eventSourceARN'])
            doc_seq = ddb['SequenceNumber']
        elif record.get('eventSource') == 'aws:kinesis':
            ddb = json.loads(base64.b64decode(record['kinesis']['data']))
            ddb_table_name = ddb['SourceTable']
            doc_seq = record['kinesis']['sequenceNumber']
        else:
            logger.error('Ignoring non-DynamoDB event sources: %s',
                        record.get('eventSource'))
            continue

        # Compute DynamoDB table, type and index for item
        doc_table = ddb_table_name.lower()
        doc_type = DOC_TYPE
        doc_table_parts = doc_table.split('-')
        doc_opensearch_index_name = doc_table_parts[0] if len(doc_table_parts) > 0  else doc_table

        # Dispatch according to event TYPE
        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        logger.debug('doc_table=%s, event_name=%s, seq=%s',
                    doc_table, event_name, doc_seq)

        # Treat events from a Kinesis stream as INSERTs
        if event_name == 'AWS:KINESIS:RECORD':
            event_name = 'INSERT'

        is_ddb_insert_or_update = (event_name == 'INSERT') or (event_name == 'MODIFY')
        is_ddb_delete = event_name == 'REMOVE'
        image_name = 'NewImage' if is_ddb_insert_or_update else 'OldImage'

        if image_name not in ddb:
            logger.warning(
                'Cannot process stream if it does not contain ' + image_name)
            continue
        logger.debug(image_name + ': %s', ddb[image_name])
        # Deserialize DynamoDB type to Python types
        doc_fields = ddb_deserializer.deserialize({'M': ddb[image_name]})
        
        # Sync enabled APIs do soft delete. We need to delete the record in OpenSearch if _deleted field is set
        if OPENSEARCH_USE_EXTERNAL_VERSIONING and event_name == 'MODIFY' and '_deleted' in  doc_fields and doc_fields['_deleted']:
            is_ddb_insert_or_update = False
            is_ddb_delete = True
            
         # Update counters
        if event_name == 'INSERT':
            cnt_insert += 1
        elif event_name == 'MODIFY':
            cnt_modify += 1
        elif event_name == 'REMOVE':
            cnt_remove += 1
        else:
            logger.warning('Unsupported event_name: %s', event_name)

        logger.debug('Deserialized doc_fields: %s', doc_fields)

        if ('Keys' in ddb):
            doc_id = compute_doc_index(ddb['Keys'], ddb_deserializer)
        else:
            logger.error('Cannot find keys in ddb record')

        # If DynamoDB INSERT or MODIFY, send 'index' to OpenSearch
        if is_ddb_insert_or_update:
            # Generate OpenSearch payload for item
            action = {'index': {'_index': doc_opensearch_index_name,
                                '_type': doc_type,
                                '_id': doc_id}}
            # Add external versioning if necessary
            if OPENSEARCH_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['index'].update([
                    ('version_type', 'external'),
                    ('version', doc_fields['_version'])
                ])
                doc_fields.pop('_ttl', None)
                doc_fields.pop('_version', None)
            # Append OpenSearch Action line with 'index' directive
            opensearch_actions.append(json.dumps(action))
            # Append JSON payload
            opensearch_actions.append(json.dumps(doc_fields, cls=DDBTypesEncoder))
            # migration step remove old key if it exists
            if ('id' in doc_fields) and (event_name == 'MODIFY') :
                action = {'delete': {'_index': doc_opensearch_index_name, '_type': doc_type,
                    '_id': compute_doc_index(ddb['Keys'], ddb_deserializer, True)}}
                opensearch_actions.append(json.dumps(action))
        # If DynamoDB REMOVE, send 'delete' to OpenSearch
        elif is_ddb_delete:
            action = {'delete': {'_index': doc_opensearch_index_name,
                                '_type': doc_type, '_id': doc_id}}
            if OPENSEARCH_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['delete'].update([
                    ('version_type', 'external'),
                    ('version', doc_fields['_version'])
                ])
            # Action line with 'delete' directive
            opensearch_actions.append(json.dumps(action))

    # Prepare bulk payload
    opensearch_actions.append('')  # Add one empty line to force final \n
    opensearch_payload = '\n'.join(opensearch_actions)
    logger.info('Posting to OpenSearch: inserts=%s updates=%s deletes=%s, total_lines=%s, bytes_total=%s',
                cnt_insert, cnt_modify, cnt_remove, len(opensearch_actions) - 1, len(opensearch_payload))
    post_to_opensearch(opensearch_payload)  # Post to OpenSearch with exponential backoff