private[kusto] def ingestRows()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [545:700]


  /**
   * Serializes the rows of one Spark partition as CSV into temporary blobs and queues each blob
   * for ingestion into Kusto.
   *
   * A blob is sealed (finalized) and a new one is started once its CSV writer's counter reaches
   * `writeOptions.batchLimit * KCONST.OneMegaByte`. Without `ensureNoDupBlobs`, each sealed blob
   * is queued right away. With `ensureNoDupBlobs`, sealed blobs are collected and only queued
   * after all rows of the partition have been written; each one then gets a per-blob dedup tag
   * added to both its ingest-by and ingest-if-not-exists tags.
   *
   * @param rows                rows of the current partition
   * @param parameters          write resource: coordinates, authentication, schema and write options
   * @param ingestClient        client used to queue blob ingestions
   * @param ingestionProperties base ingestion properties; copied per blob whenever that blob needs
   *                            its own tags or flush setting, so the shared instance is not mutated
   * @param partitionsResults   receives one [[PartitionResult]] per queued blob in Transactional mode
   * @param batchIdForTracing   batch id used when building the dedup tag prefix
   */
  private[kusto] def ingestRows(
      rows: Iterator[InternalRow],
      parameters: KustoWriteResource,
      ingestClient: IngestClient,
      ingestionProperties: IngestionProperties,
      partitionsResults: CollectionAccumulator[PartitionResult],
      batchIdForTracing: String): Unit = {
    val partitionId = TaskContext.getPartitionId
    val partitionIdString = partitionId.toString
    // Sealed blobs awaiting ingestion, keyed by blob UUID (only filled when ensureNoDupBlobs is set).
    val taskMap = new ConcurrentHashMap[String, BlobWriteResource]()

    // Queues one sealed blob for ingestion, retrying according to `retryConfig`.
    def ingest(
        blobResource: BlobWriteResource,
        size: Long,
        sas: String,
        flushImmediately: Boolean = false,
        blobUUID: String,
        kustoClient: ExtendedKustoClient): Unit = {
      val blobUri = blobResource.blob.getStorageUri.getPrimaryUri.toString
      // Per-blob tags / flush settings must only affect this blob's ingestion, so work on a copy
      // of the shared properties whenever either is needed.
      val props =
        if (parameters.writeOptions.ensureNoDupBlobs ||
          (!ingestionProperties.getFlushImmediately && flushImmediately)) {
          SparkIngestionProperties.cloneIngestionProperties(ingestionProperties)
        } else {
          ingestionProperties
        }

      if (parameters.writeOptions.ensureNoDupBlobs) {
        // The same tag is used as ingest-by and ingest-if-not-exists, so queueing this blob more
        // than once (e.g. by the retry loop below) can be detected as a duplicate.
        val pref = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, batchIdForTracing)
        val tag = pref + blobUUID
        val ingestIfNotExist = new util.ArrayList[String]
        ingestIfNotExist.addAll(props.getIngestIfNotExists)
        val ingestBy: util.List[String] = new util.ArrayList[String]
        ingestBy.addAll(props.getIngestByTags)

        ingestBy.add(tag)
        ingestIfNotExist.add(tag)
        props.setIngestByTags(ingestBy)
        props.setIngestIfNotExists(ingestIfNotExist)
      }

      if (!props.getFlushImmediately && flushImmediately) {
        props.setFlushImmediately(true)
      }

      // Reports the outcome of one queueing attempt for this blob's container URL (with SAS).
      def reportOutcome(success: Boolean): Unit =
        kustoClient.reportIngestionResult(
          new ContainerWithSas(blobUri + blobResource.sas, null),
          success = success)

      // write the data here
      val partitionsResult = KDSU.retryApplyFunction(
        i => {
          Try(
            ingestClient.ingestFromBlob(
              new BlobSourceInfo(blobUri + sas, size, UUID.randomUUID()),
              props)) match {
            case Success(x) =>
              // The statuses of the ingestion operations are now set in the ingestion result
              reportOutcome(success = true)
              x
            case Failure(e: Throwable) =>
              // Report before rethrowing: reportExceptionAndThrow throws, so anything placed
              // after it would never run.
              reportOutcome(success = false)
              KDSU.reportExceptionAndThrow(
                className,
                e,
                s"Queueing blob for ingestion, retry number '$i', in partition " +
                  s"$partitionIdString for requestId: '${parameters.writeOptions.requestId}")
              null
          }
        },
        this.retryConfig,
        "Ingest into Kusto")
      if (parameters.writeOptions.writeMode == WriteMode.Transactional) {
        partitionsResults.add(PartitionResult(partitionsResult, partitionId))
      }
      KDSU.logInfo(
        className,
        s"Queued blob for ingestion in partition $partitionIdString " +
          s"for requestId: '${parameters.writeOptions.requestId}")
    }

    val kustoClient = KustoClientCache.getClient(
      parameters.coordinates.clusterUrl,
      parameters.authentication,
      parameters.coordinates.ingestionUrl,
      parameters.coordinates.clusterAlias)
    val maxBlobSize = parameters.writeOptions.batchLimit * KCONST.OneMegaByte
    var curBlobUUID = UUID.randomUUID().toString
    // This blobWriter will be used later to write the rows to blob storage from which it will be ingested to Kusto
    val initialBlobWriter: BlobWriteResource = createBlobWriter(
      parameters.coordinates,
      parameters.tmpTableName,
      kustoClient,
      partitionIdString,
      0,
      curBlobUUID)
    val timeZone = TimeZone.getTimeZone(parameters.writeOptions.timeZone).toZoneId
    // Serialize rows to ingest and send to blob storage.
    val lastBlobWriter = rows.zipWithIndex.foldLeft[BlobWriteResource](initialBlobWriter) {
      case (blobWriter, (row, rowIndex)) =>
        RowCSVWriterUtils.writeRowAsCSV(row, parameters.schema, timeZone, blobWriter.csvWriter)

        val count = blobWriter.csvWriter.getCounter
        if (count < maxBlobSize) {
          blobWriter
        } else {
          KDSU.logInfo(
            className,
            s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
              s"blob number $rowIndex, with size $count")
          finalizeBlobWrite(blobWriter)
          if (parameters.writeOptions.ensureNoDupBlobs) {
            // Defer ingestion until the whole partition has been serialized.
            taskMap.put(curBlobUUID, blobWriter)
          } else {
            ingest(
              blobWriter,
              blobWriter.csvWriter.getCounter,
              blobWriter.sas,
              flushImmediately = !parameters.writeOptions.disableFlushImmediately,
              curBlobUUID,
              kustoClient)
          }
          // In both modes the sealed blob must not receive more rows: always continue with a
          // fresh writer (ConcurrentHashMap.put returns the previous value, not the writer).
          curBlobUUID = UUID.randomUUID().toString
          createBlobWriter(
            parameters.coordinates,
            parameters.tmpTableName,
            kustoClient,
            partitionIdString,
            rowIndex,
            curBlobUUID)
        }
    }

    KDSU.logInfo(
      className,
      s"Finished serializing rows in partition $partitionIdString for " +
        s"requestId: '${parameters.writeOptions.requestId}' ")
    finalizeBlobWrite(lastBlobWriter)
    if (lastBlobWriter.csvWriter.getCounter > 0) {
      if (parameters.writeOptions.ensureNoDupBlobs) {
        taskMap.put(curBlobUUID, lastBlobWriter)
      } else {
        ingest(
          lastBlobWriter,
          lastBlobWriter.csvWriter.getCounter,
          lastBlobWriter.sas,
          flushImmediately = false,
          curBlobUUID,
          kustoClient)
      }
    }
    if (parameters.writeOptions.ensureNoDupBlobs) {
      // All rows are written; now queue every deferred (already finalized) blob.
      taskMap.forEach((uuid, bw) => {
        ingest(bw, bw.csvWriter.getCounter, bw.sas, flushImmediately = false, uuid, kustoClient)
      })
    }
  }