in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoIngestionUtils.scala [56:105]
/**
 * Builds a CSV ingestion mapping that binds every source (Spark) column, by position, to a column of the
 * target Kusto table, and stores the serialized mapping in `ingestionProperties.csvMapping`.
 *
 * A source column's index in `sourceSchema.fields` becomes its mapping ordinal. Its data type is taken from
 * the target table when the column exists there. Otherwise, when `tableCreationMode` is `CreateIfNotExist`,
 * the type is derived from the Spark data type. Failing both, it falls back to `"string"`.
 *
 * @param sourceSchema        schema of the DataFrame being written
 * @param targetSchema        column descriptors of the target table, each read through its
 *                            `KustoConstants.Schema.NAME` and `KustoConstants.Schema.CSLTYPE` fields;
 *                            `null` or empty means there are no target columns to validate against
 * @param ingestionProperties ingestion properties to update; must not already reference a named CSV mapping
 * @param tableCreationMode   table creation mode; `CreateIfNotExist` enables Spark-type-derived column types
 * @throws IllegalArgumentException if `ingestionProperties.csvMappingNameReference` is non-empty
 * @throws SchemaMatchException     if the target schema is non-empty and lacks one or more source columns
 */
private[kusto] def setCsvMapping(
    sourceSchema: StructType,
    targetSchema: Array[JsonNode],
    ingestionProperties: SparkIngestionProperties,
    tableCreationMode: SinkTableCreationMode): Unit = {
  require(
    ingestionProperties.csvMappingNameReference == null || ingestionProperties.csvMappingNameReference.isEmpty,
    "Sink options SparkIngestionProperties.csvMappingNameReference and adjustSchema.GenerateDynamicCsvMapping are not compatible. Use only one.")

  // A null target schema is treated as "no target columns" rather than dereferenced (which would NPE).
  val targetColumns: Array[JsonNode] = Option(targetSchema).getOrElse(Array.empty[JsonNode])

  // Target column name -> Kusto type.
  val targetSchemaColumns: Map[String, String] = targetColumns
    .map(c =>
      (
        c.get(KustoConstants.Schema.NAME).asText(),
        c.get(KustoConstants.Schema.CSLTYPE).asText()))
    .toMap

  // Source column name -> ordinal. With duplicate names the last index wins (toMap semantics).
  val sourceSchemaColumns: Map[String, Int] =
    sourceSchema.fields.zipWithIndex.map(c => (c._1.name, c._2)).toMap

  /* This was created for the case where CreateTable is used along with a generated CSV mapping. There are two
     options: either no mapping at all, or an explicit identity mapping. Since GenerateCSVMapping is requested
     explicitly, an identity mapping typed from the Spark schema is the most appropriate fit. */
  val sourceSchemaColumnTypes: Map[String, String] =
    if (tableCreationMode == SinkTableCreationMode.CreateIfNotExist)
      sourceSchema.fields
        .map(field => (field.name, getSparkTypeToKustoTypeMap(field.dataType)))
        .toMap
    else Map.empty[String, String]

  // Collected in schema order (not Map iteration order) so the error message is deterministic.
  val notFoundSourceColumns =
    sourceSchema.fields.map(_.name).filterNot(targetSchemaColumns.contains).distinct
  if (notFoundSourceColumns.nonEmpty && targetColumns.nonEmpty) {
    throw SchemaMatchException(
      s"Source schema has columns that are not present in the target: ${notFoundSourceColumns.mkString(", ")}.")
  }

  // Sorted by ordinal so the generated mapping (and its debug log) is stable across runs.
  val columnMappings = sourceSchemaColumns.toSeq
    .sortBy(_._2)
    .map { case (columnName, ordinal) =>
      // Prefer the target table's type; otherwise the Spark-derived type; otherwise fall back to string.
      val dataType =
        targetSchemaColumns.getOrElse(columnName, sourceSchemaColumnTypes.getOrElse(columnName, "string"))
      val columnMapping = new ColumnMapping(columnName, dataType)
      columnMapping.setOrdinal(ordinal)
      columnMapping
    }

  val mapping = csvMappingToString(columnMappings.toArray)
  KustoDataSourceUtils.logDebug(this.getClass.getSimpleName, s"Using CSV mapping : $mapping")
  ingestionProperties.csvMapping = mapping
}