in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [484:513]
/**
 * Ingests the rows of the current partition into the temporary Kusto table.
 *
 * Steps, in order: log which partition is being handled, obtain the client entry from
 * [[KustoClientCache]] for the target coordinates, register the supplied cloud info for the
 * ingest cluster URL, install a fixed retry policy for the ingest client's queue requests,
 * and finally hand the rows to `ingestRowsIntoKusto`.
 *
 * @param batchIdForTracing   text appended verbatim to the end of the log message
 * @param rows                rows to ingest
 * @param partitionsResults   accumulator forwarded unchanged to `ingestRowsIntoKusto`
 * @param ingestionProperties ingestion properties forwarded unchanged to `ingestRowsIntoKusto`
 * @param parameters          write resource supplying coordinates, authentication, cloud info
 *                            and write options
 */
private def ingestToTemporaryTableByWorkers(
    batchIdForTracing: String,
    rows: Iterator[InternalRow],
    partitionsResults: CollectionAccumulator[PartitionResult],
    ingestionProperties: IngestionProperties,
    parameters: KustoWriteResource): Unit = {
  val currentPartition = TaskContext.getPartitionId
  val requestId = parameters.writeOptions.requestId
  KDSU.logInfo(
    className,
    s"Processing partition: '$currentPartition' in requestId: '$requestId'$batchIdForTracing")

  val target = parameters.coordinates
  val cachedClient = KustoClientCache.getClient(
    target.clusterUrl,
    parameters.authentication,
    target.ingestionUrl,
    target.clusterAlias)
  val ingestor = cachedClient.ingestClient
  CloudInfo.manuallyAddToCache(cachedClient.ingestKcsb.getClusterUrl, parameters.cloudInfo)

  // Fixed retry policy for queue requests. The trailing nulls are left unspecified on
  // purpose — presumably the SDK substitutes its own defaults; confirm against the
  // RequestRetryOptions documentation.
  val fixedRetryPolicy = new RequestRetryOptions(
    RetryPolicyType.FIXED,
    KCONST.QueueRetryAttempts,
    Duration.ofSeconds(KCONST.DefaultTimeoutQueueing),
    null,
    null,
    null)
  ingestor.setQueueRequestOptions(fixedRetryPolicy)

  // Blocking is forced at this point: the driver can finish the ingestion process only
  // after every partition has been ingested into the temporary table.
  ingestRowsIntoKusto(
    rows,
    ingestor,
    ingestionProperties,
    partitionsResults,
    batchIdForTracing,
    parameters)
}