private def ingestToTemporaryTableByWorkers()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [484:513]


  private def ingestToTemporaryTableByWorkers(
      batchIdForTracing: String,
      rows: Iterator[InternalRow],
      partitionsResults: CollectionAccumulator[PartitionResult],
      ingestionProperties: IngestionProperties,
      parameters: KustoWriteResource): Unit = {
    val partitionId = TaskContext.getPartitionId
    KDSU.logInfo(
      className,
      s"Processing partition: '$partitionId' in requestId: '${parameters.writeOptions.requestId}'$batchIdForTracing")
    val clientCache = KustoClientCache.getClient(
      parameters.coordinates.clusterUrl,
      parameters.authentication,
      parameters.coordinates.ingestionUrl,
      parameters.coordinates.clusterAlias)
    val ingestClient = clientCache.ingestClient
    CloudInfo.manuallyAddToCache(clientCache.ingestKcsb.getClusterUrl, parameters.cloudInfo)

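    // Queueing requests use a fixed retry policy with the connector's own
    // attempt count and per-try timeout rather than the SDK defaults.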
    val reqRetryOpts = new RequestRetryOptions(
      RetryPolicyType.FIXED,
      KCONST.QueueRetryAttempts,
      Duration.ofSeconds(KCONST.DefaultTimeoutQueueing),
      null,
      null,
      null)
    ingestClient.setQueueRequestOptions(reqRetryOpts)
    // We force blocking here, since the driver can only complete the ingestion process
    // once all partitions are ingested into the temporary table
    ingestRowsIntoKusto(rows, ingestClient, ingestionProperties, partitionsResults, batchIdForTracing, parameters)
  }
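
  For context, this method runs on executors: a driver-side caller would typically pass it to foreachPartition so that each Spark task ingests its own partition and reports its outcome through the shared accumulator. The sketch below illustrates that calling shape only; the rdd, acc, batchId, props, and params names are illustrative placeholders, not identifiers taken from this file.

  // Sketch (assumed driver-side caller), showing how the method above could be
  // fanned out once per partition of the DataFrame's underlying RDD.
  val acc: CollectionAccumulator[PartitionResult] =
    sparkSession.sparkContext.collectionAccumulator[PartitionResult]("partitionsResults")

  rdd.foreachPartition { rowIterator: Iterator[InternalRow] =>
    // Each task ingests its rows into the temporary table and appends its
    // ingestion status to the accumulator for the driver to poll afterwards.
    ingestToTemporaryTableByWorkers(batchId, rowIterator, acc, props, params)
  }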