private def buildScanImpl()

in connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoRelation.scala [103:191]


  private def buildScanImpl(
      requiredColumns: Array[String],
      filters: Array[Filter],
      kustoClient: ExtendedKustoClient,
      isUserOptionForceSingleMode: Boolean,
      useSingleMode: Boolean,
      estimatedRecordCount: Int) = {
    // The count is 0 only if both the estimate and the count return 0, in which case we return an empty RDD of rows
    if (estimatedRecordCount == 0) {
      sparkSession.emptyDataFrame.rdd
    } else {
      // Either the record count is non-zero, or the count query timed out.
      if (useSingleMode) {
        // There are fewer than 5000 (KustoConstants.DirectQueryUpperBoundRows) records, so perform a single scan
        val singleBuildScanResult = Try(
          KustoReader.singleBuildScan(
            kustoClient,
            KustoReadRequest(
              sparkSession,
              cachedSchema,
              kustoCoordinates,
              query,
              authentication,
              timeout,
              clientRequestProperties,
              requestId),
            KustoFiltering(requiredColumns, filters)))
        // Check whether the scan succeeded
        singleBuildScanResult match {
          case Success(rdd) => rdd
          case Failure(exception) =>
            // If the user explicitly specified forceSingleMode and it cannot be honored, throw the exception back.
            // The only exception: if the failure is caused by query limits, fall back to distributed mode.
            val isRowLimitHit = ExceptionUtils
              .getRootCauseStackTrace(exception)
              .contains("Query execution has exceeded the allowed limits")
            if (isUserOptionForceSingleMode && !isRowLimitHit) {
              // Expected behavior for Issue#261
              throw exception
            } else {
              // The case where the user did not provide an option and we estimated that a single scan would suffice.
              // The estimate turned out to be wrong, so fall back to distributed mode.
              KDSU.reportExceptionAndThrow(
                "KustoRelation",
                exception,
                "Failed with Single mode, falling back to Distributed mode,",
                kustoCoordinates.clusterAlias,
                kustoCoordinates.database,
                requestId = requestId,
                shouldNotThrow = true)
              readOptions.partitionOptions.column = Some(getPartitioningColumn)
              readOptions.partitionOptions.mode = Some(getPartitioningMode)
              KustoReader.distributedBuildScan(
                kustoClient,
                KustoReadRequest(
                  sparkSession,
                  cachedSchema,
                  kustoCoordinates,
                  query,
                  authentication,
                  timeout,
                  clientRequestProperties,
                  requestId),
                maybeStorageParameters.getOrElse(kustoClient.getTempBlobsForExport),
                readOptions,
                KustoFiltering(requiredColumns, filters))
            }
        }
      } else {
        // Determined to be distributed mode, either through the user property or by record count
        readOptions.partitionOptions.column = Some(getPartitioningColumn)
        readOptions.partitionOptions.mode = Some(getPartitioningMode)
        KustoReader.distributedBuildScan(
          kustoClient,
          KustoReadRequest(
            sparkSession,
            cachedSchema,
            kustoCoordinates,
            query,
            authentication,
            timeout,
            clientRequestProperties,
            requestId),
          maybeStorageParameters.getOrElse(kustoClient.getTempBlobsForExport),
          readOptions,
          KustoFiltering(requiredColumns, filters))
      }
    }
  }
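
The mode-selection and fallback decisions above can be summarized in isolation. The following is a minimal sketch, not connector API: decideScanMode and shouldFallBackToDistributed are hypothetical helper names, the 5000-row threshold stands in for KustoConstants.DirectQueryUpperBoundRows referenced in the comments, and the caller's actual computation of useSingleMode may involve more inputs than shown here. The fallback predicate mirrors the check in the Failure branch: a single-mode failure is rethrown only when the user explicitly forced single mode and the failure is not a query-limit error.

  import org.apache.commons.lang3.exception.ExceptionUtils

  sealed trait ScanMode
  case object SingleMode extends ScanMode
  case object DistributedMode extends ScanMode

  // Hypothetical helper: pick the initial scan mode from the user option and the record estimate,
  // a simplification of the "user property or record count" decision described in the comments.
  def decideScanMode(
      isUserOptionForceSingleMode: Boolean,
      estimatedRecordCount: Int,
      directQueryUpperBoundRows: Int = 5000): ScanMode =
    if (isUserOptionForceSingleMode || estimatedRecordCount <= directQueryUpperBoundRows) SingleMode
    else DistributedMode

  // Hypothetical helper: decide whether a failed single-mode scan may fall back to distributed mode.
  // Falling back is allowed unless the user explicitly forced single mode and the failure is not
  // the Kusto query-limit error (the Issue#261 behavior).
  def shouldFallBackToDistributed(
      exception: Throwable,
      isUserOptionForceSingleMode: Boolean): Boolean = {
    val isRowLimitHit = ExceptionUtils
      .getRootCauseStackTrace(exception)
      .exists(_.contains("Query execution has exceeded the allowed limits"))
    !isUserOptionForceSingleMode || isRowLimitHit
  }

With these helpers, the Failure branch reduces to: if shouldFallBackToDistributed(exception, isUserOptionForceSingleMode) holds, run the distributed scan; otherwise rethrow the exception.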