in connector/src/main/scala/com/microsoft/kusto/spark/datasource/DefaultSource.scala [100:171]
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val readOptions = KDSU.getReadParameters(parameters, sqlContext)
if (authenticationParameters.isEmpty) {
// Parse parameters if haven't got parsed before
val sourceParameters = KDSU.parseSourceParameters(parameters, true)
initCommonParams(sourceParameters)
}
val storageOption = parameters.get(KustoSourceOptions.KUSTO_TRANSIENT_STORAGE)
val transientStorageParams: Option[TransientStorageParameters] =
if (storageOption.isDefined) {
Some(TransientStorageParameters.fromString(storageOption.get))
} else {
None
}
val (kustoAuthentication, mergedStorageParameters)
: (Option[KustoAuthentication], Option[TransientStorageParameters]) = {
if (keyVaultAuthentication.isDefined) {
// Get params from keyVault
authenticationParameters = Some(
KDSU.mergeKeyVaultAndOptionsAuthentication(
KeyVaultUtils.getAadAppParametersFromKeyVault(keyVaultAuthentication.get),
authenticationParameters))
(
authenticationParameters,
KDSU.mergeKeyVaultAndOptionsStorageParams(
transientStorageParams,
keyVaultAuthentication.get))
} else if (transientStorageParams.isDefined) {
// If any of the storage parameters defined a SAS we will take endpoint suffix from there
transientStorageParams.get.storageCredentials.foreach(st => {
st.validate()
if (StringUtils.isNoneBlank(st.domainSuffix)) {
transientStorageParams.get.endpointSuffix = st.domainSuffix
}
})
// Params passed from options
(authenticationParameters, transientStorageParams)
} else {
(authenticationParameters, None)
}
}
val timeout = new FiniteDuration(
parameters
.getOrElse(
KustoSourceOptions.KUSTO_TIMEOUT_LIMIT,
KCONST.DefaultWaitingIntervalLongRunning)
.toLong,
TimeUnit.SECONDS)
KDSU.logInfo(
myName,
s"Finished serializing parameters for reading: {requestId: $requestId, timeout: $timeout, readMode: ${readOptions.readMode
.getOrElse("Default")}, clientRequestProperties: $clientRequestProperties")
KustoRelation(
kustoCoordinates,
kustoAuthentication.get,
parameters.getOrElse(KustoSourceOptions.KUSTO_QUERY, ""),
readOptions,
timeout,
parameters.get(KustoSourceOptions.KUSTO_CUSTOM_DATAFRAME_COLUMN_TYPES),
mergedStorageParameters,
clientRequestProperties,
requestId.get)(sqlContext.sparkSession)
}
override def shortName(): String = "kusto"
}