private def streamRowsIntoKustoByWorkers()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [344:444]


  /**
   * Streams a partition's rows into Kusto via the streaming ingest client, chunking the
   * CSV-serialized rows so no single streaming request exceeds the configured uncompressed
   * size limit (`writeOptions.streamIngestUncompressedMaxSize`, in bytes).
   *
   * Rows are serialized as CSV into an in-memory buffer. When the buffer grows past the
   * limit, the bytes up to the end of the last *complete* row (`lastIndex`) are streamed,
   * and the in-progress tail row is carried over into a fresh buffer. A single row that by
   * itself exceeds the limit (`lastIndex == 0`) is streamed on its own. Whatever remains in
   * the buffer after the iterator is exhausted is streamed as the final chunk.
   *
   * @param batchIdForTracing   identifier used only for log correlation
   * @param rows                the partition's rows to ingest
   * @param ingestionProperties Kusto ingestion properties passed to each streaming call
   * @param parameters          write resources: cluster coordinates, auth, schema, options
   */
  private def streamRowsIntoKustoByWorkers(
      batchIdForTracing: String,
      rows: Iterator[InternalRow],
      ingestionProperties: IngestionProperties,
      parameters: KustoWriteResource): Unit = {
    val streamingClient = KustoClientCache
      .getClient(
        parameters.coordinates.clusterUrl,
        parameters.authentication,
        parameters.coordinates.ingestionUrl,
        parameters.coordinates.clusterAlias)
      .streamingClient

    val timeZone = TimeZone.getTimeZone(parameters.writeOptions.timeZone).toZoneId
    // TODO - use a pool of two streams?
    // Mutable on purpose: the whole buffer/writer chain is re-created after each split,
    // to avoid copying the carried-over tail bytes between streams.
    var byteArrayOutputStream = new ByteArrayOutputStreamWithOffset()
    var streamWriter = new OutputStreamWriter(byteArrayOutputStream)
    var writer = new BufferedWriter(streamWriter)
    var csvWriter = CountingWriter(writer)
    var totalSize = 0L
    // Byte offset of the end of the last fully-written row in the current buffer.
    var lastIndex = 0
    for ((row, index) <- rows.zipWithIndex) {
      RowCSVWriterUtils.writeRowAsCSV(row, parameters.schema, timeZone, csvWriter)
      if (csvWriter.getCounter >= parameters.writeOptions.streamIngestUncompressedMaxSize) {
        KDSU.logWarn(
          className,
          s"Batch $batchIdForTracing reached the maximum uncompressed streaming size of " +
            s"${parameters.writeOptions.streamIngestUncompressedMaxSize} bytes. " +
            s"Streaming ${csvWriter.getCounter} buffered bytes (row index $index).")
        // BufferedWriter.flush() also flushes the wrapped OutputStreamWriter, so a
        // separate streamWriter.flush() is not needed here.
        writer.flush()
        if (lastIndex != 0) {
          // The buffer holds at least one complete row before the oversized tail:
          // stream everything up to lastIndex and carry the tail into a fresh buffer.
          val firstBB = byteArrayOutputStream.toByteArray
          val bb2 = byteArrayOutputStream.createNewFromOffset(lastIndex)
          byteArrayOutputStream = bb2
          totalSize += lastIndex

          streamBytesIntoKusto(
            batchIdForTracing,
            firstBB,
            ingestionProperties,
            parameters.writeOptions,
            streamingClient,
            lastIndex
          ) // lastIndex == limit minus the size of the carried-over last row
          lastIndex = bb2.size()
          // TODO Is it really better (the other option is to copy the data from the stream
          // to a new stream - which we try to avoid)?
          // Re-create the writer chain over the new buffer; the counter is seeded with the
          // carried-over byte count so the size check above stays consistent.
          streamWriter = new OutputStreamWriter(byteArrayOutputStream)
          writer = new BufferedWriter(streamWriter)
          csvWriter = CountingWriter(writer, bb2.size())
        } else {
          // lastIndex == 0 means this single row alone exceeds the limit: stream it by itself.
          KDSU.logInfo(
            className,
            s"Streaming a single row of ${csvWriter.getCounter} bytes on its own. Batch id: $batchIdForTracing.")
          streamBytesIntoKusto(
            batchIdForTracing,
            byteArrayOutputStream.getByteArrayOrCopy,
            ingestionProperties,
            parameters.writeOptions,
            streamingClient,
            byteArrayOutputStream.size())
          byteArrayOutputStream.reset()
          totalSize += csvWriter.getCounter
          csvWriter.resetCounter()
        }
      } else {
        // flush before counting output size
        writer.flush()
        lastIndex = byteArrayOutputStream
          .size() // TODO Can we simply use csvWriter.getCounter without flush? (we count all bytes and no transformation is done)
      }
    }

    // Close all resources, then stream whatever remains in the buffer as the final chunk.
    writer.flush()
    byteArrayOutputStream.flush()
    IOUtils.close(writer, byteArrayOutputStream)
    if (csvWriter.getCounter > 0) {
      KDSU.logInfo(
        className,
        s"Streaming final batch of ${csvWriter.getCounter} bytes from batch $batchIdForTracing.")
      totalSize += csvWriter.getCounter

      streamBytesIntoKusto(
        batchIdForTracing,
        byteArrayOutputStream.getByteArrayOrCopy,
        ingestionProperties,
        parameters.writeOptions,
        streamingClient,
        byteArrayOutputStream.size())
    }
    if (totalSize > WarnStreamingBytes) {
      KDSU.logWarn(
        className,
        s"Total of $totalSize bytes were ingested in the batch. Please consider 'Queued' writeMode for ingestion.")
    }
  }