in src/main/java/com/google/cloud/solutions/autotokenize/pipeline/CsvTokenizationAndOrderingPipeline.java [104:125]
/**
 * Assembles the CSV pipeline and starts it.
 *
 * <p>Stages, in order: the output of {@code applyReadAndEncryptionSteps()} is mapped through
 * {@code flatRecordToCsvRowFn()}, then passed through the "SortCsvRows" step, and finally handed
 * to {@code CsvIO.write} using the configured output directory and shard count.
 *
 * <p>The "SortCsvRows" step uses {@code csvSorter()} when either ordering option is non-null.
 * Otherwise each row is wrapped in a single-element list under the empty-string key, so the
 * write step receives the same {@code KV<String, Iterable<CsvRow>>} shape in both cases.
 *
 * @return the result returned by {@code pipeline.run()}
 * @throws Exception propagated from the pipeline construction steps or from the run itself
 */
public PipelineResult run() throws Exception {
  applyReadAndEncryptionSteps()
      .apply("MakeCsvRecord", MapElements.via(flatRecordToCsvRowFn()))
      .apply(
          "SortCsvRows",
          // Kept inline: the conditional is typed against apply()'s parameter.
          (options.getOrderingColumns() == null && options.getOrderingColumnNames() == null)
              // No ordering requested: pass rows through as singleton groups keyed by "".
              ? MapElements.into(
                      TypeDescriptors.kvs(
                          TypeDescriptor.of(String.class),
                          TypeDescriptors.iterables(TypeDescriptor.of(CsvRow.class))))
                  .via(
                      (SerializableFunction<CsvRow, KV<String, Iterable<CsvRow>>>)
                          row -> KV.of("", List.of(row)))
              // At least one ordering option is set: delegate to the sorter transform.
              : csvSorter())
      .apply(
          "WriteCsv",
          CsvIO.write(new SortedRowIterableFn())
              .withOutputFilePrefix(options.getOutputDirectory())
              .withFileShards(options.getCsvFileShardCount()));

  PipelineResult result = pipeline.run();
  return result;
}