async handleDDBStreamEvent()

in src/ddbToEs/ddbToEsSync.ts [68:129]


    async handleDDBStreamEvent(event: any) {
        try {
            const idToCommand: Record<string, ESBulkCommand> = {};
            const aliasesToCreate: { alias: string; index: string }[] = [];

            for (let i = 0; i < event.Records.length; i += 1) {
                const record = event.Records[i];
                logger.debug('EventName: ', record.eventName);

                const removeResource = this.ddbToEsHelper.isRemoveResource(record);
                const ddbJsonImage = removeResource ? record.dynamodb.OldImage : record.dynamodb.NewImage;
                const image = AWS.DynamoDB.Converter.unmarshall(ddbJsonImage);
                logger.debug(image);
                // Don't index binary files
                if (isBinaryResource(image)) {
                    // eslint-disable-next-line no-continue
                    continue;
                }

                const alias = this.getAliasFn(image);

                if (!this.knownAliases.has(alias.alias)) {
                    aliasesToCreate.push(alias);
                }

                const cmd = removeResource
                    ? this.ddbToEsHelper.createBulkESDelete(image, alias.alias)
                    : this.ddbToEsHelper.createBulkESUpsert(image, alias.alias);

                if (cmd) {
                    // Note this will overwrite the item if present
                    // DDB streams guarantee in-order delivery of all mutations to each item
                    // Meaning the last record in the event stream is the "newest"
                    idToCommand[cmd.id] = cmd;
                }
            }
            if (!this.disableIndexAndAliasCreation) {
                await this.ddbToEsHelper.createIndexAndAliasIfNotExist(aliasesToCreate);
                // update cache of all known aliases
                aliasesToCreate.forEach((alias) => this.knownAliases.add(alias.alias));
            }
            await this.ddbToEsHelper.executeEsCmds(Object.values(idToCommand));
        } catch (e) {
            logger.error(
                'Synchronization failed! The resources that could be effected are: ',
                event.Records.map(
                    (record: {
                        eventName: string;
                        dynamodb: { OldImage: AWS.DynamoDB.AttributeMap; NewImage: AWS.DynamoDB.AttributeMap };
                    }) => {
                        const image = this.ddbToEsHelper.isRemoveResource(record)
                            ? record.dynamodb.OldImage
                            : record.dynamodb.NewImage;
                        return `{id: ${image.id.S}, vid: ${image.vid.N}}`;
                    },
                ),
            );

            logger.error('Failed to update ES records', e);
            throw e;
        }
    }