in maven-projects/spark/graphar/src/main/scala/org/apache/graphar/util/FileSystem.scala [50:106]
  def writeDataFrame(
      dataFrame: DataFrame,
      fileType: String,
      outputPrefix: String,
      offsetStartChunkIndex: Option[Int],
      aggNumListOfEdgeChunk: Option[Array[Long]]
  ): Unit = {
    val spark = dataFrame.sparkSession
    // TODO: make these hard-coded settings configurable
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    spark.conf.set("parquet.enable.summary-metadata", "false")
    spark.conf.set("spark.sql.orc.compression.codec", "zstd")
    spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
    // ensure the output directory exists, creating it if necessary
    val path = new Path(outputPrefix)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    if (!fs.exists(path)) {
      fs.mkdirs(path)
    }
    // Do not close `fs` here: Hadoop caches FileSystem instances per scheme
    // and authority, so closing this one would invalidate it for every other
    // user of the cache in this JVM.
    if (offsetStartChunkIndex.isDefined) {
      // write offset chunks DataFrame
      dataFrame.write
        .mode("append")
        .option("header", "true")
        .option("fileFormat", fileType)
        .option(
          GeneralParams.offsetStartChunkIndexKey,
          offsetStartChunkIndex.get
        )
        .format("org.apache.graphar.datasources.GarDataSource")
        .save(outputPrefix)
    } else if (aggNumListOfEdgeChunk.isDefined) {
      // write edge chunks DataFrame; the aggregate-number list is serialized
      // to JSON so it can be passed through as a data source option
      implicit val formats =
        DefaultFormats // initialize default formats for json4s
      dataFrame.write
        .mode("append")
        .option("header", "true")
        .option("fileFormat", fileType)
        .option(
          GeneralParams.aggNumListOfEdgeChunkKey,
          write(aggNumListOfEdgeChunk.get)
        )
        .format("org.apache.graphar.datasources.GarDataSource")
        .save(outputPrefix)
    } else {
      // write vertex chunks DataFrame
      dataFrame.write
        .mode("append")
        .option("header", "true")
        .option("fileFormat", fileType)
        .format("org.apache.graphar.datasources.GarDataSource")
        .save(outputPrefix)
    }
  }
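
  // Usage sketch (illustrative, not part of the upstream file): the two
  // Option parameters select which chunk kind is written. `vertexDf`,
  // `offsetDf`, `edgeDf`, and the output paths below are hypothetical
  // placeholders.
  //
  //   // vertex chunks: neither an offset start index nor an aggregate list
  //   FileSystem.writeDataFrame(vertexDf, "parquet", "gar/vertex/person", None, None)
  //
  //   // offset chunks: pass the index of the starting offset chunk
  //   FileSystem.writeDataFrame(offsetDf, "parquet", "gar/edge/knows/offset", Some(0), None)
  //
  //   // edge chunks: pass the aggregated number list, one entry per vertex chunk
  //   FileSystem.writeDataFrame(edgeDf, "csv", "gar/edge/knows", None, Some(Array(3L, 5L)))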