in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [381:527]
def parseSinkParameters(
    parameters: Map[String, String],
    mode: SaveMode = SaveMode.Append): SinkParameters = {
  if (mode != SaveMode.Append) {
    throw new InvalidParameterException(
      s"Kusto data source supports only 'Append' mode, '$mode' is invalid. Please use df.write.mode(SaveMode.Append).")
  }
  // TODO get defaults from KustoWriter()
  // Parse WriteOptions
  var tableCreation: SinkTableCreationMode = SinkTableCreationMode.FailIfNotExist
  var tableCreationParam: Option[String] = None
  var isAsync: Boolean = false
  var writeMode: WriteMode = WriteMode.Queued
  var writeModeParam: Option[String] = None
  var batchLimit: Int = 0
  var minimalExtentsCountForSplitMergePerNode: Int = 0
  var maxRetriesOnMoveExtents: Int = 0
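  // Resolve the table creation mode. SinkTableCreationMode.withName throws
  // NoSuchElementException for an unrecognized value, which is rethrown as an
  // InvalidParameterException naming the offending option.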
  try {
    tableCreationParam = parameters.get(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS)
    tableCreation =
      if (tableCreationParam.isEmpty) SinkTableCreationMode.FailIfNotExist
      else SinkTableCreationMode.withName(tableCreationParam.get)
  } catch {
    case _: NoSuchElementException =>
      throw new InvalidParameterException(
        s"No such SinkTableCreationMode option: '${tableCreationParam.get}'")
  }
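  // Resolve the write mode the same way. Note that the effective default here is
  // Transactional, which always overrides the Queued initializer above.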
  try {
    writeModeParam = parameters.get(KustoSinkOptions.KUSTO_WRITE_MODE)
    writeMode =
      if (writeModeParam.isEmpty) WriteMode.Transactional
      else WriteMode.withName(writeModeParam.get)
  } catch {
    case _: NoSuchElementException =>
      throw new InvalidParameterException(s"No such WriteMode option: '${writeModeParam.get}'")
  }
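  // Cap on uncompressed bytes per streaming ingest request: the option is given in
  // MB and converted to bytes here.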
  val streamIngestMaxSize =
    parameters.get(KustoSinkOptions.KUSTO_STREAMING_INGEST_SIZE_IN_MB) match {
      case Some(value) => value.toInt * OneMegaByte
      case None => DefaultMaxStreamingBytesUncompressed
    }
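  // A user-supplied temp table is only honored when writing transactionally into an
  // existing table; any other combination is rejected up front.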
  val userTempTableName = parameters.get(KustoSinkOptions.KUSTO_TEMP_TABLE_NAME)
  if (userTempTableName.isDefined && (tableCreation == SinkTableCreationMode.CreateIfNotExist || writeMode != WriteMode.Transactional)) {
    throw new InvalidParameterException(
      "tempTableName can only be used with the FailIfNotExist table creation mode and the Transactional write mode.")
  }
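  // Boolean flags and numeric limits, trimmed and parsed from their string form,
  // with connector defaults applied when an option is absent.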
  isAsync =
    parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "false").trim.toBoolean
  val pollingOnDriver =
    parameters.getOrElse(KustoSinkOptions.KUSTO_POLLING_ON_DRIVER, "false").trim.toBoolean
  batchLimit = parameters
    .getOrElse(KustoSinkOptions.KUSTO_CLIENT_BATCHING_LIMIT, DefaultBatchingLimit.toString)
    .trim
    .toInt
  minimalExtentsCountForSplitMergePerNode = parameters
    .getOrElse(
      KustoDebugOptions.KUSTO_MAXIMAL_EXTENTS_COUNT_FOR_SPLIT_MERGE_PER_NODE,
      DefaultExtentsCountForSplitMergePerNode.toString)
    .trim
    .toInt
  maxRetriesOnMoveExtents = parameters
    .getOrElse(
      KustoDebugOptions.KUSTO_MAX_RETRIES_ON_MOVE_EXTENTS,
      DefaultMaxRetriesOnMoveExtents.toString)
    .trim
    .toInt
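  // Schema adjustment mode. Generating a dynamic CSV mapping is not supported
  // together with streaming ingestion, so that combination fails fast.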
  val maybeSchemaAdjustmentParam = parameters.get(KustoSinkOptions.KUSTO_ADJUST_SCHEMA)
  val adjustSchema = maybeSchemaAdjustmentParam match {
    case Some(param) =>
      SchemaAdjustmentMode.withName(param)
    case None => SchemaAdjustmentMode.NoAdjustment
  }
  if (adjustSchema == SchemaAdjustmentMode.GenerateDynamicCsvMapping && writeMode == WriteMode.KustoStreaming) {
    throw new InvalidParameterException(
      "GenerateDynamicCsvMapping cannot be used with streaming ingestion (WriteMode.KustoStreaming)")
  }
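  // Overall operation timeout and the auto-cleanup window for staging resources,
  // both expressed in seconds.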
  val timeout = new FiniteDuration(
    parameters
      .getOrElse(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, KCONST.DefaultWaitingIntervalLongRunning)
      .toInt,
    TimeUnit.SECONDS)
  val autoCleanupTime = new FiniteDuration(
    parameters
      .getOrElse(
        KustoSinkOptions.KUSTO_STAGING_RESOURCE_AUTO_CLEANUP_TIMEOUT,
        KCONST.DefaultCleaningInterval)
      .toInt,
    TimeUnit.SECONDS)
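  // Debug-only flags plus the raw ingestion-properties JSON, if one was provided.
  // Source parameters (coordinates, authentication, request id) are parsed by
  // parseSourceParameters; proxies are not allowed on the sink path.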
  val disableFlushImmediately =
    parameters.getOrElse(KustoDebugOptions.KUSTO_DISABLE_FLUSH_IMMEDIATELY, "false").toBoolean
  val ensureNoDupBlobs =
    parameters.getOrElse(KustoDebugOptions.KUSTO_ENSURE_NO_DUPLICATED_BLOBS, "false").toBoolean
  val ingestionPropertiesAsJson =
    parameters.get(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON)
  val sourceParameters = parseSourceParameters(parameters, allowProxy = false)
  val maybeSparkIngestionProperties =
    getIngestionProperties(writeMode == WriteMode.KustoStreaming, ingestionPropertiesAsJson)
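  // Assemble the immutable WriteOptions from everything parsed above.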
  val writeOptions = WriteOptions(
    pollingOnDriver,
    tableCreation,
    isAsync,
    parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_RESULT_LIMIT, "1"),
    parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, "UTC"),
    timeout,
    maybeSparkIngestionProperties = maybeSparkIngestionProperties,
    batchLimit,
    sourceParameters.requestId,
    autoCleanupTime,
    maxRetriesOnMoveExtents,
    minimalExtentsCountForSplitMergePerNode,
    adjustSchema,
    writeMode,
    userTempTableName,
    disableFlushImmediately,
    ensureNoDupBlobs,
    streamIngestMaxSize)
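  // The destination table is mandatory for a sink; everything else has a default.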
  if (sourceParameters.kustoCoordinates.table.isEmpty) {
    throw new InvalidParameterException(
      "KUSTO_TABLE parameter is missing. Must provide a destination table name")
  }
  logInfo(
    "parseSinkParameters",
    s"Parsed write options for sink: {'table': '${sourceParameters.kustoCoordinates.table}', 'async': ${writeOptions.isAsync}, 'writeMode': ${writeOptions.writeMode}, " +
      s"'tableCreationMode': ${writeOptions.tableCreateOptions}, 'writeLimit': ${writeOptions.writeResultLimit}, 'batchLimit': ${writeOptions.batchLimit}, " +
      s"'timeout': ${writeOptions.timeout}, 'timezone': ${writeOptions.timeZone}, " +
      s"'maybeSparkIngestionProperties': $ingestionPropertiesAsJson, 'requestId': '${sourceParameters.requestId}', 'pollingOnDriver': ${writeOptions.pollingOnDriver}, " +
      s"'maxRetriesOnMoveExtents': $maxRetriesOnMoveExtents, 'minimalExtentsCountForSplitMergePerNode': $minimalExtentsCountForSplitMergePerNode, " +
      s"'adjustSchema': $adjustSchema, 'autoCleanupTime': $autoCleanupTime" +
      s"${if (writeOptions.userTempTableName.isDefined) s", 'userTempTableName': ${userTempTableName.get}" else ""}" +
      s", 'disableFlushImmediately': $disableFlushImmediately" +
      s"${if (ensureNoDupBlobs) ", 'ensureNoDupBlobs': true" else ""}}")
  SinkParameters(writeOptions, sourceParameters)
}
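
// Illustrative usage sketch (not part of the original file). Option keys other than
// KustoSinkOptions.KUSTO_TABLE appear in parseSinkParameters above; KUSTO_TABLE is
// implied by the "KUSTO_TABLE parameter is missing" check. Cluster, database, and
// authentication options consumed by parseSourceParameters are elided here.
//
//   val params = Map(
//     KustoSinkOptions.KUSTO_TABLE -> "MyTable",
//     KustoSinkOptions.KUSTO_WRITE_MODE -> WriteMode.Queued.toString,
//     KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS -> SinkTableCreationMode.CreateIfNotExist.toString)
//   val sink = parseSinkParameters(params) // mode defaults to SaveMode.Append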