in spark/spark-tensorflow-connector/src/main/scala/org/tensorflow/spark/datasources/tfrecords/DefaultSource.scala [173:205]
/**
 * Writes the records of one partition to a TFRecord file named `part-<index>`
 * (index zero-padded to five digits) inside the local directory `localPath`.
 *
 * @param index     partition index, used to build the output file name
 * @param part      records of the partition; only the `BytesWritable` element of each
 *                  pair is written, the `NullWritable` element is ignored
 * @param localPath local directory that receives the partition file; created if missing
 * @param mode      save mode; only `ErrorIfExists` and `Ignore` are handled specially here
 * @return a single-element iterator holding the number of records written, or an empty
 *         iterator when `localPath` already exists and `mode` is `SaveMode.Ignore`
 * @throws IllegalStateException if `localPath` already exists and `mode` is
 *                               `SaveMode.ErrorIfExists`
 */
private def writePartitionLocal(
    index: Int,
    part: Iterator[(BytesWritable, NullWritable)],
    localPath: String,
    mode: SaveMode): Iterator[Int] = {
  val dir = new File(localPath)
  // NOTE(review): this existence check runs once per call, i.e. per partition. If several
  // partitions are written to the same directory on one machine, the directory created by
  // the first call would make later calls fail (ErrorIfExists) or skip (Ignore) -- confirm
  // against the caller. Append and Overwrite get no special handling here: an existing
  // part file with the same index is simply replaced and other files are left in place.
  val dirExists = dir.exists()
  if (dirExists && mode == SaveMode.ErrorIfExists) {
    throw new IllegalStateException(
      s"LocalPath $localPath already exists. SaveMode: ErrorIfExists.")
  }

  if (dirExists && mode == SaveMode.Ignore) {
    Iterator.empty
  } else {
    // Make the directory if it does not exist. A failure here surfaces as an
    // IOException when the output file is opened below.
    dir.mkdirs()
    // The path to the partition file.
    val filePath = f"$localPath/part-$index%05d"
    // Buffer the file stream so that small writes do not each hit the file system.
    val out = new DataOutputStream(
      new java.io.BufferedOutputStream(new FileOutputStream(filePath)))
    try {
      val writer = new TFRecordWriter(out)
      val written = part.foldLeft(0) { case (n, (bw, _)) =>
        // getBytes exposes the backing buffer, which may be longer than the valid
        // payload; copyBytes returns exactly getLength bytes.
        writer.write(bw.copyBytes())
        n + 1
      }
      // Flush explicitly so a failed flush is reported rather than left to close().
      out.flush()
      Iterator(written)
    } finally {
      out.close()
    }
  }
}