in connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala [187:240]
def pollOnResult(
partitionResult: PartitionResult,
requestId: String,
timeout: Long,
ingestionInfoString: String,
shouldThrowOnTagsAlreadyExists: Boolean): Unit = {
var finalRes: Option[IngestionStatus] = None
KDSU
.doWhile[Option[IngestionStatus]](
() => {
try {
finalRes = Some(partitionResult.ingestionResult.getIngestionStatusCollection.get(0))
finalRes
} catch {
case e: TableServiceErrorException =>
KDSU.reportExceptionAndThrow(
myName,
e,
s"TableServiceErrorException : RequestId: $requestId",
shouldNotThrow = true)
None
case e: Exception =>
KDSU.reportExceptionAndThrow(
myName,
e,
s"Failed to fetch operation status. RequestId: $requestId")
None
}
},
0,
DelayPeriodBetweenCalls,
res => {
val pending = res.isDefined && res.get.status == OperationStatus.Pending
if (pending) {
KDSU.logDebug(
myName,
s"Polling on result for partition: '${partitionResult.partitionId}' in requestId: $requestId, status is-'Pending'")
}
pending
},
res => finalRes = res,
maxWaitTimeBetweenCallsMillis = KDSU.WriteInitialMaxWaitTime.toMillis.toInt,
maxWaitTimeAfterMinute = KDSU.WriteMaxWaitTime.toMillis.toInt)
.await(timeout, TimeUnit.MILLISECONDS)
finalRes match {
case Some(ingestResults) =>
processIngestionStatusResults(
partitionResult.partitionId,
ingestionInfoString,
shouldThrowOnTagsAlreadyExists,
ingestResults)
case None => throw new RuntimeException("Failed to poll on ingestion status.")
}
}