Pipeline makePipeline()

in src/main/java/com/google/cloud/solutions/autotokenize/pipeline/DlpInspectionPipeline.java [110:191]


  /**
   * Assembles the DLP inspection pipeline graph on the injected {@code pipeline} instance.
   *
   * <p>The graph: (1) reads records and the source's Avro schema (as a JSON string side output)
   * from the configured source, (2) takes a random columnar sample and batches it for DLP
   * identification, (3) writes any DLP error tables as JSON files under
   * {@code <reportLocation>/error}, (4) builds the inspection report using the schema as a
   * singleton side input, (5) writes the schema itself to {@code <reportLocation>/schema}, and
   * (6) fans the report out to GCS, BigQuery, and Data Catalog sinks.
   *
   * @return the configured (but not yet run) {@code Pipeline}.
   */
  Pipeline makePipeline() {
    var flatRecordsTag = new TupleTag<FlatRecord>();
    var schemaJsonTag = new TupleTag<String>();

    // Read the source: records on one tag, the Avro schema (JSON) on a second tag.
    PCollectionTuple sourceOutputs =
        pipeline.apply(
            "Read" + SourceNames.forType(options.getSourceType()).asCamelCase(),
            TransformingReader.forSourceType(options.getSourceType())
                .from(options.getInputPattern())
                .withJdbcConfiguration(
                    JdbcConfigurationExtractor.using(options).jdbcConfiguration())
                .withSecretsClient(secretsClient)
                .withRecordsTag(flatRecordsTag)
                .withAvroSchemaTag(schemaJsonTag));

    var records = sourceOutputs.get(flatRecordsTag);
    var schemaJson = sourceOutputs.get(schemaJsonTag);

    // Sample and identify columns via DLP; successes and failures come out on separate tags.
    TupleTag<ColumnInformation> columnInfoTag = new TupleTag<>();
    TupleTag<KV<ShardedKey<String>, Table>> errorTag = new TupleTag<>();

    var identifyResults =
        records
            .apply("RandomColumnarSample", RandomColumnarSampler.any(options.getSampleSize()))
            .apply("BatchForDlp", new BatchColumnsForDlp())
            .apply(
                "DlpIdentify",
                DlpIdentify.builder()
                    .batchIdentifierFactory(makeDlpBatchIdentifierFactory())
                    .columnInfoTag(columnInfoTag)
                    .errorTag(errorTag)
                    .build());

    // Convert failed DLP tables to JSON; the error output needs its coder set explicitly.
    var errorJson =
        identifyResults
            .get(errorTag)
            .setCoder(
                KvCoder.of(ShardedKey.Coder.of(StringUtf8Coder.of()), ProtoCoder.of(Table.class)))
            .apply("MakeErrorTableJson", ParDo.of(new ConvertTableToJsonFn()))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    // One error file per column; non-alphanumeric path characters are collapsed to dashes.
    errorJson.apply(
        "WriteErrorElements",
        FileIO.<String, KV<String, String>>writeDynamic()
            .via(Contextful.fn(KV::getValue), Contextful.fn(destination -> TextIO.sink()))
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming(
                Contextful.fn(
                    columnName ->
                        defaultNaming(
                            /* prefix= */ String.format(
                                    "col-%s", columnName.replaceAll("[\\.\\$\\[\\]]+", "-"))
                                .replaceAll("[-]+", "-"),
                            /* suffix= */ ".json")))
            .to(options.getReportLocation() + "/error"));

    // Build the inspection report, using the schema as a singleton side input.
    var report =
        identifyResults
            .get(columnInfoTag)
            .apply(
                "ExtractReport",
                MakeInspectionReport.builder()
                    .setAvroSchema(schemaJson.apply(View.asSingleton()))
                    .setSourceType(options.getSourceType())
                    .setClock(clock)
                    .setInputPattern(options.getInputPattern())
                    .setJdbcConfiguration(
                        JdbcConfigurationExtractor.using(options).jdbcConfiguration())
                    .build());

    // Persist the schema as a single JSON file alongside the report.
    schemaJson.apply(
        "WriteSchema",
        TextIO.write()
            .to(options.getReportLocation() + "/schema")
            .withSuffix(".json")
            .withoutSharding());

    // Fan the report out to all configured destinations.
    writeReportToGcs(report);
    writeReportToBigQuery(report);
    writeReportToDataCatalog(report);

    return pipeline;
  }