in src/main/java/com/google/cloud/solutions/autotokenize/common/InspectionReportFileWriter.java [51:89]
public PDone expand(PCollection<InspectionReport> inspectionReport) {
  // Produces two outputs under reportLocation():
  //  1. the schema string of each report, written as a single shard named "schema.json";
  //  2. the flattened ColumnInformation records, written dynamically with one destination per
  //     column name, each record serialized via JsonConvertor::asJsonString.

  // Fan the incoming reports out into the two views that get persisted.
  PCollection<String> schemaJson =
      inspectionReport.apply("ExtractSchema", MapElements.via(new SchemaExtractor()));
  PCollection<ColumnInformation> columns =
      inspectionReport
          .apply("ExtractColumnInformation", MapElements.via(new ColumnInformationExtractor()))
          .apply(Flatten.iterables());

  // Schema sink: one shard, and the naming function ignores window/pane/shard so the output
  // file is always called "schema.json".
  FileIO.Write<Void, String> schemaSink =
      FileIO.<String>write()
          .to(reportLocation())
          .via(TextIO.sink())
          .withNumShards(1)
          .withNaming((window, pane, numShards, shardIndex, compression) -> "schema.json");
  schemaJson.apply("WriteSchema", schemaSink);

  // Column sink: records are keyed by column name, and each key gets its own text sink and a
  // "col-<sanitized name>" file prefix with a ".json" suffix.
  FileIO.Write<String, ColumnInformation> columnSink =
      FileIO.<String, ColumnInformation>writeDynamic()
          .via(Contextful.fn(JsonConvertor::asJsonString), Contextful.fn(colName -> TextIO.sink()))
          .by(ColumnInformation::getColumnName)
          .withDestinationCoder(StringUtf8Coder.of())
          .withNoSpilling()
          .withNaming(
              Contextful.fn(
                  colName ->
                      defaultNaming(
                          /* prefix= */ columnReportPrefix(colName), /* suffix= */ ".json")))
          .to(reportLocation());
  columns.apply("WriteColumnReport", columnSink);

  return PDone.in(inspectionReport.getPipeline());
}

/**
 * Builds the file-name prefix used for a single column's report file.
 *
 * <p>Each run of '.', '$', '[' or ']' characters in {@code columnName} becomes one '-'. The
 * result is prefixed with "col-", and any run of consecutive '-' is then collapsed to one '-'.
 * For example, "$.a.b" becomes "col-a-b".
 *
 * @param columnName the destination key, which is the column name of the record
 * @return the sanitized prefix to pass to {@code defaultNaming}
 */
private static String columnReportPrefix(String columnName) {
  String separatorsReplaced = columnName.replaceAll("[\\.\\$\\[\\]]+", "-");
  return String.format("col-%s", separatorsReplaced).replaceAll("[-]+", "-");
}