in connector/src/main/scala/com/microsoft/kusto/spark/datasource/DefaultSource.scala [49:86]
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val sinkParameters = KDSU.parseSinkParameters(parameters, mode)
    initCommonParams(sinkParameters.sourceParametersResults)

    // When Key Vault authentication is configured, fetch the AAD app credentials
    // from the vault and merge them with any credentials passed directly as options.
    if (keyVaultAuthentication.isDefined) {
      val paramsFromKeyVault =
        KeyVaultUtils.getAadAppParametersFromKeyVault(keyVaultAuthentication.get)
      authenticationParameters = Some(
        KDSU.mergeKeyVaultAndOptionsAuthentication(paramsFromKeyVault, authenticationParameters))
    }

    // Write the DataFrame to Kusto before building the relation used to read the written results back.
    KustoWriter.write(
      None,
      data,
      kustoCoordinates,
      authenticationParameters.get,
      sinkParameters.writeOptions,
      clientRequestProperties.get)

    // The write-result limit is either the literal 'none' (skip reading results back)
    // or an integer row limit applied to the returned relation.
    val limit =
      if (sinkParameters.writeOptions.writeResultLimit.equalsIgnoreCase(
          KustoSinkOptions.NONE_RESULT_LIMIT)) None
      else {
        try {
          Some(sinkParameters.writeOptions.writeResultLimit.toInt)
        } catch {
          case _: Exception =>
            throw new InvalidParameterException(
              s"KustoOptions.KUSTO_WRITE_RESULT_LIMIT is set to '${sinkParameters.writeOptions.writeResultLimit}'. Must be either 'none' or an integer value")
        }
      }

    createRelation(sqlContext, adjustParametersForBaseRelation(parameters, limit))
  }