private[kusto] def write()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [76:256]
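At a high level, write implements a transactional sink: rows are first ingested into a staging (temp) table, and only after every partition reports success is the data promoted into the destination table; on failure the staging table is cleaned up. Below is a minimal sketch of that pattern, with hypothetical ingestPartition, moveToDestination, and dropStagingTable helpers standing in for the connector's real ingestion and table-management calls (none of these are the connector's actual APIs):

import scala.util.{Failure, Success, Try}

object TransactionalSinkSketch {
  // Hypothetical stand-ins; the real connector uses the Kusto ingest client and
  // control commands for these steps.
  def ingestPartition(rows: Seq[String], stagingTable: String): Unit =
    println(s"ingesting ${rows.size} rows into $stagingTable")

  def moveToDestination(stagingTable: String, destTable: String): Unit =
    println(s"moving extents from $stagingTable to $destTable")

  def dropStagingTable(stagingTable: String): Unit =
    println(s"dropping $stagingTable")

  def write(partitions: Seq[Seq[String]], destTable: String): Unit = {
    val stagingTable = s"sparkTempTable_${destTable}_${System.nanoTime()}"
    Try(partitions.foreach(ingestPartition(_, stagingTable))) match {
      case Success(_) =>
        // Every partition succeeded: promote the staged data to the destination.
        moveToDestination(stagingTable, destTable)
      case Failure(e) =>
        // Any failure: drop the staging table so the destination is untouched.
        dropStagingTable(stagingTable)
        throw e
    }
  }
}

The method below layers schema adjustment, async execution, and the Kusto client plumbing on top of this skeleton.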


  private[kusto] def write(
      batchId: Option[Long],
      data: DataFrame,
      tableCoordinates: KustoCoordinates,
      authentication: KustoAuthentication,
      writeOptions: WriteOptions,
      crp: ClientRequestProperties): Unit = {
    val batchIdIfExists = batchId.map(_.toString).getOrElse("")
    val kustoClient = KustoClientCache.getClient(
      tableCoordinates.clusterUrl,
      authentication,
      tableCoordinates.ingestionUrl,
      tableCoordinates.clusterAlias)

    val table = tableCoordinates.table.get
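    // Assumes the table option is set for sink writes; .get throws if it was omitted.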
    // TODO put data.sparkSession.sparkContext.appName in client app name
    val tmpTableName: String = KDSU.generateTempTableName(
      data.sparkSession.sparkContext.appName,
      table,
      writeOptions.requestId,
      batchIdIfExists,
      writeOptions.userTempTableName)

    val stagingTableIngestionProperties = getSparkIngestionProperties(writeOptions)
    val schemaShowCommandResult = kustoClient
      .executeEngine(
        tableCoordinates.database,
        generateTableGetSchemaAsRowsCommand(table),
        "schemaShow",
        crp)
      .getPrimaryResults

    val targetSchema =
      schemaShowCommandResult.getData.asScala.map(c => c.get(0).asInstanceOf[JsonNode]).toArray
    KustoIngestionUtils.adjustSchema(
      writeOptions.adjustSchema,
      data.schema,
      targetSchema,
      stagingTableIngestionProperties,
      writeOptions.tableCreateOptions)

    val rebuiltOptions =
      writeOptions.copy(maybeSparkIngestionProperties = Some(stagingTableIngestionProperties))

    val tableExists = schemaShowCommandResult.count() > 0
    val shouldIngest = kustoClient.shouldIngestData(
      tableCoordinates,
      writeOptions.maybeSparkIngestionProperties,
      tableExists,
      crp)

    if (!shouldIngest) {
      KDSU.logInfo(className, s"$IngestSkippedTrace '$table'")
    } else {
      if (writeOptions.userTempTableName.isDefined) {
        if (kustoClient
            .executeEngine(
              tableCoordinates.database,
              generateTableGetSchemaAsRowsCommand(writeOptions.userTempTableName.get),
              "schemaShow",
              crp)
            .getPrimaryResults
            .count() <= 0 ||
          !tableExists) {
          throw new InvalidParameterException(
            "Temp table name provided but the table does not exist. Either drop this " +
              "option or create the table beforehand.")
        }
      } else {
        // KustoWriter will create a temporary table and ingest the data into it.
        // Only if all executors succeed is the data moved into the original destination table.
        kustoClient.initializeTablesBySchema(
          tableCoordinates,
          tmpTableName,
          data.schema,
          targetSchema,
          writeOptions,
          crp,
          stagingTableIngestionProperties.creationTime == null)
      }

      kustoClient.setMappingOnStagingTableIfNeeded(
        stagingTableIngestionProperties,
        tableCoordinates.database,
        tmpTableName,
        table,
        crp)
      if (stagingTableIngestionProperties.flushImmediately) {
        KDSU.logWarn(
          className,
          "It's not recommended to set flushImmediately to true, especially in production")
      }
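      // Fetch the cloud (Azure environment) metadata for this cluster; it is shipped
      // to the executors as part of the write parameters below.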
      val cloudInfo = CloudInfo.retrieveCloudInfoForCluster(kustoClient.ingestKcsb.getClusterUrl)
      val rdd = data.queryExecution.toRdd
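      // Accumulator collecting each partition's ingestion result on the driver;
      // polled later when finalizing the transactional write.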
      val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult]
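      // Serializable bundle of everything an executor needs to ingest its partition
      // into the temp table.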
      val parameters = KustoWriteResource(
        authentication = authentication,
        coordinates = tableCoordinates,
        schema = data.schema,
        writeOptions = rebuiltOptions,
        tmpTableName = tmpTableName,
        cloudInfo = cloudInfo)
      val sinkStartTime = getCreationTime(stagingTableIngestionProperties)
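      // Async mode kicks off ingestion on a background Future and returns control to
      // the caller immediately; finalization (or cleanup) is chained as callbacks.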
      if (writeOptions.isAsync) {
        val asyncWork = rdd.foreachPartitionAsync { rows =>
          ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
        }
        KDSU.logInfo(className, s"asynchronous write to Kusto table '$table' in progress")
        // This part runs back on the driver

        if (writeOptions.writeMode == WriteMode.Transactional) {
          asyncWork.onSuccess { case _ =>
            finalizeIngestionWhenWorkersSucceeded(
              tableCoordinates,
              batchIdIfExists,
              tmpTableName,
              partitionsResults,
              writeOptions,
              crp,
              tableExists,
              rdd.sparkContext,
              authentication,
              kustoClient,
              sinkStartTime)
          }
          asyncWork.onFailure { case exception: Exception =>
            if (writeOptions.userTempTableName.isEmpty) {
              kustoClient.cleanupIngestionByProducts(tableCoordinates.database, tmpTableName, crp)
            }
            KDSU.reportExceptionAndThrow(
              className,
              exception,
              "writing data",
              tableCoordinates.clusterUrl,
              tableCoordinates.database,
              table,
              shouldNotThrow = true)
            KDSU.logError(
              className,
              "The exception is not visible in the driver since we're in async mode")
          }
        }
      } else {
        try {
          rdd.foreachPartition { rows =>
            ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
          }
        } catch {
          case exception: Exception =>
            if (writeOptions.writeMode == WriteMode.Transactional) {
              if (writeOptions.userTempTableName.isEmpty) {
                kustoClient.cleanupIngestionByProducts(
                  tableCoordinates.database,
                  tmpTableName,
                  crp)
              }
            }
            /* Throwing the exception will abort the job (explicitly on the driver) */
            throw exception
        }
        if (writeOptions.writeMode == WriteMode.Transactional) {
          finalizeIngestionWhenWorkersSucceeded(
            tableCoordinates,
            batchIdIfExists,
            tmpTableName,
            partitionsResults,
            writeOptions,
            crp,
            tableExists,
            rdd.sparkContext,
            authentication,
            kustoClient,
            sinkStartTime)
        }
      }
    }
  }
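
For context, applications do not call this private method directly; writes go through Spark's DataFrame writer API, which routes here. A hedged usage sketch follows (the option keys match the connector's documented names as I recall them, but verify against your connector version):

import org.apache.spark.sql.SparkSession

object KustoSinkUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kusto-write-example").getOrCreate()
    val df = spark.range(10).toDF("value")

    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option("kustoCluster", "<cluster-url>")
      .option("kustoDatabase", "<database>")
      .option("kustoTable", "<table>")
      .option("kustoAadAppId", "<aad-app-id>") // AAD application authentication
      .option("kustoAadAppSecret", "<aad-app-secret>")
      .option("kustoAadAuthorityID", "<aad-tenant-id>")
      .mode("Append")
      .save()

    spark.stop()
  }
}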