def getReadParameters()

in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [75:127]


  /**
   * Builds the [[KustoReadOptions]] for a read from the user-supplied option map.
   *
   * Optional settings that are absent from `parameters` stay `None` (or, for the transient
   * cache flag, default to `false`). The export options JSON is parsed eagerly so that a
   * malformed value is rejected here instead of surfacing later during the read.
   *
   * @param parameters
   *   raw option key/value pairs supplied for the read
   * @param sqlContext
   *   forwarded to `setNumPartitions` when resolving the number of partitions
   * @return
   *   the assembled read options
   * @throws IllegalArgumentException
   *   if the export options JSON cannot be parsed as a string-to-string map (the parser failure
   *   is attached as the cause), or if a boolean option is neither "true" nor "false"
   */
  def getReadParameters(
      parameters: Map[String, String],
      sqlContext: SQLContext): KustoReadOptions = {
    val requestedPartitions = parameters.get(KustoDebugOptions.KUSTO_NUM_PARTITIONS)
    val partitioningMode = parameters.get(KustoDebugOptions.KUSTO_READ_PARTITION_MODE)
    val numPartitions = setNumPartitions(sqlContext, requestedPartitions, partitioningMode)

    // NOTE(review): the value is passed to ReadMode.withName untrimmed, unlike the boolean
    // options below; an unknown name is presumably rejected by withName - confirm.
    val readMode: Option[ReadMode] =
      parameters.get(KustoSourceOptions.KUSTO_READ_MODE).map(name => ReadMode.withName(name))

    val distributedReadModeTransientCacheEnabled = parameters
      .getOrElse(KustoSourceOptions.KUSTO_DISTRIBUTED_READ_MODE_TRANSIENT_CACHE, "false")
      .trim
      .toBoolean
    val queryFilterPushDown =
      parameters.get(KustoSourceOptions.KUSTO_QUERY_FILTER_PUSH_DOWN).map(s => s.trim.toBoolean)

    val partitionColumn = parameters.get(KustoDebugOptions.KUSTO_PARTITION_COLUMN)
    val partitionOptions = PartitionOptions(numPartitions, partitionColumn, partitioningMode)

    // Parse upfront and throw back an error if there is a wrongly formatted JSON
    val additionalExportOptions =
      parameters.get(KustoSourceOptions.KUSTO_EXPORT_OPTIONS_JSON) match {
        case Some(exportOptionsJsonString) =>
          Try(
            objectMapper.readValue(
              exportOptionsJsonString,
              new TypeReference[Map[String, String]] {})) match {
            case Success(exportConfigMap) => exportConfigMap
            case Failure(exception) =>
              val errorMessage =
                s"The configuration for ${KustoSourceOptions.KUSTO_EXPORT_OPTIONS_JSON} has a value " +
                  s"$exportOptionsJsonString that cannot be parsed as Map"
              logError(className, errorMessage)
              // Keep the parser failure as the cause so the offending JSON position is not lost.
              throw new IllegalArgumentException(errorMessage, exception)
          }
        case None => Map.empty[String, String]
      }

    // A user-supplied namePrefix is only warned about here; it is still passed through.
    if (additionalExportOptions.contains("namePrefix")) {
      logWarn(
        className,
        "User cannot specify namePrefix for additionalExportOptions as it can lead to unexpected behavior in reading output")
    }

    KustoReadOptions(
      readMode,
      partitionOptions,
      distributedReadModeTransientCacheEnabled,
      queryFilterPushDown,
      additionalExportOptions)
  }