def processIngestionStatusResults()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala [242:284]


  def processIngestionStatusResults(
      partitionId: Int = 0,
      ingestionInfoString: String,
      shouldThrowOnTagsAlreadyExists: Boolean,
      ingestionStatusResult: IngestionStatus): Unit = {
    ingestionStatusResult.status match {
      case OperationStatus.Pending =>
        throw new RuntimeException(
          s"Ingestion to Kusto failed on timeout failure. $ingestionInfoString,  partition: '$partitionId'")
      case OperationStatus.Succeeded =>
        KDSU.logInfo(
          myName,
          s"Ingestion to Kusto succeeded. $ingestionInfoString,  partition: '$partitionId', " +
            s"from: '${ingestionStatusResult.ingestionSourcePath}' , Operation ${ingestionStatusResult.operationId}")
      case OperationStatus.Skipped =>
        // TODO: should we throw ?
        KDSU.logInfo(
          myName,
          s"Ingestion to Kusto skipped. $ingestionInfoString, " +
            s"partition: '$partitionId', from: '${ingestionStatusResult.ingestionSourcePath}', " +
            s"Operation ${ingestionStatusResult.operationId}")
      case otherStatus: Any =>
        // TODO error code should be added to java client
        if (ingestionStatusResult.errorCodeString != "Skipped_IngestByTagAlreadyExists") {
          throw new RuntimeException(
            s"Ingestion to Kusto failed with status '$otherStatus'." +
              s" $ingestionInfoString, partition: '$partitionId'. Ingestion info: '${mapper.writerWithDefaultPrettyPrinter
                  .writeValueAsString(ingestionStatusResult)}'")
        } else if (shouldThrowOnTagsAlreadyExists) {
          // TODO - think about this logic and other cases that should not throw all (maybe everything that starts with skip? this actualy
          //  seems like a bug in engine that the operation status is not Skipped)
          //  (Skipped_IngestByTagAlreadyExists is relevant for dedup flow only as in other cases we cancel the ingestion altogether)
          throw new RuntimeException(s"Ingestion to Kusto skipped with status '$otherStatus'." +
            s" $ingestionInfoString, partition: '$partitionId'. Ingestion info: '${new ObjectMapper().writerWithDefaultPrettyPrinter
                .writeValueAsString(ingestionStatusResult)}'")
        }
        KDSU.logInfo(
          myName,
          s"Ingestion to Kusto failed. $ingestionInfoString, " +
            s"partition: '$partitionId', from: '${ingestionStatusResult.ingestionSourcePath}', " +
            s"Operation ${ingestionStatusResult.operationId}")
    }
  }