in connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala [305:344]
/**
 * Exports a single Kusto partition's data to transient blob storage so Spark can read it back
 * as parquet. Issues a Kusto `.export` async command and blocks until it completes (or times out).
 *
 * @param partition the partition to export; its `idx` and `predicate` shard the export
 * @param request   the read request carrying coordinates, schema, query, timeout and request id
 * @param storage   transient storage (blob) parameters the export command writes into
 * @param directory blob directory that receives the exported files
 * @param options   read options; `additionalExportOptions` are forwarded to the export command
 * @param filtering column pruning / predicate pushdown applied to the query before export
 */
private[kusto] def exportPartitionToBlob(
    partition: KustoPartition,
    request: KustoReadRequest,
    storage: TransientStorageParameters,
    directory: String,
    options: KustoReadOptions,
    filtering: KustoFiltering): Unit = {
  // Use the new parquet writer only on Spark versions above the minimal supported one.
  // NOTE(review): this is a strict '>' comparison, but the warning below advises ">= 3.3.0".
  // If minimalParquetWriterVersion is "3.3.0", Spark 3.3.0 itself is excluded here — confirm
  // whether the gate should be `>= 0` instead.
  val supportNewParquetWriter = new ComparableVersion(request.sparkSession.version)
    .compareTo(new ComparableVersion(KustoReader.minimalParquetWriterVersion)) > 0
  if (!supportNewParquetWriter) {
    KDSU.logWarn(
      myName,
      "Setting useNativeParquetWriter=false. Users are advised to move to Spark versions >= 3.3.0 to leverage the performance and cost improvements of " +
        "new encoding schemes introduced in both Kusto parquet files write and Spark parquet read")
  }
  // Build the '.export' CSL command with pruned/filtered query and per-partition sharding.
  val exportCommand = CslCommandsGenerator.generateExportDataCommand(
    query = KustoFilter.pruneAndFilter(request.schema, request.query, filtering),
    directory = directory,
    partitionId = partition.idx,
    storageParameters = storage,
    partitionPredicate = partition.predicate,
    additionalExportOptions = options.additionalExportOptions,
    supportNewParquetWriter = supportNewParquetWriter)
  // Kick off the export on the engine; the primary result table holds the async operation handle.
  val commandResult: KustoResultSetTable = client
    .executeEngine(
      request.kustoCoordinates.database,
      exportCommand,
      "exportPartitionToBlob",
      request.clientRequestProperties.orNull)
    .getPrimaryResults
  // Block until the async export completes, honoring the caller-supplied timeout.
  KDSU.verifyAsyncCommandCompletion(
    client.engineClient,
    request.kustoCoordinates.database,
    commandResult,
    timeOut = request.timeout,
    doingWhat = s"export data to blob directory: ('$directory') preparing it for reading.",
    loggerName = myName,
    requestId = request.requestId)
}