for await()

in src/helpers.ts [681:718]


      for await (const chunk of datasource) {
        if (shouldAbort) break
        timeoutRef.refresh()
        const result = onDocument(chunk)
        const [action, payload] = Array.isArray(result) ? result : [result, chunk]
        const operation = Object.keys(action)[0]
        if (operation === 'index' || operation === 'create') {
          actionBody = serializer.serialize(action)
          payloadBody = typeof payload === 'string'
            ? payload
            : serializer.serialize(payload)
          chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
          bulkBody.push(actionBody, payloadBody)
        } else if (operation === 'update') {
          actionBody = serializer.serialize(action)
          payloadBody = typeof chunk === 'string'
            ? `{"doc":${chunk}}`
            : serializer.serialize({ doc: chunk, ...payload })
          chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
          bulkBody.push(actionBody, payloadBody)
        } else if (operation === 'delete') {
          actionBody = serializer.serialize(action)
          chunkBytes += Buffer.byteLength(actionBody)
          bulkBody.push(actionBody)
        } else {
          clearTimeout(timeoutRef)
          throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
        }

        if (chunkBytes >= flushBytes) {
          stats.bytes += chunkBytes
          const bulkBodyCopy = bulkBody.slice()
          bulkBody.length = 0
          chunkBytes = 0
          const send = await semaphore()
          send(bulkBodyCopy)
        }
      }