public PDone expand()

in src/main/java/com/google/cloud/solutions/autotokenize/common/InspectionReportFileWriter.java [51:89]


  /**
   * Writes the contents of the inspection reports as text files under {@code reportLocation()}.
   *
   * <p>Two independent outputs are derived from the same input collection:
   *
   * <ul>
   *   <li>the output of {@code SchemaExtractor}, written by a single shard to a file named
   *       {@code schema.json};
   *   <li>the flattened output of {@code ColumnInformationExtractor}, serialized with {@code
   *       JsonConvertor::asJsonString} and routed to a separate destination per column name.
   * </ul>
   *
   * @param inspectionReport the inspection reports to persist
   * @return a {@link PDone} bound to the pipeline of {@code inspectionReport}
   */
  public PDone expand(PCollection<InspectionReport> inspectionReport) {
    PCollection<String> extractedSchema =
        inspectionReport.apply("ExtractSchema", MapElements.via(new SchemaExtractor()));

    // Each report yields an iterable of column entries; flatten to one element per entry.
    PCollection<ColumnInformation> perColumnInfo =
        inspectionReport
            .apply("ExtractColumnInformation", MapElements.via(new ColumnInformationExtractor()))
            .apply(Flatten.iterables());

    // Schema output: one shard, and a naming function that ignores window/pane/shard details so
    // the file is always called "schema.json".
    FileIO.Write<Void, String> schemaWriter =
        FileIO.<String>write()
            .to(reportLocation())
            .via(TextIO.sink())
            .withNumShards(1)
            .withNaming((window, pane, numShards, shardIndex, compression) -> "schema.json");
    extractedSchema.apply("WriteSchema", schemaWriter);

    // Column output: the destination key is the column name, so every distinct column name gets
    // its own set of files.
    FileIO.Write<String, ColumnInformation> columnWriter =
        FileIO.<String, ColumnInformation>writeDynamic()
            .via(
                Contextful.fn(JsonConvertor::asJsonString), Contextful.fn(colName -> TextIO.sink()))
            .by(ColumnInformation::getColumnName)
            .withDestinationCoder(StringUtf8Coder.of())
            .withNoSpilling()
            .withNaming(
                Contextful.fn(
                    colName -> {
                      // Runs of '.', '$', '[' and ']' in the column name become a single dash.
                      String dashedName = colName.replaceAll("[\\.\\$\\[\\]]+", "-");
                      // Collapse any repeated dashes that remain after adding the "col-" prefix.
                      String filePrefix =
                          String.format("col-%s", dashedName).replaceAll("[-]+", "-");
                      return defaultNaming(/* prefix= */ filePrefix, /* suffix= */ ".json");
                    }))
            .to(reportLocation());
    perColumnInfo.apply("WriteColumnReport", columnWriter);

    return PDone.in(inspectionReport.getPipeline());
  }