in src/main/java/com/google/cloud/solutions/autotokenize/dlp/DlpIdentify.java [91:132]
/**
 * Sends batched table records to DLP and summarizes the detected info types per column.
 *
 * <p>Each batch goes through {@code SendToDlp}. Its main output, {@code KV<String, InfoType>}
 * findings, is counted per distinct element, flattened by {@code CountFlattener}, grouped by key
 * and turned into one {@link ColumnInformation} per key. The grouping key becomes the column name.
 * Whatever {@code SendToDlp} emits on {@code errorTag()} is returned unchanged.
 *
 * @param batchedRecords batches of records as {@link Table}s, keyed by a sharded string key
 * @return a tuple with the per-column information under {@code columnInfoTag()} and the DLP
 *     error elements under {@code errorTag()}
 */
public PCollectionTuple expand(PCollection<KV<ShardedKey<String>, Table>> batchedRecords) {
  var registry = batchedRecords.getPipeline().getCoderRegistry();
  var stringType = TypeDescriptor.of(String.class);

  // NOTE(review): nothing in this method produces KV<Table, Map<String, String>> directly;
  // presumably SendToDlp relies on this registration internally - confirm before removing.
  var stringToStringMapCoder = MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
  registry.registerCoderForType(
      TypeDescriptors.kvs(TypeDescriptor.of(Table.class), TypeDescriptors.maps(stringType, stringType)),
      KvCoder.of(ProtoCoder.of(Table.class), stringToStringMapCoder));

  // Explicit coder for the KV<String, InfoType> findings on SendToDlp's main output.
  registry.registerCoderForType(
      TypeDescriptors.kvs(stringType, TypeDescriptor.of(InfoType.class)),
      KvCoder.of(StringUtf8Coder.of(), ProtoCoder.of(InfoType.class)));

  TupleTag<KV<String, InfoType>> findingsTag = new TupleTag<>();
  PCollectionTuple dlpResults =
      batchedRecords.apply(
          "IdentifyUsingDLP",
          ParDo.of(SendToDlp.create(batchIdentifierFactory(), errorTag()))
              .withOutputTags(findingsTag, TupleTagList.of(errorTag())));

  // Count how often each distinct (key, InfoType) finding occurs, then reshape the counts.
  var flattenedCounts =
      dlpResults
          .get(findingsTag)
          .apply("CountInfoTypesPerColumn", Count.perElement())
          .apply("ParseCounts", MapElements.via(new CountFlattener()));

  var countsByColumn = flattenedCounts.apply("GroupByColumns", GroupByKey.create());

  // The MapElements transform stays inline inside apply(...) so the lambda's input type is
  // inferred from the grouped collection.
  PCollection<ColumnInformation> columnInformation =
      countsByColumn.apply(
          "CreateColumnInformation",
          MapElements.into(TypeDescriptor.of(ColumnInformation.class))
              .via(
                  columnAndInfoTypes ->
                      ColumnInformation.newBuilder()
                          .setColumnName(columnAndInfoTypes.getKey())
                          .addAllInfoTypes(columnAndInfoTypes.getValue())
                          .build()));

  var dlpErrors = dlpResults.get(errorTag());
  return PCollectionTuple.of(columnInfoTag(), columnInformation).and(errorTag(), dlpErrors);
}