in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [297:323]
/**
 * Finalises the ingestion properties for the current partition and delegates the actual
 * ingestion of `rows` to `ingestRows`.
 *
 * Note that `ingestionProperties` is mutated in place: the data format is always set to CSV,
 * and for transactional writes table-based status reporting is switched on for both failed
 * and successful ingestions.
 *
 * @param rows                rows of the partition being written
 * @param ingestClient        client passed through to `ingestRows`
 * @param ingestionProperties properties to adjust (in place) and pass on to `ingestRows`
 * @param partitionsResults   accumulator passed through to `ingestRows`
 * @param batchIdForTracing   batch identifier, forwarded and included in the log line
 * @param parameters          write resource; its `writeOptions` supply the write mode and the
 *                            request id used for logging
 */
private def ingestRowsIntoKusto(
    rows: Iterator[InternalRow],
    ingestClient: IngestClient,
    ingestionProperties: IngestionProperties,
    partitionsResults: CollectionAccumulator[PartitionResult],
    batchIdForTracing: String,
    parameters: KustoWriteResource): Unit = {
  // Per the original note, transactional mode writes into the temp table instead of the
  // destination table; here that mode additionally enables table-based status reporting.
  val isTransactional = parameters.writeOptions.writeMode == WriteMode.Transactional
  if (isTransactional) {
    val statusReportMethod = IngestionProperties.IngestionReportMethod.TABLE
    val statusReportLevel = IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES
    ingestionProperties.setReportMethod(statusReportMethod)
    ingestionProperties.setReportLevel(statusReportLevel)
  }

  val csvFormatName = DataFormat.CSV.name
  ingestionProperties.setDataFormat(csvFormatName)

  // Deliberately not wrapped in a try block: whatever ingestRows throws is left to
  // propagate to the caller, which decides how to react to the specific exception.
  ingestRows(rows, parameters, ingestClient, ingestionProperties, partitionsResults, batchIdForTracing)

  // Logged once ingestRows has returned for this partition.
  KDSU.logInfo(
    className,
    s"Ingesting from blob(s) partition: ${TaskContext.getPartitionId()} requestId: " +
      s"'${parameters.writeOptions.requestId}' batch$batchIdForTracing")
}