in connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala [200:237]
/**
 * Exports the requested Kusto data into transient blob storage, issuing one
 * export per calculated partition, and returns the wasbs:// directory paths
 * that actually received data.
 *
 * @param kustoClient client used to run the export commands
 * @param request     read request carrying coordinates, requestId and the Spark session
 * @param storage     transient storage accounts/containers to export into
 * @param options     read options (partitioning configuration, etc.)
 * @param filtering   filters pushed down into the export query
 * @return blob directory paths, one per storage credential whose directory exists
 */
private def exportToStorage(
    kustoClient: ExtendedKustoClient,
    request: KustoReadRequest,
    storage: TransientStorageParameters,
    options: KustoReadOptions,
    filtering: KustoFiltering) = {
  KDSU.logInfo(
    className,
    s"Starting exporting data from Kusto to blob storage in Distributed mode. requestId: ${request.requestId}")
  setupBlobAccess(request, storage)

  // Random per-request directory; anything outside [0-9a-zA-Z/] is replaced
  // with '_' to keep the blob directory name safe.
  val exportDirectory =
    s"${request.kustoCoordinates.database}/dir${UUID.randomUUID()}/".replaceAll("[^0-9a-zA-Z/]", "_")

  val blobReader = new KustoReader(kustoClient)
  calculatePartitions(options.partitionOptions).foreach { p =>
    blobReader.exportPartitionToBlob(
      p.asInstanceOf[KustoPartition],
      request,
      storage,
      exportDirectory,
      options,
      filtering)
  }

  // Only credentials whose export directory actually exists contribute a path.
  val exportedPaths = storage.storageCredentials
    .filter(cred => dirExist(request.sparkSession, cred, exportDirectory, storage.endpointSuffix))
    .map { cred =>
      s"wasbs://${cred.blobContainer}" +
        s"@${cred.storageAccountName}.blob.${storage.endpointSuffix}/$exportDirectory"
    }

  KDSU.logInfo(
    className,
    s"Finished exporting from Kusto to ${exportedPaths.mkString(",")}" +
      s", on requestId: ${request.requestId}, will start parquet reading now")
  exportedPaths
}