def pollOnResult()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala [187:240]


  def pollOnResult(
      partitionResult: PartitionResult,
      requestId: String,
      timeout: Long,
      ingestionInfoString: String,
      shouldThrowOnTagsAlreadyExists: Boolean): Unit = {
    var finalRes: Option[IngestionStatus] = None
    KDSU
      .doWhile[Option[IngestionStatus]](
        () => {
          try {
            finalRes = Some(partitionResult.ingestionResult.getIngestionStatusCollection.get(0))
            finalRes
          } catch {
            case e: TableServiceErrorException =>
              KDSU.reportExceptionAndThrow(
                myName,
                e,
                s"TableServiceErrorException : RequestId: $requestId",
                shouldNotThrow = true)
              None
            case e: Exception =>
              KDSU.reportExceptionAndThrow(
                myName,
                e,
                s"Failed to fetch operation status. RequestId: $requestId")
              None
          }
        },
        0,
        DelayPeriodBetweenCalls,
        res => {
          val pending = res.isDefined && res.get.status == OperationStatus.Pending
          if (pending) {
            KDSU.logDebug(
              myName,
              s"Polling on result for partition: '${partitionResult.partitionId}' in requestId: $requestId, status is-'Pending'")
          }
          pending
        },
        res => finalRes = res,
        maxWaitTimeBetweenCallsMillis = KDSU.WriteInitialMaxWaitTime.toMillis.toInt,
        maxWaitTimeAfterMinute = KDSU.WriteMaxWaitTime.toMillis.toInt)
      .await(timeout, TimeUnit.MILLISECONDS)
    finalRes match {
      case Some(ingestResults) =>
        processIngestionStatusResults(
          partitionResult.partitionId,
          ingestionInfoString,
          shouldThrowOnTagsAlreadyExists,
          ingestResults)
      case None => throw new RuntimeException("Failed to poll on ingestion status.")
    }
  }