in connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala [108:165]
/**
 * Builds an RDD for distributed read mode: the query result is exported to
 * transient blob storage and read back as parquet.
 *
 * When `distributedReadModeTransientCacheEnabled` is set, previously exported
 * blob paths for the same (query, coordinates, authentication) key are reused
 * instead of re-exporting.
 *
 * @param kustoClient client used for the export command and row counting
 * @param request     the read request (query, coordinates, schema, session)
 * @param storage     transient storage the export writes to
 * @param options     read options (cache toggle, filter push-down setting)
 * @param filtering   column pruning / filter predicates to push down
 * @return an RDD over the exported parquet files; empty RDD if the query
 *         returns no rows
 */
private[kusto] def distributedBuildScan(
    kustoClient: ExtendedKustoClient,
    request: KustoReadRequest,
    storage: TransientStorageParameters,
    options: KustoReadOptions,
    filtering: KustoFiltering): RDD[Row] = {
  // Resolve the blob paths holding the exported result as a single expression
  // (no mutable state). A cache hit skips the export entirely.
  val paths: Seq[String] =
    if (options.distributedReadModeTransientCacheEnabled) {
      val key = DistributedReadModeTransientCacheKey(
        request.query,
        request.kustoCoordinates,
        request.authentication)
      // Single .get lookup instead of contains + apply (avoids a double hash
      // lookup and the partial apply).
      distributedReadModeTransientCache.get(key) match {
        case Some(cachedPaths) =>
          KDSU.logInfo(className, "Fetching from distributedReadModeTransientCache: hit, reusing cached export paths")
          cachedPaths
        case None =>
          KDSU.logInfo(className, "distributedReadModeTransientCache: miss, exporting to cache paths")
          // Push-down defaults to off for cached exports — presumably so the
          // cached paths stay reusable across reads with different filters.
          val filter = determineFilterPushDown(
            options.queryFilterPushDown,
            queryFilterPushDownDefault = false,
            filtering)
          val exported = exportToStorage(kustoClient, request, storage, options, filter)
          distributedReadModeTransientCache(key) = exported
          exported
      }
    } else {
      val filter = determineFilterPushDown(
        options.queryFilterPushDown,
        queryFilterPushDownDefault = true,
        filtering)
      exportToStorage(kustoClient, request, storage, options, filter)
    }
  val rdd =
    try {
      request.sparkSession.read.parquet(paths: _*).rdd
    } catch {
      // NonFatal (instead of Exception) lets InterruptedException and fatal
      // errors propagate rather than being masked by the empty-result probe.
      case scala.util.control.NonFatal(ex) =>
        // An empty export can surface as an IO exception when reading the
        // parquet files. Confirm emptiness by counting rows server-side; the
        // filtered query is regenerated here because this path is exceptional
        // and the regeneration is cheap.
        val filteredQuery = KustoFilter.pruneAndFilter(request.schema, request.query, filtering)
        val count = KDSU.countRows(
          kustoClient.engineClient,
          filteredQuery,
          request.kustoCoordinates.database,
          request.clientRequestProperties.orNull)
        if (count == 0) {
          request.sparkSession.emptyDataFrame.rdd
        } else {
          throw ex
        }
    }
  // Same message text as before; s-interpolation instead of + concatenation.
  KDSU.logInfo(className, s"Transaction data read from blob storage, paths:$paths")
  rdd
}