in lib/Helpers.js [513:548]
for await (const chunk of datasource) {
if (shouldAbort === true) break
timeoutRef.refresh()
const action = onDocument(chunk)
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
: Object.keys(action)[0]
if (operation === 'index' || operation === 'create') {
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'update') {
actionBody = serializer.serialize(action[0])
payloadBody = typeof chunk === 'string'
? `{"doc":${chunk}}`
: serializer.serialize({ doc: chunk, ...action[1] })
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'delete') {
actionBody = serializer.serialize(action)
chunkBytes += Buffer.byteLength(actionBody)
bulkBody.push(actionBody)
} else {
clearTimeout(timeoutRef)
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
}
if (chunkBytes >= flushBytes) {
stats.bytes += chunkBytes
const send = await semaphore()
send(bulkBody.slice())
bulkBody.length = 0
chunkBytes = 0
}
}