in connector/src/main/scala/com/microsoft/kusto/spark/datasource/KustoRelation.scala [60:101]
/**
 * Decides whether the read should run in single mode and then hands the actual scan
 * construction over to `buildScanImpl`.
 *
 * Decision rules:
 *  - When `readOptions.readMode` is set, it alone decides: single mode is used only if it is
 *    `ReadMode.ForceSingleMode`, and no row estimate is taken (the count is passed as -1).
 *  - When it is not set, the row count of `query` is estimated via `KDSU.estimateRowsCount`;
 *    single mode is chosen when the estimate does not exceed
 *    `KustoConstants.DirectQueryUpperBoundRows`.
 *
 * @param requiredColumns passed through unchanged to `buildScanImpl`
 * @param filters passed through unchanged to `buildScanImpl`
 * @return the RDD produced by `buildScanImpl`
 */
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  val client = KustoClientCache.getClient(
    kustoCoordinates.clusterUrl,
    authentication,
    kustoCoordinates.ingestionUrl,
    kustoCoordinates.clusterAlias)
  val singleModeForcedByUser = readOptions.readMode.contains(ReadMode.ForceSingleMode)

  val (runInSingleMode, rowEstimate) =
    if (readOptions.readMode.isDefined) {
      // An explicit user choice always wins; -1 marks "no estimate was taken".
      (singleModeForcedByUser, -1)
    } else {
      // No explicit choice: estimate the result size and pick the mode from it.
      //
      // Outcomes of the estimate call, per the original author's notes (the helper itself is
      // not visible from this method, so treat these as assumptions to confirm):
      //  - a non-zero count : the estimate succeeded
      //  - zero             : the estimate failed and the fallback count yielded 0
      //  - an exception     : the estimate was null/empty and the fallback count failed too
      //                       (e.g. a timeout)
      Try(
        KDSU.estimateRowsCount(
          client.engineClient,
          query,
          kustoCoordinates.database,
          clientRequestProperties.orNull)) match {
        // The estimate could not be obtained: do not select single mode and report -1.
        case Failure(_) => (false, -1)
        // Small enough results (at or below the direct-query bound) are read in single mode.
        case Success(count) =>
          (count <= KustoConstants.DirectQueryUpperBoundRows, count)
      }
    }

  // The scan construction itself lives in a separate method to keep this one focused on
  // choosing the read mode.
  buildScanImpl(
    requiredColumns,
    filters,
    client,
    singleModeForcedByUser,
    runInSingleMode,
    rowEstimate)
}