in src/ddbToEs/ddbToEsSync.ts [68:129]
/**
 * Synchronizes a batch of DynamoDB stream records to Elasticsearch.
 *
 * For every record the relevant image is unmarshalled (`OldImage` for removals,
 * `NewImage` otherwise), binary resources are skipped, and a bulk delete or upsert
 * command is built. Commands are keyed by `cmd.id`, so only the last command for a
 * given id in the batch is executed. Unless `disableIndexAndAliasCreation` is set,
 * aliases not yet present in `knownAliases` are created before the commands run and
 * are then added to that cache.
 *
 * @param event - DynamoDB stream event; only `event.Records` is read.
 * @throws Rethrows whatever error interrupted the synchronization, after logging it
 *   together with a summary of the records in the batch.
 */
async handleDDBStreamEvent(event: any) {
    try {
        const idToCommand: Record<string, ESBulkCommand> = {};
        const aliasesToCreate: { alias: string; index: string }[] = [];
        // Alias names already queued during this batch. `knownAliases` is only refreshed after
        // the loop, so without this several records sharing a new alias would queue it repeatedly.
        // NOTE(review): presumably createIndexAndAliasIfNotExist should see each alias once — confirm.
        const queuedAliasNames = new Set<string>();
        for (let i = 0; i < event.Records.length; i += 1) {
            const record = event.Records[i];
            logger.debug('EventName: ', record.eventName);
            const removeResource = this.ddbToEsHelper.isRemoveResource(record);
            const ddbJsonImage = removeResource ? record.dynamodb.OldImage : record.dynamodb.NewImage;
            const image = AWS.DynamoDB.Converter.unmarshall(ddbJsonImage);
            logger.debug(image);
            // Don't index binary files
            if (isBinaryResource(image)) {
                // eslint-disable-next-line no-continue
                continue;
            }
            const alias = this.getAliasFn(image);
            if (!this.knownAliases.has(alias.alias) && !queuedAliasNames.has(alias.alias)) {
                queuedAliasNames.add(alias.alias);
                aliasesToCreate.push(alias);
            }
            const cmd = removeResource
                ? this.ddbToEsHelper.createBulkESDelete(image, alias.alias)
                : this.ddbToEsHelper.createBulkESUpsert(image, alias.alias);
            if (cmd) {
                // Note this will overwrite the item if present
                // DDB streams guarantee in-order delivery of all mutations to each item
                // Meaning the last record in the event stream is the "newest"
                idToCommand[cmd.id] = cmd;
            }
        }
        if (!this.disableIndexAndAliasCreation) {
            await this.ddbToEsHelper.createIndexAndAliasIfNotExist(aliasesToCreate);
            // update cache of all known aliases
            aliasesToCreate.forEach((alias) => this.knownAliases.add(alias.alias));
        }
        await this.ddbToEsHelper.executeEsCmds(Object.values(idToCommand));
    } catch (e) {
        // Building the summary must never throw: the failure being reported may itself have been
        // caused by a malformed event or record, and a second exception here would hide `e`.
        const describeRecord = (record: {
            eventName: string;
            dynamodb: { OldImage: AWS.DynamoDB.AttributeMap; NewImage: AWS.DynamoDB.AttributeMap };
        }): string => {
            try {
                const image = this.ddbToEsHelper.isRemoveResource(record)
                    ? record.dynamodb.OldImage
                    : record.dynamodb.NewImage;
                const id = image && image.id ? image.id.S : undefined;
                const vid = image && image.vid ? image.vid.N : undefined;
                return `{id: ${id}, vid: ${vid}}`;
            } catch (describeError) {
                return '{id: <unknown>, vid: <unknown>}';
            }
        };
        const records: any[] = event && Array.isArray(event.Records) ? event.Records : [];
        logger.error(
            'Synchronization failed! The resources that could be effected are: ',
            records.map(describeRecord),
        );
        logger.error('Failed to update ES records', e);
        throw e;
    }
}