packages/amplify-graphql-searchable-transformer/streaming-lambda/python_streaming_function.py
def _lambda_handler(event, context):
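    """Transform a batch of DynamoDB Stream records (or replayed Kinesis
    records) into a single OpenSearch _bulk payload and post it.

    Relies on module-level helpers and constants defined elsewhere in this
    file: logger, StreamTypeDeserializer, get_table_name_from_arn,
    compute_doc_index, DDBTypesEncoder, post_to_opensearch, DOC_TYPE and
    OPENSEARCH_USE_EXTERNAL_VERSIONING.
    """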
    logger.debug('Event: %s', event)
    records = event['Records']
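
    # StreamTypeDeserializer (defined elsewhere in this module) turns DynamoDB-typed
    # attribute maps such as {'S': 'hello'} or {'N': '42'} into plain Python values.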
    ddb_deserializer = StreamTypeDeserializer()
    opensearch_actions = []  # Items to be added/updated/removed from OpenSearch - for bulk API
    cnt_insert = cnt_modify = cnt_remove = 0

    for record in records:
        # Handle both native DynamoDB Streams and Streams data from Kinesis (for manual replay)
        logger.debug('Record: %s', record)

        if record.get('eventSource') == 'aws:dynamodb':
            ddb = record['dynamodb']
            ddb_table_name = get_table_name_from_arn(record['eventSourceARN'])
            doc_seq = ddb['SequenceNumber']
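        # Replayed records arrive through Kinesis as a base64-encoded JSON copy
        # of the original stream payload, tagged with its source table name.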
        elif record.get('eventSource') == 'aws:kinesis':
            ddb = json.loads(base64.b64decode(record['kinesis']['data']))
            ddb_table_name = ddb['SourceTable']
            doc_seq = record['kinesis']['sequenceNumber']
        else:
            logger.error('Ignoring non-DynamoDB event sources: %s',
                         record.get('eventSource'))
            continue

        # Compute DynamoDB table, type and index for item
        doc_table = ddb_table_name.lower()
        doc_type = DOC_TYPE
        doc_table_parts = doc_table.split('-')
        doc_opensearch_index_name = doc_table_parts[0] if len(doc_table_parts) > 0 else doc_table
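        # Amplify-generated table names typically look like '<Model>-<apiId>-<env>',
        # so the leading segment (the lower-cased model name) serves as the index name.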

        # Dispatch according to event TYPE
        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        logger.debug('doc_table=%s, event_name=%s, seq=%s',
                     doc_table, event_name, doc_seq)

        # Treat events from a Kinesis stream as INSERTs
        if event_name == 'AWS:KINESIS:RECORD':
            event_name = 'INSERT'

        is_ddb_insert_or_update = (event_name == 'INSERT') or (event_name == 'MODIFY')
        is_ddb_delete = event_name == 'REMOVE'
        image_name = 'NewImage' if is_ddb_insert_or_update else 'OldImage'
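        # Inserts and updates carry the item in 'NewImage'; removals only provide
        # 'OldImage', so the table's stream should use a view type that includes
        # both images (e.g. NEW_AND_OLD_IMAGES).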

        if image_name not in ddb:
            logger.warning(
                'Cannot process stream if it does not contain %s', image_name)
            continue
        logger.debug(image_name + ': %s', ddb[image_name])

        # Deserialize DynamoDB types to Python types
        doc_fields = ddb_deserializer.deserialize({'M': ddb[image_name]})

        # Sync-enabled APIs do soft deletes: delete the record from OpenSearch when the _deleted field is set
        if OPENSEARCH_USE_EXTERNAL_VERSIONING and event_name == 'MODIFY' and '_deleted' in doc_fields and doc_fields['_deleted']:
            is_ddb_insert_or_update = False
            is_ddb_delete = True
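        # (_deleted and _version are conflict-resolution metadata that AppSync
        # adds to items when sync/conflict detection is enabled.)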

        # Update counters
        if event_name == 'INSERT':
            cnt_insert += 1
        elif event_name == 'MODIFY':
            cnt_modify += 1
        elif event_name == 'REMOVE':
            cnt_remove += 1
        else:
            logger.warning('Unsupported event_name: %s', event_name)

        logger.debug('Deserialized doc_fields: %s', doc_fields)

        if 'Keys' in ddb:
            doc_id = compute_doc_index(ddb['Keys'], ddb_deserializer)
        else:
            logger.error('Cannot find keys in ddb record')
            # Without keys there is no document id; skip the record rather than
            # reuse a doc_id left over from a previous iteration.
            continue

        # If DynamoDB INSERT or MODIFY, send 'index' to OpenSearch
        if is_ddb_insert_or_update:
            # Generate OpenSearch payload for item
            action = {'index': {'_index': doc_opensearch_index_name,
                                '_type': doc_type,
                                '_id': doc_id}}
            # Add external versioning if necessary
            if OPENSEARCH_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['index'].update([
                    ('version_type', 'external'),
                    ('version', doc_fields['_version'])
                ])
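            # With version_type 'external', OpenSearch applies a write only when
            # the supplied version is greater than the stored one, so replayed or
            # out-of-order events cannot overwrite newer data.
            # Strip the sync metadata so it is not indexed as document fields.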
            doc_fields.pop('_ttl', None)
            doc_fields.pop('_version', None)
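            # The _bulk API consumes NDJSON: one action metadata line, then (for
            # 'index') the document source on the following line.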
            # Append OpenSearch action line with 'index' directive
            opensearch_actions.append(json.dumps(action))
            # Append JSON payload
            opensearch_actions.append(json.dumps(doc_fields, cls=DDBTypesEncoder))

            # Migration step: remove the document stored under the old key, if it exists
            if ('id' in doc_fields) and (event_name == 'MODIFY'):
                action = {'delete': {'_index': doc_opensearch_index_name, '_type': doc_type,
                                     '_id': compute_doc_index(ddb['Keys'], ddb_deserializer, True)}}
                opensearch_actions.append(json.dumps(action))
        # If DynamoDB REMOVE, send 'delete' to OpenSearch
        elif is_ddb_delete:
            action = {'delete': {'_index': doc_opensearch_index_name,
                                 '_type': doc_type, '_id': doc_id}}
            if OPENSEARCH_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['delete'].update([
                    ('version_type', 'external'),
                    ('version', doc_fields['_version'])
                ])
            # Action line with 'delete' directive
            opensearch_actions.append(json.dumps(action))

    # Prepare bulk payload
    opensearch_actions.append('')  # Add one empty line to force final \n
    opensearch_payload = '\n'.join(opensearch_actions)
    logger.info('Posting to OpenSearch: inserts=%s updates=%s deletes=%s, total_lines=%s, bytes_total=%s',
                cnt_insert, cnt_modify, cnt_remove, len(opensearch_actions) - 1, len(opensearch_payload))
    post_to_opensearch(opensearch_payload)  # Post to OpenSearch with exponential backoff
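
# For illustration only (hypothetical record): an INSERT on a table named
# 'Todo-abc123-dev' whose key is {'id': {'S': '1'}} would contribute two
# NDJSON lines to the bulk payload, roughly:
#
#   {"index": {"_index": "todo", "_type": <DOC_TYPE>, "_id": "1"}}
#   {"id": "1", "name": "buy milk"}
#
# followed by the trailing newline appended above. The exact _id depends on
# how compute_doc_index serializes the key attributes.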