private[kusto] def distributedBuildScan()

in connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoReader.scala [108:165]

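Resolves the parquet paths to read (either reusing export paths from the transient cache or exporting the query result to transient blob storage), then loads those paths into an RDD[Row], falling back to an empty RDD when the query returned no rows.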

  private[kusto] def distributedBuildScan(
      kustoClient: ExtendedKustoClient,
      request: KustoReadRequest,
      storage: TransientStorageParameters,
      options: KustoReadOptions,
      filtering: KustoFiltering): RDD[Row] = {
    // When the transient cache is enabled, reuse previously exported paths for
    // the same query; otherwise export the query result and cache the paths
    // for reuse by later reads of the same query.
    val paths: Seq[String] = if (options.distributedReadModeTransientCacheEnabled) {
      val key = DistributedReadModeTransientCacheKey(
        request.query,
        request.kustoCoordinates,
        request.authentication)
      distributedReadModeTransientCache.get(key) match {
        case Some(cachedPaths) =>
          KDSU.logInfo(className, "distributedReadModeTransientCache: hit, reusing cached export paths")
          cachedPaths
        case None =>
          KDSU.logInfo(className, "distributedReadModeTransientCache: miss, exporting and caching paths")
          // Pushdown defaults to off here because the cache key does not
          // include the Spark filters; a filter-specific export must not be
          // cached for reuse by reads with different filters.
          val filter = determineFilterPushDown(
            options.queryFilterPushDown,
            queryFilterPushDownDefault = false,
            filtering)
          val exportedPaths = exportToStorage(kustoClient, request, storage, options, filter)
          distributedReadModeTransientCache(key) = exportedPaths
          exportedPaths
      }
    } else {
      val filter = determineFilterPushDown(
        options.queryFilterPushDown,
        queryFilterPushDownDefault = true,
        filtering)
      exportToStorage(kustoClient, request, storage, options, filter)
    }

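    // Spark reads the exported parquet files from all paths as one dataset
    // and exposes it as an RDD[Row].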
    val rdd =
      try {
        request.sparkSession.read.parquet(paths: _*).rdd
      } catch {
        case ex: Exception =>
          // An empty result leaves no readable parquet files, which surfaces
          // as an IO exception; verify by counting the rows of the filtered
          // query. Re-deriving the filtered query is cheap and only happens
          // on this exception path.
          val filteredQuery = KustoFilter.pruneAndFilter(request.schema, request.query, filtering)
          val count = KDSU.countRows(
            kustoClient.engineClient,
            filteredQuery,
            request.kustoCoordinates.database,
            request.clientRequestProperties.orNull)

          if (count == 0) {
            request.sparkSession.emptyDataFrame.rdd
          } else {
            throw ex
          }
      }

    KDSU.logInfo(className, s"Data read from transient blob storage, paths: $paths")
    rdd
  }
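
For orientation, below is a minimal, self-contained sketch of the two supporting pieces this method leans on: the transient cache and the filter-pushdown decision. The type shapes (KustoCoordinates, KustoAuthentication, KustoFiltering) are reduced stand-ins and the helper body is an assumption for illustration; the connector's actual definitions may differ.

  import scala.collection.mutable

  // Stand-ins for the connector's own types, reduced to what the sketch needs.
  case class KustoCoordinates(clusterUrl: String, database: String)
  trait KustoAuthentication
  case class KustoFiltering(columnFilters: Seq[String] = Seq.empty)

  // The cache key captures everything that determines the exported result:
  // two reads share paths only when the query, the target cluster/database
  // and the exporting identity all match. Spark-side filters are deliberately
  // not part of the key.
  case class DistributedReadModeTransientCacheKey(
      query: String,
      kustoCoordinates: KustoCoordinates,
      authentication: KustoAuthentication)

  // Exported parquet paths cached per key for the lifetime of the process; a
  // repeated read of the same query can then skip the export step entirely.
  val distributedReadModeTransientCache =
    mutable.Map.empty[DistributedReadModeTransientCacheKey, Seq[String]]

  // One plausible shape for the pushdown decision: an explicit option wins,
  // otherwise the branch-specific default applies (false on the cached path,
  // true on the uncached path, matching the call sites above).
  def determineFilterPushDown(
      queryFilterPushDown: Option[Boolean],
      queryFilterPushDownDefault: Boolean,
      filtering: KustoFiltering): KustoFiltering =
    if (queryFilterPushDown.getOrElse(queryFilterPushDownDefault)) filtering
    else KustoFiltering()

With this shape, a second distributedBuildScan for the same query, cluster and credentials reduces to a map lookup instead of a second export.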