in src/ddbToEs/ddbToEsHelper.ts [211:251]
async executeEsCmds(cmds: ESBulkCommand[]) {
const bulkCmds: any[] = cmds.flatMap((cmd: ESBulkCommand) => {
return cmd.bulkCommand;
});
if (bulkCmds.length === 0) {
return;
}
const listOfIds = cmds.map((cmd) => {
return cmd.id;
});
logger.info(`Starting bulk sync operation on ids: `, listOfIds);
try {
const { body: bulkResponse } = await this.ElasticSearch.bulk({
refresh: 'wait_for',
body: bulkCmds,
});
if (bulkResponse.errors) {
const erroredDocuments: any[] = [];
// The presence of the `error` key indicates that the operation
// that we did for the document has failed.
bulkResponse.items.forEach((action: any) => {
const operation = Object.keys(action)[0];
if (action[operation].error) {
erroredDocuments.push({
status: action[operation].status,
error: action[operation].error,
index: action[operation]._index,
id: action[operation]._id,
esOperation: operation,
});
}
});
throw new Error(JSON.stringify(erroredDocuments));
}
} catch (error) {
logger.error(`Bulk sync operation failed on ids: `, listOfIds);
throw error;
}
}