def generateExportDataCommand()

in connector/src/main/scala/com/microsoft/kusto/spark/utils/CslCommandsGenerator.scala [166:218]


  /**
   * Builds an async Kusto `.export` CSL command that writes the results of `query` as
   * parquet blobs into the configured transient storage container(s).
   *
   * @param query the KQL query whose results are exported.
   * @param directory blob directory prefix; partition blobs are named "<directory>part<partitionId>".
   * @param partitionId Spark partition id, keeps per-partition blob name prefixes unique.
   * @param storageParameters transient storage endpoint suffix and credentials for the destinations.
   * @param partitionPredicate not used in this method; kept for interface compatibility with callers.
   * @param additionalExportOptions extra `with(...)` options. Keys the connector interprets itself
   *                                ("compressed", "compressionType", "sizeLimit",
   *                                "useNativeParquetWriter") are handled explicitly; any key not in
   *                                `defaultKeySet` is passed through verbatim as `key="value"`.
   * @param supportNewParquetWriter when false (and not overridden via the options map), emits
   *                                `useNativeParquetWriter=false` for engines without the new writer.
   * @return the complete `.export async ... <| query` command string.
   */
  def generateExportDataCommand(
      query: String,
      directory: String,
      partitionId: Int,
      storageParameters: TransientStorageParameters,
      partitionPredicate: Option[String] = None,
      additionalExportOptions: Map[String, String] = Map.empty[String, String],
      supportNewParquetWriter: Boolean = true): String = {
    // Renders one destination as <container-url><secret>, closing the double quote opened by
    // the surrounding export command text around the first URL.
    val getFullUrlFromParams = (storage: TransientStorageCredentials) => {
      val secretString =
        storage.authMethod match {
          case AuthMethod.Key => s""";" h@"${storage.storageAccountKey}""""
          case AuthMethod.Sas =>
            // startsWith is safe on an empty SAS key, unlike the previous sasKey(0) indexing;
            // a missing leading '?' is normalized in.
            if (storage.sasKey.startsWith("?")) s"""" h@"${storage.sasKey}""""
            else s"""?" h@"${storage.sasKey}""""
          case AuthMethod.Impersonation =>
            s"""${TransientStorageParameters.ImpersonationString}""""
        }
      val blobUri =
        s"https://${storage.storageAccountName}.blob.${storageParameters.endpointSuffix}"
      s"$blobUri/${storage.blobContainer}$secretString"
    }
    // if we pass in compress as 'none' explicitly then do not compress, else compress
    val compress =
      if (additionalExportOptions
          .get("compressed")
          .exists(compressed => "none".equalsIgnoreCase(compressed))) ""
      else "compressed"
    // Pass-through options the connector does not interpret itself. The empty case must be
    // guarded explicitly: on Scala 2.12, mkString(start, sep, end) of an empty collection
    // still emits start + end (","), which would leave a dangling comma before the closing
    // parenthesis of the with(...) clause and break the command.
    val passThroughOptions = additionalExportOptions.collect {
      case (k, v) if !defaultKeySet.contains(k) => s"""$k="$v""""
    }
    val additionalOptionsString =
      if (passThroughOptions.isEmpty) "" else passThroughOptions.mkString(",", ",", "")

    // Values in the map will override,We could have chosen sizeLimit option as the default.
    // Chosen the one in the map for consistency
    val compressionFormat = additionalExportOptions.getOrElse("compressionType", "snappy")
    val namePrefix = s"${directory}part$partitionId"
    // sizeLimit is supplied in MB by callers; the engine property expects bytes.
    // NOTE(review): a non-numeric value will throw NumberFormatException here (pre-existing).
    val sizeLimitOverride = additionalExportOptions
      .get("sizeLimit")
      .map(size => s"sizeLimit=${size.toLong * 1024 * 1024} ,")
      .getOrElse("")
    // An explicit map entry always wins; otherwise only emit the flag when the engine lacks
    // the new parquet writer (omitting it lets the engine use its own default).
    val nativeParquetString = additionalExportOptions
      .get("useNativeParquetWriter")
      .map(b => s"useNativeParquetWriter=$b, ")
      .getOrElse(if (!supportNewParquetWriter) "useNativeParquetWriter=false, " else "")

    s""".export async $compress to parquet ("${storageParameters.storageCredentials
        .map(getFullUrlFromParams)
        .reduce((s, s1) => s + ",\"" + s1)})""" +
      s""" with ($sizeLimitOverride$nativeParquetString namePrefix="$namePrefix", compressionType="$compressionFormat"$additionalOptionsString) <| $query"""
  }