in connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala [57:141]
def initializeTablesBySchema(
tableCoordinates: KustoCoordinates,
tmpTableName: String,
sourceSchema: StructType,
targetSchema: Iterable[JsonNode],
writeOptions: WriteOptions,
crp: ClientRequestProperties,
configureRetentionPolicy: Boolean): Unit = {
var tmpTableSchema: String = ""
val database = tableCoordinates.database
val table = tableCoordinates.table.get
if (targetSchema.isEmpty) {
// Table does not exist
if (writeOptions.tableCreateOptions == SinkTableCreationMode.FailIfNotExist) {
        throw new RuntimeException(
          s"Table '$table' doesn't exist in database '$database', cluster '${tableCoordinates.clusterAlias}', and tableCreateOptions is set to FailIfNotExist.")
} else {
// Parse dataframe schema and create a destination table with that schema
val tableSchemaBuilder = new StringJoiner(",")
sourceSchema.fields.foreach { field =>
val fieldType = DataTypeMapping.getSparkTypeToKustoTypeMap(field.dataType)
tableSchemaBuilder.add(s"['${field.name}']:$fieldType")
}
tmpTableSchema = tableSchemaBuilder.toString
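        // Illustrative result, assuming DataTypeMapping maps StringType -> string and
        // IntegerType -> int: a dataframe with columns (name, age) would yield
        // "['name']:string,['age']:int" (column names bracket-escaped for Kusto)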
executeEngine(database, generateTableCreateCommand(table, tmpTableSchema), "tableCreate", crp)
if (writeOptions.writeMode == WriteMode.KustoStreaming) {
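          // Streaming ingestion must be enabled via a table policy; clearing the streaming
          // ingestion cache afterwards (presumably) lets the new policy take effect
          // immediately rather than after the cached policy expires.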
          executeEngine(database, generateTableAlterStreamIngestionCommand(table), "enableStreamingPolicy", crp)
executeEngine(database, generateClearStreamingIngestionCacheCommand(table), "clearStreamingPolicyCache", crp)
}
}
} else {
      // Table exists. Parse the Kusto table schema and check whether it matches the dataframe's schema
val transformedTargetSchema = new ObjectMapper().createArrayNode()
      targetSchema.foreach(node => transformedTargetSchema.add(node))
tmpTableSchema = extractSchemaFromResultTable(transformedTargetSchema)
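      // extractSchemaFromResultTable is expected to rebuild a CSL column list from the
      // '.show table schema'-style result, in the same "['col']:type,..." form as above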
}
if (writeOptions.writeMode == WriteMode.Transactional) {
      // Create a temporary table using the schema parsed from Kusto or from the dataframe,
      // with retention and auto-delete set to expire after the write operation times out.
      // The engine team recommended keeping a retention policy even though auto-delete is used.
executeEngine(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema), "tableCreate", crp)
val targetTableBatchingPolicyRes = executeEngine(
database,
generateTableShowIngestionBatchingPolicyCommand(table),
"showBatchingPolicy",
crp).getPrimaryResults
      targetTableBatchingPolicyRes.next()
      // getString may return null (or the literal string "null") when the target table has
      // no batching policy; check before escaping to avoid a NullPointerException
      val rawBatchingPolicy = targetTableBatchingPolicyRes.getString(2)
      if (rawBatchingPolicy != null && rawBatchingPolicy != "null") {
        // strip line breaks and double up quotes so the policy JSON embeds safely in a KQL command
        val targetTableBatchingPolicy =
          rawBatchingPolicy.replace("\r\n", "").replace("\"", "\"\"")
executeEngine(
database,
generateTableAlterIngestionBatchingPolicyCommand(
tmpTableName,
targetTableBatchingPolicy),
"alterBatchingPolicy",
crp)
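        // The DM (ingestion) service caches batching policies, so ask it to refresh its
        // cache for the temp table; otherwise the copied policy might not apply to this write.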
executeDM(generateRefreshBatchingPolicyCommand(database, tmpTableName), Some(crp), "refreshBatchingPolicy")
}
if (configureRetentionPolicy) {
executeEngine(
database,
generateTableAlterRetentionPolicy(
tmpTableName,
DurationFormatUtils.formatDuration(
writeOptions.autoCleanupTime.toMillis,
durationFormat,
true),
recoverable = false),
"alterRetentionPolicy",
crp)
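        // Auto-delete is a table-level Kusto policy; illustrative KQL (the exact command
        // comes from generateTableAlterAutoDeletePolicy):
        //   .alter table ['<tmpTableName>'] policy auto_delete @'{"ExpiryDate":"<instant>","DeleteIfNotEmpty":true}'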
val instant = Instant.now.plusSeconds(writeOptions.autoCleanupTime.toSeconds)
executeEngine(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant), "alterAutoDelete", crp)
}
      KDSU.logInfo(
        myName,
        s"Successfully created temporary table $tmpTableName; it will be deleted after the operation completes")
}
}
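
  // Minimal caller sketch (hypothetical values; the exact shapes of KustoCoordinates and
  // WriteOptions are assumptions based on this signature, not verified connector defaults):
  //
  //   val coordinates = KustoCoordinates(
  //     clusterUrl = "https://mycluster.kusto.windows.net",
  //     clusterAlias = "mycluster",
  //     database = "mydb",
  //     table = Some("MyTable"))
  //   client.initializeTablesBySchema(
  //     coordinates,
  //     tmpTableName = "sparkTempTable_MyTable_1234",
  //     sourceSchema = df.schema,
  //     targetSchema = Seq.empty, // table absent: create it from the dataframe schema
  //     writeOptions = writeOptions,
  //     crp = new ClientRequestProperties,
  //     configureRetentionPolicy = true)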