in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [344:444]
/**
 * Streams the partition's rows directly into Kusto via the streaming ingest client,
 * serializing each row as CSV into an in-memory buffer and flushing the buffer to
 * Kusto whenever it reaches the configured uncompressed size limit.
 *
 * Buffer-splitting invariant: `lastIndex` always holds the buffer offset at the end of
 * the last *complete* row. When the limit is crossed mid-row, everything up to
 * `lastIndex` is streamed and the tail (the in-progress row) is carried over into a
 * fresh buffer via `createNewFromOffset`, so no row is ever split across two
 * streaming requests.
 *
 * @param batchIdForTracing   identifier used only for log correlation
 * @param rows                rows of the Spark partition to ingest
 * @param ingestionProperties Kusto ingestion properties forwarded to each streaming call
 * @param parameters          coordinates, auth, schema and write options for this write
 */
private def streamRowsIntoKustoByWorkers(
    batchIdForTracing: String,
    rows: Iterator[InternalRow],
    ingestionProperties: IngestionProperties,
    parameters: KustoWriteResource): Unit = {
  val streamingClient = KustoClientCache
    .getClient(
      parameters.coordinates.clusterUrl,
      parameters.authentication,
      parameters.coordinates.ingestionUrl,
      parameters.coordinates.clusterAlias)
    .streamingClient
  val timeZone = TimeZone.getTimeZone(parameters.writeOptions.timeZone).toZoneId
  // TODO - use a pool of two streams?
  // var curBbId = 0
  // val byteArrayPool = Array[ByteArrayOutputStream](new ByteArrayOutputStream(), null)// Init the 2nd lazy.
  // val byteArrayOutputStream = byteArrayPool[]
  // Mutable because the writer chain is rebuilt whenever the buffer is split below.
  var byteArrayOutputStream = new ByteArrayOutputStreamWithOffset()
  var streamWriter = new OutputStreamWriter(byteArrayOutputStream)
  var writer = new BufferedWriter(streamWriter)
  // CountingWriter tracks bytes written so the size limit can be checked without flushing.
  var csvWriter = CountingWriter(writer)
  var totalSize = 0L
  // Buffer offset of the end of the last fully-written row (0 means "no complete row yet").
  var lastIndex = 0
  for ((row, index) <- rows.zipWithIndex) {
    RowCSVWriterUtils.writeRowAsCSV(row, parameters.schema, timeZone, csvWriter)
    if (csvWriter.getCounter >= parameters.writeOptions.streamIngestUncompressedMaxSize) {
      // NOTE: counter and limit are both uncompressed byte counts (see option name).
      KDSU.logWarn(
        className,
        s"Batch $batchIdForTracing exceeds the max streaming size ${parameters.writeOptions.streamIngestUncompressedMaxSize} " +
          s"bytes uncompressed. Streaming ${csvWriter.getCounter} bytes from batch $batchIdForTracing. " +
          s"Index of the batch ($index).")
      // Flush the writer chain so the buffer contains every byte written so far.
      writer.flush()
      streamWriter.flush()
      if (lastIndex != 0) {
        // Split the byteArrayOutputStream into two - i need actually that the one we write to will be reset
        // firstBB = all complete rows; the remainder (current, oversized row's prefix)
        // is moved into a new buffer so rows are never split across requests.
        val firstBB = byteArrayOutputStream.toByteArray
        val bb2 = byteArrayOutputStream.createNewFromOffset(lastIndex)
        byteArrayOutputStream = bb2
        totalSize += lastIndex
        streamBytesIntoKusto(
          batchIdForTracing,
          firstBB,
          ingestionProperties,
          parameters.writeOptions,
          streamingClient,
          lastIndex
        ) // =4mb-size(last row)
        lastIndex = bb2.size()
        // TODO Is it really better > (other option is to copy the data from the stream to a new stream - which i try to avoid)?
        // Rebuild the writer chain on top of the carried-over buffer; seed the
        // counter with the bytes already in it so the size check stays accurate.
        streamWriter = new OutputStreamWriter(byteArrayOutputStream)
        writer = new BufferedWriter(streamWriter)
        csvWriter = CountingWriter(writer, bb2.size())
      } else {
        // A single row alone exceeds the limit - stream it as-is and reset.
        KDSU.logInfo(
          className,
          s"Streaming one line as individual byte as the row size is ${csvWriter.getCounter}. Batch id: $batchIdForTracing.")
        streamBytesIntoKusto(
          batchIdForTracing,
          byteArrayOutputStream.getByteArrayOrCopy,
          ingestionProperties,
          parameters.writeOptions,
          streamingClient,
          byteArrayOutputStream.size())
        byteArrayOutputStream.reset()
        totalSize += csvWriter.getCounter
        csvWriter.resetCounter()
      }
    } else {
      // flush before counting output size
      writer.flush()
      lastIndex = byteArrayOutputStream
        .size() // TODO Can i simply use csvWriter.getCounter without flush? (we count all bytes and no transformation is done)
    }
  }
  // Close all resources
  writer.flush()
  byteArrayOutputStream.flush()
  // Safe to read the buffer after close: ByteArrayOutputStream.close() is a no-op.
  IOUtils.close(writer, byteArrayOutputStream)
  if (csvWriter.getCounter > 0) {
    // Stream whatever is left in the buffer after the last row.
    KDSU.logInfo(
      className,
      s"Streaming final batch of ${csvWriter.getCounter} bytes from batch $batchIdForTracing.")
    totalSize += csvWriter.getCounter
    streamBytesIntoKusto(
      batchIdForTracing,
      byteArrayOutputStream.getByteArrayOrCopy,
      ingestionProperties,
      parameters.writeOptions,
      streamingClient,
      byteArrayOutputStream.size())
  }
  if (totalSize > WarnStreamingBytes) {
    // Large totals suggest the user should switch to queued ingestion instead.
    KDSU.logWarn(
      className,
      s"Total of $totalSize bytes were ingested in the batch. Please consider 'Queued' writeMode for ingestion.")
  }
}