def parseSinkParameters()

in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [381:527]


  /**
   * Parses and validates the writer options passed to the Kusto sink.
   *
   * Only `SaveMode.Append` is accepted. Enum-valued options (table creation mode, write mode,
   * schema adjustment mode) are matched by exact name. Boolean and integer options are trimmed
   * before parsing. When absent, the table creation mode defaults to `FailIfNotExist`, the write
   * mode to `Transactional` and the schema adjustment to `NoAdjustment`.
   *
   * @param parameters the options supplied to the Spark writer
   * @param mode       the Spark save mode; anything other than `SaveMode.Append` is rejected
   * @return the parsed [[WriteOptions]] bundled with the source parameters, which are parsed via
   *         `parseSourceParameters(parameters, allowProxy = false)`
   * @throws InvalidParameterException if the save mode is not `Append`, an enum-valued option has
   *                                   an unknown value, `tempTableName` is combined with
   *                                   `CreateIfNotExist` or a non-transactional write mode,
   *                                   `GenerateDynamicCsvMapping` is combined with
   *                                   `KustoStreaming`, or no destination table is provided
   */
  def parseSinkParameters(
      parameters: Map[String, String],
      mode: SaveMode = SaveMode.Append): SinkParameters = {
    if (mode != SaveMode.Append) {
      throw new InvalidParameterException(
        s"Kusto data source supports only 'Append' mode, '$mode' directive is invalid. Please use df.write.mode(SaveMode.Append)..")
    }

    // Enum-valued options: unknown names are reported uniformly as InvalidParameterException.
    val tableCreation: SinkTableCreationMode = parseEnumerationOption(
      parameters,
      KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS,
      "SinkTableCreationMode",
      SinkTableCreationMode.FailIfNotExist)(SinkTableCreationMode.withName)
    val writeMode: WriteMode = parseEnumerationOption(
      parameters,
      KustoSinkOptions.KUSTO_WRITE_MODE,
      "WriteMode",
      WriteMode.Transactional)(WriteMode.withName)
    val adjustSchema = parseEnumerationOption(
      parameters,
      KustoSinkOptions.KUSTO_ADJUST_SCHEMA,
      "SchemaAdjustmentMode",
      SchemaAdjustmentMode.NoAdjustment)(SchemaAdjustmentMode.withName)

    val streamIngestMaxSize =
      parameters.get(KustoSinkOptions.KUSTO_STREAMING_INGEST_SIZE_IN_MB) match {
        case Some(value) => value.trim.toInt * OneMegaByte
        case None => DefaultMaxStreamingBytesUncompressed
      }

    // A user-provided temp table is only compatible with an existing target table and
    // transactional writes.
    val userTempTableName = parameters.get(KustoSinkOptions.KUSTO_TEMP_TABLE_NAME)
    if (userTempTableName.isDefined && (tableCreation == SinkTableCreationMode.CreateIfNotExist || writeMode != WriteMode.Transactional)) {
      throw new InvalidParameterException(
        "tempTableName can only be used with FailIfNotExists table create mode and Transactional write mode.")
    }

    if (adjustSchema == SchemaAdjustmentMode.GenerateDynamicCsvMapping && writeMode == WriteMode.KustoStreaming) {
      throw new InvalidParameterException(
        "GenerateDynamicCsvMapping cannot be used with Spark streaming ingestion")
    }

    val isAsync =
      parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "false").trim.toBoolean
    val pollingOnDriver =
      parameters.getOrElse(KustoSinkOptions.KUSTO_POLLING_ON_DRIVER, "false").trim.toBoolean
    val disableFlushImmediately =
      parameters.getOrElse(KustoDebugOptions.KUSTO_DISABLE_FLUSH_IMMEDIATELY, "false").trim.toBoolean
    val ensureNoDupBlobs =
      parameters.getOrElse(KustoDebugOptions.KUSTO_ENSURE_NO_DUPLICATED_BLOBS, "false").trim.toBoolean

    val batchLimit = parameters
      .getOrElse(KustoSinkOptions.KUSTO_CLIENT_BATCHING_LIMIT, DefaultBatchingLimit.toString)
      .trim
      .toInt
    val minimalExtentsCountForSplitMergePerNode = parameters
      .getOrElse(
        KustoDebugOptions.KUSTO_MAXIMAL_EXTENTS_COUNT_FOR_SPLIT_MERGE_PER_NODE,
        DefaultExtentsCountForSplitMergePerNode.toString)
      .trim
      .toInt
    val maxRetriesOnMoveExtents = parameters
      .getOrElse(
        KustoDebugOptions.KUSTO_MAX_RETRIES_ON_MOVE_EXTENTS,
        DefaultMaxRetriesOnMoveExtents.toString)
      .trim
      .toInt

    // Both durations are given in seconds.
    val timeout = new FiniteDuration(
      parameters
        .getOrElse(KustoSinkOptions.KUSTO_TIMEOUT_LIMIT, KCONST.DefaultWaitingIntervalLongRunning)
        .trim
        .toInt,
      TimeUnit.SECONDS)
    val autoCleanupTime = new FiniteDuration(
      parameters
        .getOrElse(
          KustoSinkOptions.KUSTO_STAGING_RESOURCE_AUTO_CLEANUP_TIMEOUT,
          KCONST.DefaultCleaningInterval)
        .trim
        .toInt,
      TimeUnit.SECONDS)

    val ingestionPropertiesAsJson =
      parameters.get(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON)

    val sourceParameters = parseSourceParameters(parameters, allowProxy = false)

    val maybeSparkIngestionProperties =
      getIngestionProperties(writeMode == WriteMode.KustoStreaming, ingestionPropertiesAsJson)

    val writeOptions = WriteOptions(
      pollingOnDriver,
      tableCreation,
      isAsync,
      parameters.getOrElse(KustoSinkOptions.KUSTO_WRITE_RESULT_LIMIT, "1"),
      parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, "UTC"),
      timeout,
      maybeSparkIngestionProperties = maybeSparkIngestionProperties,
      batchLimit,
      sourceParameters.requestId,
      autoCleanupTime,
      maxRetriesOnMoveExtents,
      minimalExtentsCountForSplitMergePerNode,
      adjustSchema,
      writeMode,
      userTempTableName,
      disableFlushImmediately,
      ensureNoDupBlobs,
      streamIngestMaxSize)

    if (sourceParameters.kustoCoordinates.table.isEmpty) {
      throw new InvalidParameterException(
        "KUSTO_TABLE parameter is missing. Must provide a destination table name")
    }

    // Optional fragments carry their own leading separator so the message stays well-formed.
    val tempTableInfo = userTempTableName.fold("")(name => s", 'userTempTableName': '$name'")
    val noDupBlobsInfo = if (ensureNoDupBlobs) ", 'ensureNoDupBlobs': true" else ""
    logInfo(
      "parseSinkParameters",
      s"Parsed write options for sink: {'table': '${sourceParameters.kustoCoordinates.table}', " +
        s"'timeout': ${writeOptions.timeout}, 'async': ${writeOptions.isAsync}, " +
        s"'writeMode': ${writeOptions.writeMode}, 'tableCreationMode': ${writeOptions.tableCreateOptions}, " +
        s"'writeLimit': ${writeOptions.writeResultLimit}, 'batchLimit': ${writeOptions.batchLimit}, " +
        s"'timezone': ${writeOptions.timeZone}, 'maybeSparkIngestionProperties': $ingestionPropertiesAsJson, " +
        s"'requestId': '${sourceParameters.requestId}', 'pollingOnDriver': ${writeOptions.pollingOnDriver}, " +
        s"'maxRetriesOnMoveExtents': $maxRetriesOnMoveExtents, " +
        s"'minimalExtentsCountForSplitMergePerNode': $minimalExtentsCountForSplitMergePerNode, " +
        s"'adjustSchema': $adjustSchema, 'autoCleanupTime': $autoCleanupTime$tempTableInfo, " +
        s"'disableFlushImmediately': $disableFlushImmediately$noDupBlobsInfo}")

    SinkParameters(writeOptions, sourceParameters)
  }

  /**
   * Reads an optional enum-valued option and resolves it by name.
   *
   * @param parameters the writer options
   * @param key        the option key to look up
   * @param typeName   the enum name used in the error message
   * @param default    the value returned when the option is absent
   * @param byName     resolves an option value to an enum member; expected to throw
   *                   `NoSuchElementException` for unknown names (as `Enumeration.withName` does)
   * @throws InvalidParameterException if `byName` does not recognise the supplied value
   */
  private def parseEnumerationOption[T](
      parameters: Map[String, String],
      key: String,
      typeName: String,
      default: T)(byName: String => T): T =
    parameters.get(key) match {
      case None => default
      case Some(value) =>
        try byName(value)
        catch {
          case _: NoSuchElementException =>
            throw new InvalidParameterException(s"No such $typeName option: '$value'")
        }
    }