in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [265:295]
/**
 * Ingests one partition's rows into Kusto from a worker.
 *
 * Empty partitions are skipped with a warning. Otherwise the target table is chosen by
 * write mode: Transactional writes go to the temporary table, every other mode writes
 * straight to the destination table. KustoStreaming uses the streaming ingest path;
 * all other modes go through the queued (temporary-table) worker path, recording
 * per-partition outcomes into `partitionsResults`.
 *
 * NOTE(review): `coordinates.table.get` assumes the destination table Option is always
 * populated on this code path — presumably validated earlier; confirm against callers.
 *
 * @param rows rows of the current partition (may be empty)
 * @param batchIdForTracing correlation id included in logs for tracing this batch
 * @param partitionsResults accumulator collecting per-partition ingestion results
 * @param parameters bundled write options, coordinates and temp-table name
 */
private def ingestRowsIntoTempTbl(
    rows: Iterator[InternalRow],
    batchIdForTracing: String,
    partitionsResults: CollectionAccumulator[PartitionResult],
    parameters: KustoWriteResource): Unit = {
  val writeOptions = parameters.writeOptions
  val coordinates = parameters.coordinates
  if (rows.isEmpty) {
    // Nothing to ingest for this partition — log and return.
    KDSU.logWarn(
      className,
      s"sink to Kusto table '${coordinates.table.get}' with no rows to write " +
        s"on partition ${TaskContext.getPartitionId} $batchIdForTracing")
  } else {
    // Transactional mode stages rows in the temp table; other modes target the real table.
    val targetTable =
      if (writeOptions.writeMode == WriteMode.Transactional) parameters.tmpTableName
      else coordinates.table.get
    val ingestionProperties =
      getIngestionProperties(writeOptions, coordinates.database, targetTable)
    if (writeOptions.writeMode == WriteMode.KustoStreaming) {
      streamRowsIntoKustoByWorkers(batchIdForTracing, rows, ingestionProperties, parameters)
    } else {
      ingestToTemporaryTableByWorkers(
        batchIdForTracing,
        rows,
        partitionsResults,
        ingestionProperties,
        parameters)
    }
  }
}