in connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala [577:607]
/**
 * Re-creates, on the staging table, the ingestion mapping that the staging-table ingestion
 * properties refer to by name.
 *
 * The Spark-side properties are first converted to ingestion properties for
 * `database`/`tempTable`. When their mapping reference is blank, no command is executed.
 * Otherwise the command built by `generateShowTableMappingsCommand` for `originalTable` is run,
 * its primary result is scanned row by row, and for the first row whose column 0 equals the
 * reference a create-mapping command is executed against the staging table. If no row matches,
 * nothing is created.
 *
 * @param stagingTableSparkIngestionProperties properties converted via `toIngestionProperties`
 * @param database database name passed to the conversion
 * @param tempTable staging table name passed to the conversion
 * @param originalTable table passed to `generateShowTableMappingsCommand`
 * @param crp client request properties forwarded to both `executeEngine` calls
 */
private[kusto] def setMappingOnStagingTableIfNeeded(
    stagingTableSparkIngestionProperties: SparkIngestionProperties,
    database: String,
    tempTable: String,
    originalTable: String,
    crp: ClientRequestProperties): Unit = {
  val ingestionProps =
    stagingTableSparkIngestionProperties.toIngestionProperties(database, tempTable)
  val ingestionMapping = ingestionProps.getIngestionMapping
  val referenceName = ingestionMapping.getIngestionMappingReference

  if (StringUtils.isNotBlank(referenceName)) {
    val kind = ingestionMapping.getIngestionMappingKind.toString
    val showCommand = generateShowTableMappingsCommand(originalTable, kind)
    val rows =
      executeEngine(ingestionProps.getDatabaseName, showCommand, "tableMappingsShow", crp).getPrimaryResults

    // Advances the cursor until the current row's column 0 equals the reference
    // (presumably the mapping name — confirm against the show-mappings result schema).
    // Returns false once the rows are exhausted without a match.
    @scala.annotation.tailrec
    def seekReferencedMapping(): Boolean =
      if (!rows.next()) false
      else if (rows.getString(0).equals(referenceName)) true
      else seekReferencedMapping()

    if (seekReferencedMapping()) {
      // Column 2 is presumably the mapping definition as JSON (TODO confirm); its double
      // quotes are swapped for single quotes before it is embedded in the create command.
      val policyJson = rows.getString(2).replace("\"", "'")
      val createCommand = generateCreateTableMappingCommand(
        ingestionProps.getTableName,
        kind,
        referenceName,
        policyJson)
      executeEngine(ingestionProps.getDatabaseName, createCommand, "tableMappingCreate", crp)
    }
  }
}