in src/main/java/com/google/cloud/solutions/autotokenize/dlp/BatchAndDlpDeIdRecords.java [127:170]
public PCollection<FlatRecord> expand(PCollection<FlatRecord> input) {
  /*
   * Pipeline assembled here, in order:
   *   1. key each record (FlatRecordKeysFn) and assign it a shard (ShardAssigner, shardCount()),
   *   2. group the keyed records into byte-size-bounded batches using sharded keys,
   *   3. convert every batch into a PartialColumnDlpTable (DlpTableMaker),
   *   4. run DlpDeidentifyFn, which emits to a main (success) tag and an additional (error) tag,
   *   5. flatten the per-batch iterables of the success output into single records.
   */

  // Fail fast on missing configuration before touching the pipeline graph.
  checkNotNull(dlpClientFactory(), "Provide Dlp client factory");
  checkArgument(isNotBlank(dlpProjectId()), "DLP ProjectId can't be empty.");
  checkArgument(isNotBlank(dlpRegion()), "DLP Region can't be null or empty");

  var failedTablesTag = new TupleTag<KV<ShardedKey<String>, PartialColumnDlpTable>>();
  var deidRecordsTag = new TupleTag<Iterable<FlatRecord>>();

  // The same element shape (sharded key -> partial DLP table) is used for both the table-making
  // output and the error output, so the coder is built once and applied to each.
  var shardedTableCoder =
      KvCoder.of(
          ShardedKey.Coder.of(StringUtf8Coder.of()), ProtoCoder.of(PartialColumnDlpTable.class));

  var shardedRecords =
      input
          .apply("AddRecordId", MapElements.via(FlatRecordKeysFn.create()))
          .apply(MapElements.via(new ShardAssigner<>(shardCount())));

  var recordBatches =
      shardedRecords.apply(
          "BatchForDlp",
          GroupIntoBatches.<String, FlatRecord>ofByteSize(500000).withShardedKey());

  var dlpTables =
      recordBatches
          .apply("MakeDlpTable", ParDo.of(new DlpTableMaker(encryptConfig())))
          .setCoder(shardedTableCoder);

  var deidentifyFn =
      DlpDeidentifyFn.builder()
          .encryptConfig(encryptConfig())
          .dlpClientFactory(dlpClientFactory())
          .dlpProjectId(dlpProjectId())
          .dlpRegion(dlpRegion())
          .errorTag(failedTablesTag)
          .successTag(deidRecordsTag)
          .build();

  var successAndError =
      dlpTables.apply(
          ParDo.of(deidentifyFn)
              .withOutputTags(deidRecordsTag, TupleTagList.of(failedTablesTag)));

  // The error output is given an explicit coder here; this method does not consume it further.
  successAndError.get(failedTablesTag).setCoder(shardedTableCoder);

  var deidBatches =
      successAndError
          .get(deidRecordsTag)
          .setCoder(IterableCoder.of(ProtoCoder.of(FlatRecord.class)));

  // Each success element is an Iterable of records; unwrap them into individual FlatRecords.
  return deidBatches.apply(Flatten.iterables());
}