private[kusto] def setCsvMapping()

in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoIngestionUtils.scala [56:105]


  /**
   * Generates a CSV column mapping from the source schema and stores its serialized form in
   * `ingestionProperties.csvMapping`.
   *
   * One mapping entry is produced per distinct source column name, with the column's position in
   * `sourceSchema` as its ordinal. The entry's type is resolved in this order:
   *   1. the CSL type of the target column with the same name, when the target has one;
   *   1. the Kusto type derived from the Spark type, only when `tableCreationMode` is
   *      `CreateIfNotExist`;
   *   1. `"string"` as the final fallback.
   *
   * @param sourceSchema
   *   schema of the data being written; supplies column names, positions and Spark types
   * @param targetSchema
   *   JSON nodes from which each target column's name and CSL type are read; `null` or empty is
   *   treated as "no target columns known" and disables the missing-column check
   * @param ingestionProperties
   *   properties whose `csvMapping` is overwritten; `csvMappingNameReference` must be unset
   * @param tableCreationMode
   *   when `CreateIfNotExist`, Spark types are translated to Kusto types for columns that are not
   *   found in the target
   * @throws IllegalArgumentException
   *   if `ingestionProperties.csvMappingNameReference` is non-empty
   * @throws SchemaMatchException
   *   if the target schema is non-empty and the source has columns that are absent from it
   */
  private[kusto] def setCsvMapping(
      sourceSchema: StructType,
      targetSchema: Array[JsonNode],
      ingestionProperties: SparkIngestionProperties,
      tableCreationMode: SinkTableCreationMode): Unit = {
    require(
      ingestionProperties.csvMappingNameReference == null || ingestionProperties.csvMappingNameReference.isEmpty,
      "Sink options SparkIngestionProperties.csvMappingNameReference and adjustSchema.GenerateDynamicCsvMapping are not compatible. Use only one.")

    // Normalize a missing target schema to an empty one *before* it is first dereferenced;
    // checking for null only after mapping over it would be too late to prevent an NPE.
    val knownTargetSchema = Option(targetSchema).getOrElse(Array.empty[JsonNode])
    val targetSchemaColumns = knownTargetSchema
      .map(c =>
        (
          c.get(KustoConstants.Schema.NAME).asText(),
          c.get(KustoConstants.Schema.CSLTYPE).asText()))
      .toMap
    // Column name -> ordinal (position in the source schema).
    val sourceSchemaColumns = sourceSchema.fields.zipWithIndex.map(c => (c._1.name, c._2)).toMap
    /* This was created for the case where CreateTable is used along with Create CSV mapping. There are 2 options:
    either to not have a mapping or to create an explicit identity mapping. Since GenerateCSVMapping is requested
    explicitly, creating an identity mapping is the most appropriate fit. */
    val sourceSchemaColumnTypes =
      if (tableCreationMode == SinkTableCreationMode.CreateIfNotExist)
        sourceSchema.fields
          .map(field => (field.name, getSparkTypeToKustoTypeMap(field.dataType)))
          .toMap
      else Map.empty[String, String]

    // Only enforce "every source column exists in the target" when a target schema is actually known.
    val notFoundSourceColumns = sourceSchemaColumns.keys.filterNot(targetSchemaColumns.contains)
    if (notFoundSourceColumns.nonEmpty && knownTargetSchema.nonEmpty) {
      val missingColumns = notFoundSourceColumns.mkString(", ")
      throw SchemaMatchException(
        s"Source schema has columns that are not present in the target: $missingColumns.")
    }

    // Iterating the Map directly would emit mappings in hash order; sort by ordinal so the generated
    // mapping (and the debug log below) is deterministic and follows the source column order.
    val columnMappings = sourceSchemaColumns.toSeq
      .sortBy { case (_, ordinal) => ordinal }
      .map { case (columnName, ordinal) =>
        // Prefer the target's declared type; otherwise use the translated Spark type or fall back to string.
        val kustoType = targetSchemaColumns.getOrElse(
          columnName,
          sourceSchemaColumnTypes.getOrElse(columnName, "string"))
        val columnMapping = new ColumnMapping(columnName, kustoType)
        columnMapping.setOrdinal(ordinal)
        columnMapping
      }
    val mapping = csvMappingToString(columnMappings.toArray)
    KustoDataSourceUtils.logDebug(this.getClass.getSimpleName, s"Using CSV mapping : $mapping")
    ingestionProperties.csvMapping = mapping
  }