in src/main/java/com/google/cloud/solutions/autotokenize/pipeline/DlpInspectionPipeline.java [110:191]
/**
 * Assembles the DLP inspection pipeline graph on the injected {@code pipeline} and returns it.
 *
 * <p>The graph: read records + schema from the configured source, randomly sample columns, batch
 * and send samples to DLP for infoType identification, then fan the resulting inspection report
 * out to GCS, BigQuery and Data Catalog. DLP errors and the detected schema are written as JSON
 * under the configured report location.
 */
Pipeline makePipeline() {
  // Output tags for the reader: flattened records and the source's schema (as a JSON string).
  var flatRecordTag = new TupleTag<FlatRecord>();
  var schemaTag = new TupleTag<String>();

  var readResult =
      pipeline.apply(
          "Read" + SourceNames.forType(options.getSourceType()).asCamelCase(),
          TransformingReader.forSourceType(options.getSourceType())
              .from(options.getInputPattern())
              .withJdbcConfiguration(
                  JdbcConfigurationExtractor.using(options).jdbcConfiguration())
              .withSecretsClient(secretsClient)
              .withRecordsTag(flatRecordTag)
              .withAvroSchemaTag(schemaTag));

  // Sample each column and run the samples through DLP identification; successful column
  // results and per-table DLP failures come back on separate tags.
  var columnInfoTag = new TupleTag<ColumnInformation>();
  var dlpErrorTag = new TupleTag<KV<ShardedKey<String>, Table>>();

  var dlpResults =
      readResult
          .get(flatRecordTag)
          .apply("RandomColumnarSample", RandomColumnarSampler.any(options.getSampleSize()))
          .apply("BatchForDlp", new BatchColumnsForDlp())
          .apply(
              "DlpIdentify",
              DlpIdentify.builder()
                  .batchIdentifierFactory(makeDlpBatchIdentifierFactory())
                  .columnInfoTag(columnInfoTag)
                  .errorTag(dlpErrorTag)
                  .build());

  // Persist DLP failures as per-column JSON files under <reportLocation>/error; the column
  // name is sanitized (dots/dollars/brackets collapsed to single dashes) to form the prefix.
  dlpResults
      .get(dlpErrorTag)
      .setCoder(
          KvCoder.of(ShardedKey.Coder.of(StringUtf8Coder.of()), ProtoCoder.of(Table.class)))
      .apply("MakeErrorTableJson", ParDo.of(new ConvertTableToJsonFn()))
      .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
      .apply(
          "WriteErrorElements",
          FileIO.<String, KV<String, String>>writeDynamic()
              .via(Contextful.fn(KV::getValue), Contextful.fn(col -> TextIO.sink()))
              .by(KV::getKey)
              .withDestinationCoder(StringUtf8Coder.of())
              .withNaming(
                  Contextful.fn(
                      colName ->
                          defaultNaming(
                              /* prefix= */ String.format(
                                      "col-%s", colName.replaceAll("[\\.\\$\\[\\]]+", "-"))
                                  .replaceAll("[-]+", "-"),
                              /* suffix= */ ".json")))
              .to(options.getReportLocation() + "/error"));

  // The schema is a singleton side input to the report builder.
  var schemaView = readResult.get(schemaTag).apply(View.asSingleton());

  var inspectionReport =
      dlpResults
          .get(columnInfoTag)
          .apply(
              "ExtractReport",
              MakeInspectionReport.builder()
                  .setAvroSchema(schemaView)
                  .setSourceType(options.getSourceType())
                  .setClock(clock)
                  .setInputPattern(options.getInputPattern())
                  .setJdbcConfiguration(
                      JdbcConfigurationExtractor.using(options).jdbcConfiguration())
                  .build());

  // Write the detected schema as a single unsharded JSON file under <reportLocation>/schema.
  readResult
      .get(schemaTag)
      .apply(
          "WriteSchema",
          TextIO.write()
              .to(options.getReportLocation() + "/schema")
              .withSuffix(".json")
              .withoutSharding());

  // Fan the inspection report out to all configured sinks.
  writeReportToGcs(inspectionReport);
  writeReportToBigQuery(inspectionReport);
  writeReportToDataCatalog(inspectionReport);

  return pipeline;
}