in connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoRelation.scala [103:191]
/**
 * Builds the scan for this relation, choosing between an empty result, a single (direct) scan
 * and a distributed scan.
 *
 *   - `estimatedRecordCount == 0` yields the RDD of an empty DataFrame.
 *   - `useSingleMode == true` attempts `KustoReader.singleBuildScan`. If that attempt fails, the
 *     failure is either rethrown or answered with a fallback to the distributed scan (see
 *     `isUserOptionForceSingleMode`).
 *   - otherwise `KustoReader.distributedBuildScan` is used directly.
 *
 * Side effect: before every distributed scan, `readOptions.partitionOptions.column` and
 * `readOptions.partitionOptions.mode` are overwritten with `getPartitioningColumn` and
 * `getPartitioningMode`.
 *
 * @param requiredColumns
 *   columns requested by the caller; forwarded through `KustoFiltering`
 * @param filters
 *   filters requested by the caller; forwarded through `KustoFiltering`
 * @param kustoClient
 *   client handed to the reader; also supplies temporary export blobs when
 *   `maybeStorageParameters` is empty
 * @param isUserOptionForceSingleMode
 *   when true, a failed single scan is rethrown unless the failure is a query-limit failure
 * @param useSingleMode
 *   when true, a single scan is attempted first
 * @param estimatedRecordCount
 *   estimated number of records; only the value 0 is treated specially here
 * @return
 *   the RDD produced by the selected scan
 */
private def buildScanImpl(
    requiredColumns: Array[String],
    filters: Array[Filter],
    kustoClient: ExtendedKustoClient,
    isUserOptionForceSingleMode: Boolean,
    useSingleMode: Boolean,
    estimatedRecordCount: Int) = {

  // Fresh request built from this relation's state; shared by the single and distributed paths.
  def readRequest = KustoReadRequest(
    sparkSession,
    cachedSchema,
    kustoCoordinates,
    query,
    authentication,
    timeout,
    clientRequestProperties,
    requestId)

  // Sets the partitioning options on readOptions and then runs the distributed scan.
  def distributedScan() = {
    readOptions.partitionOptions.column = Some(getPartitioningColumn)
    readOptions.partitionOptions.mode = Some(getPartitioningMode)
    KustoReader.distributedBuildScan(
      kustoClient,
      readRequest,
      maybeStorageParameters.getOrElse(kustoClient.getTempBlobsForExport),
      readOptions,
      KustoFiltering(requiredColumns, filters))
  }

  // True when any line of the root-cause stack trace mentions the query-limit failure.
  // getRootCauseStackTrace returns one array element per trace line, so each line has to be
  // searched for the phrase; Array.contains would only match a line that EQUALS the phrase.
  def isQueryLimitFailure(exception: Throwable): Boolean =
    ExceptionUtils
      .getRootCauseStackTrace(exception)
      .exists(_.contains("Query execution has exceeded the allowed limits"))

  // Is a 0 only if both estimate and count return 0 in which case it is an empty RDD of rows
  if (estimatedRecordCount == 0) {
    sparkSession.emptyDataFrame.rdd
  } else if (useSingleMode) {
    // Either a case of non-zero records or a case of timed-out.
    // There are less than 5000 (KustoConstants.DirectQueryUpperBoundRows) records, perform a single scan
    val singleBuildScanResult = Try(
      KustoReader.singleBuildScan(
        kustoClient,
        readRequest,
        KustoFiltering(requiredColumns, filters)))
    // see if the scan succeeded
    singleBuildScanResult match {
      case Success(rdd) => rdd
      case Failure(exception) =>
        // If the user specified forceSingleMode explicitly and that cannot be honored, throw the
        // exception back. The only exception is a query-limit failure, which falls back.
        if (isUserOptionForceSingleMode && !isQueryLimitFailure(exception)) {
          // Expected behavior for Issue#261
          throw exception
        } else {
          // Either the user did not force single mode and our approximate estimate failed, or the
          // query limit was hit: report the failure (shouldNotThrow = true, so execution
          // continues) and fall back to distributed mode.
          KDSU.reportExceptionAndThrow(
            "KustoRelation",
            exception,
            "Failed with Single mode, falling back to Distributed mode,",
            kustoCoordinates.clusterAlias,
            kustoCoordinates.database,
            requestId = requestId,
            shouldNotThrow = true)
          distributedScan()
        }
    }
  } else {
    // Determined to be distributed mode, through user property or by record count
    distributedScan()
  }
}