in connector/src/main/scala/com/microsoft/kusto/spark/utils/CslCommandsGenerator.scala [166:218]
def generateExportDataCommand(
query: String,
directory: String,
partitionId: Int,
storageParameters: TransientStorageParameters,
partitionPredicate: Option[String] = None,
additionalExportOptions: Map[String, String] = Map.empty[String, String],
supportNewParquetWriter: Boolean = true): String = {
val getFullUrlFromParams = (storage: TransientStorageCredentials) => {
val secretString =
storage.authMethod match {
case AuthMethod.Key => s""";" h@"${storage.storageAccountKey}""""
case AuthMethod.Sas =>
if (storage.sasKey(0) == '?') s"""" h@"${storage.sasKey}""""
else s"""?" h@"${storage.sasKey}""""
case AuthMethod.Impersonation =>
s"""${TransientStorageParameters.ImpersonationString}""""
}
val blobUri =
s"https://${storage.storageAccountName}.blob.${storageParameters.endpointSuffix}"
s"$blobUri/${storage.blobContainer}$secretString"
}
// if we pass in compress as 'none' explicitly then do not compress, else compress
val compress =
if (additionalExportOptions
.get("compressed")
.exists(compressed => "none".equalsIgnoreCase(compressed))) ""
else "compressed"
val additionalOptionsString = additionalExportOptions
.filterKeys(key => !defaultKeySet.contains(key))
.map { case (k, v) =>
s"""$k="$v""""
}
.mkString(",", ",", "")
// Values in the map will override,We could have chosen sizeLimit option as the default.
// Chosen the one in the map for consistency
val compressionFormat = additionalExportOptions.getOrElse("compressionType", "snappy")
val namePrefix = s"${directory}part$partitionId"
val sizeLimitOverride = additionalExportOptions
.get("sizeLimit")
.map(size => s"sizeLimit=${size.toLong * 1024 * 1024} ,")
.getOrElse("")
val nativeParquetString = additionalExportOptions
.get("useNativeParquetWriter")
.map(b => s"useNativeParquetWriter=$b, ")
.getOrElse(if (!supportNewParquetWriter) "useNativeParquetWriter=false, " else "")
s""".export async $compress to parquet ("${storageParameters.storageCredentials
.map(getFullUrlFromParams)
.reduce((s, s1) => s + ",\"" + s1)})""" +
s""" with ($sizeLimitOverride$nativeParquetString namePrefix="$namePrefix", compressionType="$compressionFormat"$additionalOptionsString) <| $query"""
}