in spark/spark-tensorflow-connector/src/main/scala/org/tensorflow/spark/datasources/tfrecords/DefaultSource.scala [98:132]
  private def saveDistributed(
      features: RDD[(BytesWritable, NullWritable)],
      path: String,
      sqlContext: SQLContext,
      mode: SaveMode,
      codec: String): Unit = {
    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
    val outputPath = new Path(path)
    val fs = outputPath.getFileSystem(hadoopConf)
    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    val pathExists = fs.exists(qualifiedOutputPath)
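
    // For example (illustrative values, assuming an HDFS default filesystem and a
    // working directory of /user/alice), makeQualified resolves a relative "out"
    // to "hdfs://namenode:8020/user/alice/out", so the existence check above and
    // the SaveMode handling below agree on the same absolute path.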
    mode match {
      case SaveMode.Overwrite =>
        // Remove any existing output first, then write the new records.
        fs.delete(qualifiedOutputPath, true)
        save(sqlContext, features, path, codec)
      case SaveMode.Append =>
        throw new IllegalArgumentException("Append mode is not supported")
      case SaveMode.ErrorIfExists =>
        if (pathExists)
          throw new IllegalStateException(
            s"Path $path already exists. SaveMode: ErrorIfExists.")
        // No existing output; proceed with the write.
        save(sqlContext, features, path, codec)
      case SaveMode.Ignore =>
        // With `SaveMode.Ignore`, if data already exists, the save operation is
        // expected neither to save the contents of the DataFrame nor to change
        // the existing data. Therefore, it is okay to do nothing here and just
        // return the relation below.
        if (!pathExists)
          save(sqlContext, features, path, codec)
    }
  }
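
  // Hedged usage sketch (not part of this file): how the SaveMode branches above
  // are exercised through Spark's DataFrameWriter. The short format name
  // "tfrecords" and the "recordType"/"codec" options follow the
  // spark-tensorflow-connector README; the output path is illustrative.
  //
  //   import org.apache.spark.sql.{SaveMode, SparkSession}
  //
  //   val spark = SparkSession.builder().appName("tfrecords-write").getOrCreate()
  //   val df = spark.range(0, 10).toDF("id")
  //
  //   // Overwrite: deletes the qualified output path, then writes.
  //   df.write.format("tfrecords")
  //     .option("recordType", "Example")
  //     .option("codec", "org.apache.hadoop.io.compress.GzipCodec") // optional
  //     .mode(SaveMode.Overwrite)
  //     .save("/tmp/tfrecords-demo")
  //
  //   // Append        -> IllegalArgumentException("Append mode is not supported")
  //   // ErrorIfExists -> IllegalStateException if the path already exists
  //   // Ignore        -> silently skips the write when the path already exists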