def initializeTablesBySchema()

in connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala [57:141]


  /**
   * Ensures the destination (and, for transactional writes, a temporary) table exists with a
   * schema compatible with the dataframe being written.
   *
   * If the target table is missing, either fails (FailIfNotExist) or creates it from the Spark
   * schema; if it exists, the Kusto-reported schema is reused. For Transactional write mode a
   * temp table is created and the target's batching policy is copied onto it; optionally a
   * retention + auto-delete policy is configured so the temp table cleans itself up.
   *
   * @param tableCoordinates          cluster/database/table the write targets (table must be defined)
   * @param tmpTableName              name of the temporary table used by transactional writes
   * @param sourceSchema              Spark schema of the dataframe being written
   * @param targetSchema              Kusto-reported schema rows; empty iff the table does not exist
   * @param writeOptions              write configuration (create mode, write mode, cleanup time)
   * @param crp                       client request properties propagated to every command
   * @param configureRetentionPolicy  when true, sets retention/auto-delete on the temp table
   * @throws RuntimeException if the table is missing and tableCreateOptions is FailIfNotExist
   */
  def initializeTablesBySchema(
      tableCoordinates: KustoCoordinates,
      tmpTableName: String,
      sourceSchema: StructType,
      targetSchema: Iterable[JsonNode],
      writeOptions: WriteOptions,
      crp: ClientRequestProperties,
      configureRetentionPolicy: Boolean): Unit = {

    val database = tableCoordinates.database
    val table = tableCoordinates.table.get

    // Resolve the schema string once as an expression (no mutable state): derived from the
    // dataframe when the table is missing, otherwise from the existing Kusto table schema.
    val tmpTableSchema: String =
      if (targetSchema.isEmpty) {
        // Table does not exist
        if (writeOptions.tableCreateOptions == SinkTableCreationMode.FailIfNotExist) {
          throw new RuntimeException(
            s"Table '$table' doesn't exist in database '$database', cluster '${tableCoordinates.clusterAlias} and tableCreateOptions is set to FailIfNotExist.")
        }
        // Parse dataframe schema and create a destination table with that schema
        val tableSchemaBuilder = new StringJoiner(",")
        sourceSchema.fields.foreach { field =>
          val fieldType = DataTypeMapping.getSparkTypeToKustoTypeMap(field.dataType)
          // Bracket-quote column names so reserved words / special characters remain valid KQL
          tableSchemaBuilder.add(s"['${field.name}']:$fieldType")
        }
        val parsedSchema = tableSchemaBuilder.toString
        executeEngine(database, generateTableCreateCommand(table, parsedSchema), "tableCreate", crp)
        if (writeOptions.writeMode == WriteMode.KustoStreaming) {
          executeEngine(database, generateTableAlterStreamIngestionCommand(table), "enableStreamingPolicy", crp)
          executeEngine(database, generateClearStreamingIngestionCacheCommand(table), "clearStreamingPolicyCache", crp)
        }
        parsedSchema
      } else {
        // Table exists. Parse kusto table schema and check if it matches the dataframes schema
        val transformedTargetSchema = new ObjectMapper().createArrayNode()
        targetSchema.foreach(node => transformedTargetSchema.add(node))
        extractSchemaFromResultTable(transformedTargetSchema)
      }

    if (writeOptions.writeMode == WriteMode.Transactional) {
      // Create a temporary table with the kusto or dataframe parsed schema with retention and delete set to after the
      // write operation times out. Engine recommended keeping the retention although we use auto delete.
      executeEngine(database, generateTempTableCreateCommand(tmpTableName, tmpTableSchema), "tableCreate", crp)
      val targetTableBatchingPolicyRes = executeEngine(
        database,
        generateTableShowIngestionBatchingPolicyCommand(table),
        "showBatchingPolicy",
        crp).getPrimaryResults
      // Guard on next(): an empty result set would otherwise throw on getString.
      if (targetTableBatchingPolicyRes.next()) {
        // BUGFIX: getString(2) may return null when no batching policy is set; the original code
        // called .replace on it BEFORE the null check, risking an NPE and making the check dead.
        // Wrap in Option so null (and the literal "null" text) are filtered out safely.
        Option(targetTableBatchingPolicyRes.getString(2))
          .map(_.replace("\r\n", "").replace("\"", "\"\""))
          .filter(_ != "null")
          .foreach { targetTableBatchingPolicy =>
            // Copy the target table's batching policy onto the temp table so ingestion
            // behaves the same, then refresh the DM-side policy cache.
            executeEngine(
              database,
              generateTableAlterIngestionBatchingPolicyCommand(
                tmpTableName,
                targetTableBatchingPolicy),
              "alterBatchingPolicy",
              crp)
            executeDM(generateRefreshBatchingPolicyCommand(database, tmpTableName), Some(crp), "refreshBatchingPolicy")
          }
      }
      if (configureRetentionPolicy) {
        // Retention window sized to the operation's auto-cleanup timeout; not recoverable
        // since the temp table is throwaway by design.
        executeEngine(
          database,
          generateTableAlterRetentionPolicy(
            tmpTableName,
            DurationFormatUtils.formatDuration(
              writeOptions.autoCleanupTime.toMillis,
              durationFormat,
              true),
            recoverable = false),
          "alterRetentionPolicy",
          crp)
        // Belt-and-braces: auto-delete the temp table once the cleanup deadline passes.
        val instant = Instant.now.plusSeconds(writeOptions.autoCleanupTime.toSeconds)
        executeEngine(database, generateTableAlterAutoDeletePolicy(tmpTableName, instant), "alterAutoDelete", crp)
      }
      KDSU.logInfo(
        myName,
        s"Successfully created temporary table $tmpTableName, will be deleted after completing the operation")
    }
  }