in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [326:368]
/**
 * Builds the [[SourceParameters]] for a read from the supplied option map.
 *
 * The database and cluster options are mandatory. The cluster value is lower-cased and passed
 * to `getClusterNameFromUrlIfNeeded` and `getEngineUrlFromAliasIfNeeded` to obtain the alias
 * and the engine URL. The table, request id and ingestion URI are optional and are looked up
 * under the `KustoSinkOptions` keys; when no request id is supplied a random UUID is used.
 *
 * @param parameters
 *   the options to parse; must contain `KustoSourceOptions.KUSTO_DATABASE` and
 *   `KustoSourceOptions.KUSTO_CLUSTER`
 * @param allowProxy
 *   when `true`, an `Exception` raised while resolving the alias or the engine URL is
 *   suppressed and the cluster value exactly as supplied is used for whatever could not be
 *   resolved; when `false`, that exception propagates to the caller
 * @return
 *   the parsed authentication, coordinates, key vault authentication, request id and client
 *   request properties
 * @throws InvalidParameterException
 *   if the database or the cluster option is missing
 */
def parseSourceParameters(
    parameters: Map[String, String],
    allowProxy: Boolean): SourceParameters = {
  // Parse KustoTableCoordinates - these are mandatory options.
  // NOTE(review): both messages say "destination" although this parses source options; the
  // wording is kept as is because callers or tests may match on it - confirm before changing.
  val database = parameters.getOrElse(
    KustoSourceOptions.KUSTO_DATABASE,
    throw new InvalidParameterException(
      "KUSTO_DATABASE parameter is missing. Must provide a destination database name"))
  val cluster = parameters.getOrElse(
    KustoSourceOptions.KUSTO_CLUSTER,
    throw new InvalidParameterException(
      "KUSTO_CLUSTER parameter is missing. Must provide a destination cluster name"))

  // Resolve alias and engine URL. With allowProxy set, a resolution failure is deliberately
  // tolerated (presumably because a proxy address need not follow the standard cluster URL
  // form - confirm) and the value falls back to the cluster string as given by the caller.
  // The guards make the catch clauses match only when allowProxy is true, so otherwise the
  // original exception propagates unchanged.
  val (alias, clusterUrl) =
    try {
      val normalizedCluster = cluster.toLowerCase()
      val resolvedAlias = getClusterNameFromUrlIfNeeded(normalizedCluster)
      try {
        (resolvedAlias, getEngineUrlFromAliasIfNeeded(normalizedCluster))
      } catch {
        // Alias was resolved but the engine URL was not: keep the alias, fall back for the URL.
        case _: Exception if allowProxy => (resolvedAlias, cluster)
      }
    } catch {
      // Nothing could be resolved: fall back to the supplied value for both.
      case _: Exception if allowProxy => (cluster, cluster)
    }

  val table = parameters.get(KustoSinkOptions.KUSTO_TABLE)
  val requestId: String =
    parameters.getOrElse(KustoSinkOptions.KUSTO_REQUEST_ID, UUID.randomUUID().toString)
  val clientRequestProperties = getClientRequestProperties(parameters, requestId)
  val (authentication, keyVaultAuthentication) = parseAuthentication(parameters, clusterUrl)
  val ingestionUri = parameters.get(KustoSinkOptions.KUSTO_INGESTION_URI)

  SourceParameters(
    authentication,
    KustoCoordinates(clusterUrl, alias, database, table, ingestionUri),
    keyVaultAuthentication,
    requestId,
    clientRequestProperties)
}