in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [545:700]
private[kusto] def ingestRows(
rows: Iterator[InternalRow],
parameters: KustoWriteResource,
ingestClient: IngestClient,
ingestionProperties: IngestionProperties,
partitionsResults: CollectionAccumulator[PartitionResult],
batchIdForTracing: String): Unit = {
val partitionId = TaskContext.getPartitionId
val partitionIdString = TaskContext.getPartitionId.toString
val taskMap = new ConcurrentHashMap[String, BlobWriteResource]()
def ingest(
blobResource: BlobWriteResource,
size: Long,
sas: String,
flushImmediately: Boolean = false,
blobUUID: String,
kustoClient: ExtendedKustoClient): Unit = {
var props = ingestionProperties
val blobUri = blobResource.blob.getStorageUri.getPrimaryUri.toString
if (parameters.writeOptions.ensureNoDupBlobs || (!props.getFlushImmediately && flushImmediately)) {
// Need to copy the maybeSparkIngestionProperties so that only this blob ingestion will be effected
props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties)
}
if (parameters.writeOptions.ensureNoDupBlobs) {
val pref = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, batchIdForTracing)
val tag = pref + blobUUID
val ingestIfNotExist = new util.ArrayList[String]
ingestIfNotExist.addAll(props.getIngestIfNotExists)
val ingestBy: util.List[String] = new util.ArrayList[String]
ingestBy.addAll(props.getIngestByTags)
ingestBy.add(tag)
ingestIfNotExist.add(tag)
props.setIngestByTags(ingestBy)
props.setIngestIfNotExists(ingestIfNotExist)
}
if (!props.getFlushImmediately && flushImmediately) {
props.setFlushImmediately(true)
}
// write the data here
val partitionsResult = KDSU.retryApplyFunction(
i => {
Try(
ingestClient.ingestFromBlob(
new BlobSourceInfo(blobUri + sas, size, UUID.randomUUID()),
props)) match {
case Success(x) =>
<!-- The statuses of the ingestion operations are now set in the ingestion result -->
val blobUrlWithSas =
s"${blobResource.blob.getStorageUri.getPrimaryUri.toString}${blobResource.sas}"
val containerWithSas = new ContainerWithSas(blobUrlWithSas, null)
kustoClient.reportIngestionResult(containerWithSas, success = true)
x
case Failure(e: Throwable) =>
KDSU.reportExceptionAndThrow(
className,
e,
s"Queueing blob for ingestion, retry number '$i', in partition " +
s"$partitionIdString for requestId: '${parameters.writeOptions.requestId}")
val blobUrlWithSas =
s"${blobResource.blob.getStorageUri.getPrimaryUri.toString}${blobResource.sas}"
val containerWithSas = new ContainerWithSas(blobUrlWithSas, null)
kustoClient.reportIngestionResult(containerWithSas, success = false)
null
}
},
this.retryConfig,
"Ingest into Kusto")
if (parameters.writeOptions.writeMode == WriteMode.Transactional) {
partitionsResults.add(PartitionResult(partitionsResult, partitionId))
}
KDSU.logInfo(
className,
s"Queued blob for ingestion in partition $partitionIdString " +
s"for requestId: '${parameters.writeOptions.requestId}")
}
val kustoClient = KustoClientCache.getClient(
parameters.coordinates.clusterUrl,
parameters.authentication,
parameters.coordinates.ingestionUrl,
parameters.coordinates.clusterAlias)
val maxBlobSize = parameters.writeOptions.batchLimit * KCONST.OneMegaByte
var curBlobUUID = UUID.randomUUID().toString
// This blobWriter will be used later to write the rows to blob storage from which it will be ingested to Kusto
val initialBlobWriter: BlobWriteResource = createBlobWriter(
parameters.coordinates,
parameters.tmpTableName,
kustoClient,
partitionIdString,
0,
curBlobUUID)
val timeZone = TimeZone.getTimeZone(parameters.writeOptions.timeZone).toZoneId
// Serialize rows to ingest and send to blob storage.
val lastBlobWriter = rows.zipWithIndex.foldLeft[BlobWriteResource](initialBlobWriter) {
case (blobWriter, row) =>
RowCSVWriterUtils.writeRowAsCSV(row._1, parameters.schema, timeZone, blobWriter.csvWriter)
val count = blobWriter.csvWriter.getCounter
val shouldNotCommitBlockBlob = count < maxBlobSize
if (shouldNotCommitBlockBlob) {
blobWriter
} else {
if (parameters.writeOptions.ensureNoDupBlobs) {
taskMap.put(curBlobUUID, blobWriter)
} else {
KDSU.logInfo(
className,
s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
s"blob number ${row._2}, with size $count")
finalizeBlobWrite(blobWriter)
ingest(
blobWriter,
blobWriter.csvWriter.getCounter,
blobWriter.sas,
flushImmediately = !parameters.writeOptions.disableFlushImmediately,
curBlobUUID,
kustoClient)
curBlobUUID = UUID.randomUUID().toString
createBlobWriter(
parameters.coordinates,
parameters.tmpTableName,
kustoClient,
partitionIdString,
row._2,
curBlobUUID)
}
}
}
KDSU.logInfo(
className,
s"Finished serializing rows in partition $partitionIdString for " +
s"requestId: '${parameters.writeOptions.requestId}' ")
finalizeBlobWrite(lastBlobWriter)
if (lastBlobWriter.csvWriter.getCounter > 0) {
if (parameters.writeOptions.ensureNoDupBlobs) {
taskMap.put(curBlobUUID, lastBlobWriter)
} else {
ingest(
lastBlobWriter,
lastBlobWriter.csvWriter.getCounter,
lastBlobWriter.sas,
flushImmediately = false,
curBlobUUID,
kustoClient)
}
}
if (parameters.writeOptions.ensureNoDupBlobs && taskMap.size() > 0) {
taskMap.forEach((uuid, bw) => {
ingest(bw, bw.csvWriter.getCounter, bw.sas, flushImmediately = false, uuid, kustoClient)
})
}
}