in connector/src/main/scala/com/microsoft/kusto/spark/datasink/FinalizeHelper.scala [242:284]
def processIngestionStatusResults(
partitionId: Int = 0,
ingestionInfoString: String,
shouldThrowOnTagsAlreadyExists: Boolean,
ingestionStatusResult: IngestionStatus): Unit = {
ingestionStatusResult.status match {
case OperationStatus.Pending =>
throw new RuntimeException(
s"Ingestion to Kusto failed on timeout failure. $ingestionInfoString, partition: '$partitionId'")
case OperationStatus.Succeeded =>
KDSU.logInfo(
myName,
s"Ingestion to Kusto succeeded. $ingestionInfoString, partition: '$partitionId', " +
s"from: '${ingestionStatusResult.ingestionSourcePath}' , Operation ${ingestionStatusResult.operationId}")
case OperationStatus.Skipped =>
// TODO: should we throw ?
KDSU.logInfo(
myName,
s"Ingestion to Kusto skipped. $ingestionInfoString, " +
s"partition: '$partitionId', from: '${ingestionStatusResult.ingestionSourcePath}', " +
s"Operation ${ingestionStatusResult.operationId}")
case otherStatus: Any =>
// TODO error code should be added to java client
if (ingestionStatusResult.errorCodeString != "Skipped_IngestByTagAlreadyExists") {
throw new RuntimeException(
s"Ingestion to Kusto failed with status '$otherStatus'." +
s" $ingestionInfoString, partition: '$partitionId'. Ingestion info: '${mapper.writerWithDefaultPrettyPrinter
.writeValueAsString(ingestionStatusResult)}'")
} else if (shouldThrowOnTagsAlreadyExists) {
// TODO - think about this logic and other cases that should not throw all (maybe everything that starts with skip? this actualy
// seems like a bug in engine that the operation status is not Skipped)
// (Skipped_IngestByTagAlreadyExists is relevant for dedup flow only as in other cases we cancel the ingestion altogether)
throw new RuntimeException(s"Ingestion to Kusto skipped with status '$otherStatus'." +
s" $ingestionInfoString, partition: '$partitionId'. Ingestion info: '${new ObjectMapper().writerWithDefaultPrettyPrinter
.writeValueAsString(ingestionStatusResult)}'")
}
KDSU.logInfo(
myName,
s"Ingestion to Kusto failed. $ingestionInfoString, " +
s"partition: '$partitionId', from: '${ingestionStatusResult.ingestionSourcePath}', " +
s"Operation ${ingestionStatusResult.operationId}")
}
}