def writeDataFrame()

in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/util/FileSystem.scala [50:106]


  /**
   * Write a DataFrame under `outputPrefix` through the GraphAr data source
   * (`org.apache.graphar.datasources.GarDataSource`) in append mode.
   *
   * Before writing, several writer settings are applied to the DataFrame's
   * SparkSession: no `_SUCCESS` marker, no Parquet summary metadata, and zstd
   * compression for ORC and Parquet. NOTE(review): these are `spark.conf.set`
   * calls on the session, so they remain in effect after this method returns.
   *
   * @param dataFrame the DataFrame to write.
   * @param fileType the chunk file format, passed as the `fileFormat` option.
   * @param outputPrefix the output directory; created if it does not exist.
   * @param offsetStartChunkIndex when defined, the data is written as offset
   *   chunks and this index is passed under
   *   `GeneralParams.offsetStartChunkIndexKey`. It takes precedence over
   *   `aggNumListOfEdgeChunk`.
   * @param aggNumListOfEdgeChunk when defined and `offsetStartChunkIndex` is
   *   empty, the data is written as edge chunks and this list, serialized with
   *   json4s `write`, is passed under `GeneralParams.aggNumListOfEdgeChunkKey`.
   *   When both options are empty, the data is written as vertex chunks.
   */
  def writeDataFrame(
      dataFrame: DataFrame,
      fileType: String,
      outputPrefix: String,
      offsetStartChunkIndex: Option[Int],
      aggNumListOfEdgeChunk: Option[Array[Long]]
  ): Unit = {
    val spark = dataFrame.sparkSession
    // TODO: Make the hard-code setting to configurable
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    spark.conf.set("parquet.enable.summary-metadata", "false")
    spark.conf.set("spark.sql.orc.compression.codec", "zstd")
    spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

    // First make sure the output directory exists.
    // The FileSystem obtained here comes from Hadoop's FileSystem cache, which
    // is enabled by default, and is shared with Spark and other callers. It
    // must NOT be closed: closing it would make later operations on the same
    // cached instance fail with "Filesystem closed".
    val path = new Path(outputPrefix)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    if (!fs.exists(path)) {
      fs.mkdirs(path)
    }

    // Writer options common to offset, edge and vertex chunks.
    val baseWriter = dataFrame.write
      .mode("append")
      .option("header", "true")
      .option("fileFormat", fileType)

    val writer = (offsetStartChunkIndex, aggNumListOfEdgeChunk) match {
      // write offset chunks DataFrame
      case (Some(startChunkIndex), _) =>
        baseWriter.option(
          GeneralParams.offsetStartChunkIndexKey,
          startChunkIndex
        )
      // write edge chunks DataFrame
      case (None, Some(aggNumList)) =>
        implicit val formats =
          DefaultFormats // initialize a default formats for json4s
        baseWriter.option(
          GeneralParams.aggNumListOfEdgeChunkKey,
          write(aggNumList)
        )
      // write vertex chunks DataFrame
      case _ =>
        baseWriter
    }

    writer
      .format("org.apache.graphar.datasources.GarDataSource")
      .save(outputPrefix)
  }