public PCollectionTuple expand()

in src/main/java/com/google/cloud/solutions/autotokenize/dlp/DlpIdentify.java [91:132]


  /**
   * Runs the batched tables through the {@code IdentifyUsingDLP} step and aggregates the
   * resulting (column name, info type) pairs into one {@link ColumnInformation} per column.
   *
   * <p>Before the transforms are applied, coders for {@code KV<Table, Map<String, String>>} and
   * {@code KV<String, InfoType>} are registered on the pipeline's coder registry.
   *
   * @param batchedRecords tables to inspect, each paired with a sharded string key
   * @return a tuple containing the per-column information under {@code columnInfoTag()} and the
   *     error output of the identification step under {@code errorTag()}
   */
  public PCollectionTuple expand(PCollection<KV<ShardedKey<String>, Table>> batchedRecords) {

    var registry = batchedRecords.getPipeline().getCoderRegistry();
    var stringType = TypeDescriptor.of(String.class);
    var utf8 = StringUtf8Coder.of();

    // Coder for KV<Table, Map<String, String>>.
    var tableToStringMapType =
        TypeDescriptors.kvs(
            TypeDescriptor.of(Table.class), TypeDescriptors.maps(stringType, stringType));
    var tableToStringMapCoder = KvCoder.of(ProtoCoder.of(Table.class), MapCoder.of(utf8, utf8));
    registry.registerCoderForType(tableToStringMapType, tableToStringMapCoder);

    // Coder for KV<String, InfoType>, the element type of the main output below.
    var columnInfoTypeType = TypeDescriptors.kvs(stringType, TypeDescriptor.of(InfoType.class));
    var columnInfoTypeCoder = KvCoder.of(utf8, ProtoCoder.of(InfoType.class));
    registry.registerCoderForType(columnInfoTypeType, columnInfoTypeCoder);

    // Main output tag; failures are emitted on errorTag() as the additional output.
    TupleTag<KV<String, InfoType>> infoTypePerColumnTag = new TupleTag<>();

    var identifyTransform =
        ParDo.of(SendToDlp.create(batchIdentifierFactory(), errorTag()))
            .withOutputTags(infoTypePerColumnTag, TupleTagList.of(errorTag()));
    var dlpOutputs = batchedRecords.apply("IdentifyUsingDLP", identifyTransform);

    // Count identical (column, info type) pairs, then regroup the flattened counts by column.
    var infoTypeCounts =
        dlpOutputs.get(infoTypePerColumnTag).apply("CountInfoTypesPerColumn", Count.perElement());
    var infoTypesByColumn =
        infoTypeCounts
            .apply("ParseCounts", MapElements.via(new CountFlattener()))
            .apply("GroupByColumns", GroupByKey.create());

    // The group key is used as the column name; the grouped values become its info types.
    var columnInformation =
        infoTypesByColumn.apply(
            "CreateColumnInformation",
            MapElements.into(TypeDescriptor.of(ColumnInformation.class))
                .via(
                    columnWithInfoTypes ->
                        ColumnInformation.newBuilder()
                            .setColumnName(columnWithInfoTypes.getKey())
                            .addAllInfoTypes(columnWithInfoTypes.getValue())
                            .build()));

    var result = PCollectionTuple.of(columnInfoTag(), columnInformation);
    return result.and(errorTag(), dlpOutputs.get(errorTag()));
  }