private[kusto] def finalizeIngestionWhenWorkersSucceeded()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala [36:185]


  private[kusto] def finalizeIngestionWhenWorkersSucceeded(
      coordinates: KustoCoordinates,
      batchIdIfExists: String,
      tmpTableName: String,
      partitionsResults: CollectionAccumulator[PartitionResult],
      writeOptions: WriteOptions,
      crp: ClientRequestProperties,
      tableExists: Boolean,
      sparkContext: SparkContext,
      authentication: KustoAuthentication,
      kustoClient: ExtendedKustoClient,
      sinkStartTime: Instant): Unit = {
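    // Bail out early when shouldIngestData decides there is nothing to ingest,
    // e.g. the ingestIfNotExists tags show the data already exists in the target table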
    if (!kustoClient.shouldIngestData(
        coordinates,
        writeOptions.maybeSparkIngestionProperties,
        tableExists,
        crp)) {
      KDSU.logInfo(myName, s"$IngestSkippedTrace '${coordinates.table.getOrElse("")}'")
    } else {
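      // Finalize (poll on ingestion, move extents, clean up) inside a Future so that
      // async writes can return immediately; synchronous writes await it below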
      val mergeTask = Future {
        val requestId = writeOptions.requestId
        val ingestionInfoString =
          s"RequestId: $requestId cluster: '${coordinates.clusterAlias}', " +
            s"database: '${coordinates.database}', table: '$tmpTableName' $batchIdIfExists"
        KDSU.logInfo(
          myName,
          s"Polling on ingestion results for requestId: $requestId, will move data to " +
            s"destination table when finished")

        try {
          if (writeOptions.pollingOnDriver) {
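            // Poll each partition's ingestion result directly on the driver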
            partitionsResults.value.asScala.foreach(partitionResult =>
              pollOnResult(
                partitionResult,
                requestId,
                writeOptions.timeout.toMillis,
                ingestionInfoString,
                !writeOptions.ensureNoDupBlobs))
          } else {
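            // Otherwise poll from inside a single-task Spark job running on an executor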
            KDSU.logWarn(
              myName,
              "IMPORTANT: It's highly recommended to set pollingOnDriver to true on production!\tRead here why https://github.com/Azure/azure-kusto-spark/blob/master/docs/KustoSink.md#supported-options")
            // Specifying numSlices = 1 so that only one task is created
            val resultsRdd =
              sparkContext.parallelize(partitionsResults.value.asScala, numSlices = 1)
            resultsRdd.sparkContext.setJobDescription("Polling on ingestion results")
            resultsRdd.foreachPartition((results: Iterator[PartitionResult]) =>
              results.foreach(partitionResult =>
                pollOnResult(
                  partitionResult,
                  requestId,
                  writeOptions.timeout.toMillis,
                  ingestionInfoString,
                  !writeOptions.ensureNoDupBlobs)))
          }

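          // A non-empty accumulator means at least one partition queued blobs for ingestion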
          if (partitionsResults.value.size > 0) {
            val pref = KDSU.getDedupTagsPrefix(writeOptions.requestId, batchIdIfExists)
            val moveOperation = (_: Int) => {
              val client = KustoClientCache.getClient(
                coordinates.clusterUrl,
                authentication,
                coordinates.ingestionUrl,
                coordinates.clusterAlias)
              client.executeEngine(
                coordinates.database,
                generateTableAlterMergePolicyCommand(
                  tmpTableName,
                  allowMerge = false,
                  allowRebuild = false),
                "alterMergePolicyCommand",
                crp)
              // Drop dedup tags
              if (writeOptions.ensureNoDupBlobs) {
                client.retryAsyncOp(
                  coordinates.database,
                  generateExtentTagsDropByPrefixCommand(tmpTableName, pref),
                  crp,
                  writeOptions.timeout,
                  s"drops extents from temp table '$tmpTableName' ",
                  "extentsDrop",
                  writeOptions.requestId)
              }
              client.moveExtents(
                coordinates.database,
                tmpTableName,
                coordinates.table.get,
                crp,
                writeOptions,
                sinkStartTime)
            }
            // Move data to the real table.
            // Protect the tmp table from merge/rebuild and move its data to the table requested by the customer. This operation is atomic.
            // The ingestIfNotExists tags are used here too (on top of the check at the start of the flow) so that if
            // several flows start together, only one of them ingests.
            KDSU.logInfo(
              myName,
              s"Final ingestion step: Moving extents from '$tmpTableName, requestId: ${writeOptions.requestId}," +
                s"$batchIdIfExists")

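            // Run the move in the same mode as the polling above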
            if (writeOptions.pollingOnDriver) {
              moveOperation(0)
            } else {
              // Specifying numSlices = 1 so that only one task is created
              val moveExtentsRdd = sparkContext.parallelize(Seq(1), numSlices = 1)
              moveExtentsRdd.sparkContext.setJobDescription("Moving extents to target table")
              moveExtentsRdd.foreach(moveOperation)
            }

            KDSU.logInfo(
              myName,
              s"write to Kusto table '${coordinates.table.get}' finished successfully " +
                s"requestId: ${writeOptions.requestId} $batchIdIfExists")
          } else {
            KDSU.logWarn(
              myName,
              s"write to Kusto table '${coordinates.table.get}' finished with no data written " +
                s"requestId: ${writeOptions.requestId} $batchIdIfExists")
          }
        } catch {
          case ex: Exception =>
            KDSU.reportExceptionAndThrow(
              myName,
              ex,
              "Trying to poll on pending ingestions",
              coordinates.clusterUrl,
              coordinates.database,
              coordinates.table.getOrElse("Unspecified table name"),
              writeOptions.requestId)
        } finally {
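          // Always drop the temp table and other ingestion by-products, even on failure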
          kustoClient.cleanupIngestionByProducts(coordinates.database, tmpTableName, crp)
        }
      }

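      // Synchronous writes block here until the merge task completes or the timeout elapses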
      if (!writeOptions.isAsync) {
        try {
          Await.result(mergeTask, writeOptions.timeout)
        } catch {
          case _: TimeoutException =>
            KDSU.reportExceptionAndThrow(
              myName,
              new TimeoutException("Timed out polling on ingestion status"),
              "polling on ingestion status",
              coordinates.clusterUrl,
              coordinates.database,
              coordinates.table.getOrElse("Unspecified table name"))
        }
      }
    }
  }