in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [75:127]
/**
 * Builds the [[KustoReadOptions]] for a read operation from the user-supplied option map.
 *
 * The following options are read from `parameters`:
 *   - the requested number of partitions, the partitioning mode and the partition column
 *     (combined into a `PartitionOptions`; the partition count is resolved by
 *     `setNumPartitions`),
 *   - the read mode (left as `None` when the option is absent),
 *   - the distributed-read transient-cache flag (treated as "false" when absent),
 *   - the query filter push-down flag (left as `None` when absent),
 *   - additional export options, given as a JSON object that is parsed into a
 *     `Map[String, String]` (an empty map when absent).
 *
 * A warning is logged when the additional export options contain a `namePrefix` key; the
 * options are still passed through unchanged.
 *
 * @param parameters
 *   the option map supplied by the caller
 * @param sqlContext
 *   forwarded to `setNumPartitions` when resolving the number of partitions
 * @return
 *   the assembled read options
 * @throws IllegalArgumentException
 *   if the export-options JSON cannot be parsed as a map (the parser failure is attached as the
 *   cause), or if a boolean option holds a value that `String.toBoolean` rejects
 */
def getReadParameters(
    parameters: Map[String, String],
    sqlContext: SQLContext): KustoReadOptions = {
  val requestedPartitions = parameters.get(KustoDebugOptions.KUSTO_NUM_PARTITIONS)
  val partitioningMode = parameters.get(KustoDebugOptions.KUSTO_READ_PARTITION_MODE)
  val numPartitions = setNumPartitions(sqlContext, requestedPartitions, partitioningMode)

  // An absent read mode stays None; a present one is resolved by name.
  // NOTE(review): an unknown name is rejected by ReadMode.withName itself - confirm the
  // exception type against ReadMode's definition.
  val readMode: Option[ReadMode] =
    parameters.get(KustoSourceOptions.KUSTO_READ_MODE).map(name => ReadMode.withName(name))

  val distributedReadModeTransientCacheEnabled = parameters
    .getOrElse(KustoSourceOptions.KUSTO_DISTRIBUTED_READ_MODE_TRANSIENT_CACHE, "false")
    .trim
    .toBoolean
  val queryFilterPushDown =
    parameters.get(KustoSourceOptions.KUSTO_QUERY_FILTER_PUSH_DOWN).map(_.trim.toBoolean)

  val partitionColumn = parameters.get(KustoDebugOptions.KUSTO_PARTITION_COLUMN)
  val partitionOptions = PartitionOptions(numPartitions, partitionColumn, partitioningMode)

  // Parse upfront and throw back an error if there is a wrongly formatted JSON
  val additionalExportOptions: Map[String, String] =
    parameters.get(KustoSourceOptions.KUSTO_EXPORT_OPTIONS_JSON) match {
      case Some(exportOptionsJsonString) =>
        Try(
          objectMapper.readValue(
            exportOptionsJsonString,
            new TypeReference[Map[String, String]] {})) match {
          case Success(exportConfigMap) => exportConfigMap
          case Failure(exception) =>
            val errorMessage =
              s"The configuration for ${KustoSourceOptions.KUSTO_EXPORT_OPTIONS_JSON} has a value " +
                s"$exportOptionsJsonString that cannot be parsed as Map"
            logError(className, errorMessage)
            // Keep the parser failure as the cause so callers can see why the JSON was rejected.
            throw new IllegalArgumentException(errorMessage, exception)
        }
      case None => Map.empty[String, String]
    }

  // Only warn: the user-provided namePrefix is still forwarded with the other export options.
  if (additionalExportOptions.contains("namePrefix")) {
    logWarn(
      className,
      "User cannot specify namePrefix for additionalExportOptions as it can lead to unexpected behavior in reading output")
  }

  KustoReadOptions(
    readMode,
    partitionOptions,
    distributedReadModeTransientCacheEnabled,
    queryFilterPushDown,
    additionalExportOptions)
}