private def saveDistributed()

in spark/spark-tensorflow-connector/src/main/scala/org/tensorflow/spark/datasources/tfrecords/DefaultSource.scala [98:132]


  /**
   * Writes the serialized TFRecord pairs in `features` to `path`, honoring Spark's
   * [[SaveMode]] semantics against the target filesystem.
   *
   * @param features   records to write, already encoded as (BytesWritable, NullWritable) pairs
   * @param path       destination path (resolved against the Hadoop configuration's filesystem)
   * @param sqlContext used to reach the SparkContext's Hadoop configuration
   * @param mode       Spark save mode; `Append` is rejected because TFRecord output
   *                   does not support appending to existing data
   * @param codec      compression codec name forwarded to `save`
   * @throws IllegalArgumentException if `mode` is `SaveMode.Append`
   * @throws IllegalStateException    if `mode` is `SaveMode.ErrorIfExists` and `path` exists
   */
  private def saveDistributed(
      features: RDD[(BytesWritable, NullWritable)],
      path: String,
      sqlContext: SQLContext,
      mode: SaveMode,
      codec: String): Unit = {
    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
    val outputPath = new Path(path)
    val fs = outputPath.getFileSystem(hadoopConf)
    // Qualify relative/scheme-less paths so existence checks and deletes hit the right FS.
    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

    val pathExists = fs.exists(qualifiedOutputPath)

    mode match {
      case SaveMode.Overwrite =>
        // Recursive delete of any previous output. FileSystem.delete returns false when
        // the path does not exist, which is acceptable here, so the result is ignored.
        fs.delete(qualifiedOutputPath, true)
        save(sqlContext, features, path, codec)

      case SaveMode.Append =>
        throw new IllegalArgumentException("Append mode is not supported")

      case SaveMode.ErrorIfExists =>
        if (pathExists)
          throw new IllegalStateException(
            s"Path $path already exists. SaveMode: ErrorIfExists.")
        save(sqlContext, features, path, codec)

      case SaveMode.Ignore =>
        // With `SaveMode.Ignore` mode, if data already exists, the save operation is expected
        // to not save the contents of the DataFrame and to not change the existing data.
        // Therefore, it is okay to do nothing here and then just return the relation below.
        if (!pathExists)
          save(sqlContext, features, path, codec)
    }
  }