in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [76:256]
private[kusto] def write(
batchId: Option[Long],
data: DataFrame,
tableCoordinates: KustoCoordinates,
authentication: KustoAuthentication,
writeOptions: WriteOptions,
crp: ClientRequestProperties): Unit = {
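// Resolve a cached client for the target cluster; the optional batch id is carried
// into the temp table name and log messages.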
val batchIdIfExists = batchId.map(_.toString).getOrElse("")
val kustoClient = KustoClientCache.getClient(
tableCoordinates.clusterUrl,
authentication,
tableCoordinates.ingestionUrl,
tableCoordinates.clusterAlias)
val table = tableCoordinates.table.get
// TODO put data.sparkSession.sparkContext.appName in client app name
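// Build a unique name for the staging table; a user-supplied userTempTableName
// presumably takes precedence when set.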
val tmpTableName: String = KDSU.generateTempTableName(
data.sparkSession.sparkContext.appName,
table,
writeOptions.requestId,
batchIdIfExists,
writeOptions.userTempTableName)
val stagingTableIngestionProperties = getSparkIngestionProperties(writeOptions)
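// Fetch the destination table schema as rows; an empty result means the table does not exist yet.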
val schemaShowCommandResult = kustoClient
.executeEngine(
tableCoordinates.database,
generateTableGetSchemaAsRowsCommand(table),
"schemaShow",
crp)
.getPrimaryResults
val targetSchema =
schemaShowCommandResult.getData.asScala.map(c => c.get(0).asInstanceOf[JsonNode]).toArray
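// Align the DataFrame schema with the target schema per writeOptions.adjustSchema;
// this may update the staging ingestion properties (e.g. with a column mapping).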
KustoIngestionUtils.adjustSchema(
writeOptions.adjustSchema,
data.schema,
targetSchema,
stagingTableIngestionProperties,
writeOptions.tableCreateOptions)
val rebuiltOptions =
writeOptions.copy(maybeSparkIngestionProperties = Some(stagingTableIngestionProperties))
val tableExists = schemaShowCommandResult.count() > 0
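// Decide whether to ingest at all (e.g. the table may be missing with creation
// disallowed, or ingest-if-not-exists conditions may already be satisfied).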
val shouldIngest = kustoClient.shouldIngestData(
tableCoordinates,
writeOptions.maybeSparkIngestionProperties,
tableExists,
crp)
if (!shouldIngest) {
KDSU.logInfo(className, s"$IngestSkippedTrace '$table'")
} else {
if (writeOptions.userTempTableName.isDefined) {
val userTempTableExists = kustoClient
.executeEngine(
tableCoordinates.database,
generateTableGetSchemaAsRowsCommand(writeOptions.userTempTableName.get),
"schemaShow",
crp)
.getPrimaryResults
.count() > 0
if (!userTempTableExists || !tableExists) {
throw new InvalidParameterException(
"Temp table name provided but the table does not exist. Either drop this " +
"option or create the table beforehand.")
}
} else {
// KustoWriter will create a temporary table and ingest the data into it.
// Only if all executors succeed is the temporary table appended to the original destination table.
kustoClient.initializeTablesBySchema(
tableCoordinates,
tmpTableName,
data.schema,
targetSchema,
writeOptions,
crp,
stagingTableIngestionProperties.creationTime == null)
}
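// Propagate an ingestion mapping to the staging table when the ingestion properties reference one.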
kustoClient.setMappingOnStagingTableIfNeeded(
stagingTableIngestionProperties,
tableCoordinates.database,
tmpTableName,
table,
crp)
if (stagingTableIngestionProperties.flushImmediately) {
KDSU.logWarn(
className,
"Setting flushImmediately to true is not recommended, especially in production")
}
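// Resolve cloud metadata (e.g. the AAD authority) for the ingestion endpoint once
// on the driver; it is shipped to executors via KustoWriteResource.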
val cloudInfo = CloudInfo.retrieveCloudInfoForCluster(kustoClient.ingestKcsb.getClusterUrl)
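// Work on the untyped internal row RDD; each partition reports its ingestion
// outcome through this accumulator.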
val rdd = data.queryExecution.toRdd
val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult]
val parameters = KustoWriteResource(
authentication = authentication,
coordinates = tableCoordinates,
schema = data.schema,
writeOptions = rebuiltOptions,
tmpTableName = tmpTableName,
cloudInfo = cloudInfo)
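// Everything a partition needs in order to ingest into the temp table, serialized to executors.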
val sinkStartTime = getCreationTime(stagingTableIngestionProperties)
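// In async mode control returns immediately; for Transactional writes, finalization
// is chained onto the future's callbacks below.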
if (writeOptions.isAsync) {
val asyncWork = rdd.foreachPartitionAsync { rows =>
ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
}
KDSU.logInfo(className, s"asynchronous write to Kusto table '$table' in progress")
// This part runs back on the driver
if (writeOptions.writeMode == WriteMode.Transactional) {
asyncWork.onSuccess { case _ =>
finalizeIngestionWhenWorkersSucceeded(
tableCoordinates,
batchIdIfExists,
tmpTableName,
partitionsResults,
writeOptions,
crp,
tableExists,
rdd.sparkContext,
authentication,
kustoClient,
sinkStartTime)
}
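// An async failure cannot surface in the caller, so drop the staging by-products
// (unless the temp table is user-owned) and only log the error.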
asyncWork.onFailure { case exception: Exception =>
if (writeOptions.userTempTableName.isEmpty) {
kustoClient.cleanupIngestionByProducts(tableCoordinates.database, tmpTableName, crp)
}
KDSU.reportExceptionAndThrow(
className,
exception,
"writing data",
tableCoordinates.clusterUrl,
tableCoordinates.database,
table,
shouldNotThrow = true)
KDSU.logError(
className,
"The exception is not visible in the driver since we're in async mode")
}
}
} else {
try {
rdd.foreachPartition { rows =>
ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters)
}
} catch {
case exception: Exception =>
if (writeOptions.writeMode == WriteMode.Transactional &&
writeOptions.userTempTableName.isEmpty) {
kustoClient.cleanupIngestionByProducts(
tableCoordinates.database,
tmpTableName,
crp)
}
/* Throwing the exception will abort the job (explicitly on the driver) */
throw exception
}
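// All partitions succeeded; commit the staged data from the temp table into the
// destination (transactional writes only).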
if (writeOptions.writeMode == WriteMode.Transactional) {
finalizeIngestionWhenWorkersSucceeded(
tableCoordinates,
batchIdIfExists,
tmpTableName,
partitionsResults,
writeOptions,
crp,
tableExists,
rdd.sparkContext,
authentication,
kustoClient,
sinkStartTime)
}
}
}
}