in connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala [36:185]
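/**
 * Runs after all worker partitions have reported success: polls on the partitions'
 * ingestion results, atomically moves the extents from the temporary table into the
 * destination table, and finally cleans up the ingestion by-products.
 */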
private[kusto] def finalizeIngestionWhenWorkersSucceeded(
coordinates: KustoCoordinates,
batchIdIfExists: String,
tmpTableName: String,
partitionsResults: CollectionAccumulator[PartitionResult],
writeOptions: WriteOptions,
crp: ClientRequestProperties,
tableExists: Boolean,
sparkContext: SparkContext,
authentication: KustoAuthentication,
kustoClient: ExtendedKustoClient,
sinkStartTime: Instant): Unit = {
if (!kustoClient.shouldIngestData(
coordinates,
writeOptions.maybeSparkIngestionProperties,
tableExists,
crp)) {
KDSU.logInfo(myName, s"$IngestSkippedTrace '${coordinates.table}'")
} else {
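// Finalization runs in a Future so that async writes (isAsync) can return without blocking;
// synchronous writes await this task below.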
val mergeTask = Future {
val requestId = writeOptions.requestId
val ingestionInfoString =
s"RequestId: $requestId cluster: '${coordinates.clusterAlias}', " +
s"database: '${coordinates.database}', table: '$tmpTableName' $batchIdIfExists"
KDSU.logInfo(
myName,
s"Polling on ingestion results for requestId: $requestId, will move data to " +
s"destination table when finished")
try {
if (writeOptions.pollingOnDriver) {
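// Poll each partition's ingestion result directly on the driver.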
partitionsResults.value.asScala.foreach(partitionResult =>
pollOnResult(
partitionResult,
requestId,
writeOptions.timeout.toMillis,
ingestionInfoString,
!writeOptions.ensureNoDupBlobs))
} else {
KDSU.logWarn(
myName,
"IMPORTANT: It's highly recommended to set pollingOnDriver to true on production!\tRead here why https://github.com/Azure/azure-kusto-spark/blob/master/docs/KustoSink.md#supported-options")
// Specifying numSlices = 1 so that only one task is created
val resultsRdd =
sparkContext.parallelize(partitionsResults.value.asScala, numSlices = 1)
resultsRdd.sparkContext.setJobDescription("Polling on ingestion results")
resultsRdd.foreachPartition((results: Iterator[PartitionResult]) =>
results.foreach(partitionResult =>
pollOnResult(
partitionResult,
requestId,
writeOptions.timeout.toMillis,
ingestionInfoString,
!writeOptions.ensureNoDupBlobs)))
}
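// Move extents only if at least one partition actually produced an ingestion result.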
if (partitionsResults.value.size > 0) {
val pref = KDSU.getDedupTagsPrefix(writeOptions.requestId, batchIdIfExists)
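// The actual move, run either inline on the driver or inside a single-task RDD below;
// the Int argument is unused.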
val moveOperation = (_: Int) => {
val client = KustoClientCache.getClient(
coordinates.clusterUrl,
authentication,
coordinates.ingestionUrl,
coordinates.clusterAlias)
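// Seal the temp table against merge/rebuild so its extents stay intact until moved.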
client.executeEngine(
coordinates.database,
generateTableAlterMergePolicyCommand(
tmpTableName,
allowMerge = false,
allowRebuild = false),
"alterMergePolicyCommand",
crp)
// Drop dedup tags
if (writeOptions.ensureNoDupBlobs) {
client.retryAsyncOp(
coordinates.database,
generateExtentTagsDropByPrefixCommand(tmpTableName, pref),
crp,
writeOptions.timeout,
s"drops extents from temp table '$tmpTableName' ",
"extentsDrop",
writeOptions.requestId)
}
client.moveExtents(
coordinates.database,
tmpTableName,
coordinates.table.get,
crp,
writeOptions,
sinkStartTime)
}
// Move data to the destination table.
// Protect the tmp table from merge/rebuild and move its extents to the table requested
// by the customer. This operation is atomic.
// The ingestIfNotExists tags are used here too (on top of the check at the start of the
// flow) so that if several flows start together, only one of them ingests.
KDSU.logInfo(
myName,
s"Final ingestion step: Moving extents from '$tmpTableName', requestId: ${writeOptions.requestId}, " +
s"$batchIdIfExists")
if (writeOptions.pollingOnDriver) {
moveOperation(0)
} else {
// Specifying numSlices = 1 so that only one task is created
val moveExtentsRdd = sparkContext.parallelize(Seq(1), numSlices = 1)
moveExtentsRdd.sparkContext.setJobDescription("Moving extents to target table")
moveExtentsRdd.foreach(moveOperation)
}
KDSU.logInfo(
myName,
s"write to Kusto table '${coordinates.table.get}' finished successfully " +
s"requestId: ${writeOptions.requestId} $batchIdIfExists")
} else {
KDSU.logWarn(
myName,
s"write to Kusto table '${coordinates.table.get}' finished with no data written " +
s"requestId: ${writeOptions.requestId} $batchIdIfExists")
}
} catch {
case ex: Exception =>
KDSU.reportExceptionAndThrow(
myName,
ex,
"Trying to poll on pending ingestions",
coordinates.clusterUrl,
coordinates.database,
coordinates.table.getOrElse("Unspecified table name"),
writeOptions.requestId)
} finally {
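// Clean up ingestion by-products (such as the temp table) whether or not the move succeeded.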
kustoClient.cleanupIngestionByProducts(coordinates.database, tmpTableName, crp)
}
}
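// For synchronous writes, block until finalization completes or the configured timeout elapses.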
if (!writeOptions.isAsync) {
try {
Await.result(mergeTask, writeOptions.timeout)
} catch {
case _: TimeoutException =>
KDSU.reportExceptionAndThrow(
myName,
new TimeoutException("Timed out polling on ingestion status"),
"polling on ingestion status",
coordinates.clusterUrl,
coordinates.database,
coordinates.table.getOrElse("Unspecified table name"))
}
}
}
}