public PCollection expand()

in src/main/java/com/google/cloud/solutions/autotokenize/dlp/BatchAndDlpDeIdRecords.java [127:170]


  /**
   * Builds the sub-pipeline that keys and shards the incoming records, groups them into
   * byte-size-bounded batches, converts each batch into a {@code PartialColumnDlpTable} and runs
   * it through {@code DlpDeidentifyFn}.
   *
   * <p>Before any transform is applied, the configuration is validated: the DLP client factory
   * must be non-null, and the DLP project id and region must be non-blank.
   *
   * <p>{@code DlpDeidentifyFn} emits to two tagged outputs. Only the success output is returned.
   * The error output merely has its coder set in this method and is not attached to any further
   * transform here.
   *
   * @param input the records to process
   * @return the elements of the success output, flattened from per-batch iterables into
   *     individual {@code FlatRecord}s
   */
  public PCollection<FlatRecord> expand(PCollection<FlatRecord> input) {
    // Fail fast on incomplete configuration before touching the pipeline graph.
    checkNotNull(dlpClientFactory(), "Provide Dlp client factory");
    checkArgument(isNotBlank(dlpProjectId()), "DLP ProjectId can't be empty.");
    checkArgument(isNotBlank(dlpRegion()), "DLP Region can't be null or empty");

    var failureTag = new TupleTag<KV<ShardedKey<String>, PartialColumnDlpTable>>();
    var deidentifiedTag = new TupleTag<Iterable<FlatRecord>>();

    // The same element type flows out of "MakeDlpTable" and out of the error output,
    // so one coder instance serves both.
    var batchTableCoder =
        KvCoder.of(
            ShardedKey.Coder.of(StringUtf8Coder.of()), ProtoCoder.of(PartialColumnDlpTable.class));

    var keyed = input.apply("AddRecordId", MapElements.via(FlatRecordKeysFn.create()));
    var sharded = keyed.apply(MapElements.via(new ShardAssigner<>(shardCount())));

    // Batches are bounded by byte size (500000).
    // NOTE(review): presumably sized to stay within a DLP request limit — confirm.
    var batched =
        sharded.apply(
            "BatchForDlp",
            GroupIntoBatches.<String, FlatRecord>ofByteSize(500000).withShardedKey());

    var dlpTables =
        batched
            .apply("MakeDlpTable", ParDo.of(new DlpTableMaker(encryptConfig())))
            .setCoder(batchTableCoder);

    var deidentifyFn =
        DlpDeidentifyFn.builder()
            .encryptConfig(encryptConfig())
            .dlpClientFactory(dlpClientFactory())
            .dlpProjectId(dlpProjectId())
            .dlpRegion(dlpRegion())
            .errorTag(failureTag)
            .successTag(deidentifiedTag)
            .build();

    // The success tag is the main output; the failure tag is the only additional output.
    var dlpOutputs =
        dlpTables.apply(
            ParDo.of(deidentifyFn).withOutputTags(deidentifiedTag, TupleTagList.of(failureTag)));

    // The failure output gets an explicit coder even though nothing in this method reads it.
    dlpOutputs.get(failureTag).setCoder(batchTableCoder);

    var deidentifiedBatches =
        dlpOutputs
            .get(deidentifiedTag)
            .setCoder(IterableCoder.of(ProtoCoder.of(FlatRecord.class)));

    // Unpack each batch so downstream transforms see individual records again.
    return deidentifiedBatches.apply(Flatten.iterables());
  }