async executeEsCmds()

in src/ddbToEs/ddbToEsHelper.ts [211:251]


    async executeEsCmds(cmds: ESBulkCommand[]) {
        const bulkCmds: any[] = cmds.flatMap((cmd: ESBulkCommand) => {
            return cmd.bulkCommand;
        });

        if (bulkCmds.length === 0) {
            return;
        }
        const listOfIds = cmds.map((cmd) => {
            return cmd.id;
        });
        logger.info(`Starting bulk sync operation on ids: `, listOfIds);
        try {
            const { body: bulkResponse } = await this.ElasticSearch.bulk({
                refresh: 'wait_for',
                body: bulkCmds,
            });

            if (bulkResponse.errors) {
                const erroredDocuments: any[] = [];
                // The presence of the `error` key indicates that the operation
                // that we did for the document has failed.
                bulkResponse.items.forEach((action: any) => {
                    const operation = Object.keys(action)[0];
                    if (action[operation].error) {
                        erroredDocuments.push({
                            status: action[operation].status,
                            error: action[operation].error,
                            index: action[operation]._index,
                            id: action[operation]._id,
                            esOperation: operation,
                        });
                    }
                });
                throw new Error(JSON.stringify(erroredDocuments));
            }
        } catch (error) {
            logger.error(`Bulk sync operation failed on ids: `, listOfIds);
            throw error;
        }
    }