override def createRelation()

in connector/src/main/scala/com/microsoft/kusto/spark/datasource/DefaultSource.scala [100:171]


  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val readOptions = KDSU.getReadParameters(parameters, sqlContext)
    if (authenticationParameters.isEmpty) {
      // Parse parameters if haven't got parsed before
      val sourceParameters = KDSU.parseSourceParameters(parameters, true)
      initCommonParams(sourceParameters)
    }

    val storageOption = parameters.get(KustoSourceOptions.KUSTO_TRANSIENT_STORAGE)
    val transientStorageParams: Option[TransientStorageParameters] =
      if (storageOption.isDefined) {
        Some(TransientStorageParameters.fromString(storageOption.get))
      } else {
        None
      }
    val (kustoAuthentication, mergedStorageParameters)
        : (Option[KustoAuthentication], Option[TransientStorageParameters]) = {
      if (keyVaultAuthentication.isDefined) {
        // Get params from keyVault
        authenticationParameters = Some(
          KDSU.mergeKeyVaultAndOptionsAuthentication(
            KeyVaultUtils.getAadAppParametersFromKeyVault(keyVaultAuthentication.get),
            authenticationParameters))

        (
          authenticationParameters,
          KDSU.mergeKeyVaultAndOptionsStorageParams(
            transientStorageParams,
            keyVaultAuthentication.get))
      } else if (transientStorageParams.isDefined) {
        // If any of the storage parameters defined a SAS we will take endpoint suffix from there
        transientStorageParams.get.storageCredentials.foreach(st => {
          st.validate()
          if (StringUtils.isNoneBlank(st.domainSuffix)) {
            transientStorageParams.get.endpointSuffix = st.domainSuffix
          }
        })
        // Params passed from options
        (authenticationParameters, transientStorageParams)
      } else {
        (authenticationParameters, None)
      }
    }

    val timeout = new FiniteDuration(
      parameters
        .getOrElse(
          KustoSourceOptions.KUSTO_TIMEOUT_LIMIT,
          KCONST.DefaultWaitingIntervalLongRunning)
        .toLong,
      TimeUnit.SECONDS)

    KDSU.logInfo(
      myName,
      s"Finished serializing parameters for reading: {requestId: $requestId, timeout: $timeout, readMode: ${readOptions.readMode
          .getOrElse("Default")}, clientRequestProperties: $clientRequestProperties")
    KustoRelation(
      kustoCoordinates,
      kustoAuthentication.get,
      parameters.getOrElse(KustoSourceOptions.KUSTO_QUERY, ""),
      readOptions,
      timeout,
      parameters.get(KustoSourceOptions.KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES),
      mergedStorageParameters,
      clientRequestProperties,
      requestId.get)(sqlContext.sparkSession)
  }

  override def shortName(): String = "kusto"
}