in src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/ReadTransactionTransform.java [69:90]
// Builds the transaction row stream from two string sources: text files matched by
// filePattern() and messages from the Pub/Sub subscription subscriber(). Both sources are
// flattened into one collection, passed through JsonValidatorFn, and the records emitted on
// Util.successJsonTag are converted to Rows using Util.transactionSchema.
//
// Records emitted on Util.failedJsonTag are declared as an output of the validation step but
// are not consumed by this method.
public PCollection<Row> expand(PBegin input) {
  // File source: keep watching the pattern for new files, polling at pollInterval() with
  // Growth.never() as the termination condition.
  PCollection<String> gcsLines =
      input.apply(
          "ReadFromGCS",
          TextIO.read().from(filePattern()).watchForNewFiles(pollInterval(), Growth.never()));

  // File lines get the wall-clock time at which they are processed as their timestamp.
  PCollection<String> fileRow =
      gcsLines.apply("AssignEventTimestamp", WithTimestamps.of((String line) -> Instant.now()));

  // Pub/Sub source: message payloads read as strings.
  PCollection<String> pubsubMessage =
      input.apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(subscriber()));

  // Merge both sources into a single collection of raw records.
  PCollectionList<String> sources = PCollectionList.of(fileRow).and(pubsubMessage);
  PCollection<String> merged = sources.apply(Flatten.<String>pCollections());

  // Validate each record; only the main (success) output continues downstream.
  PCollection<String> validJson =
      merged
          .apply(
              "ValidateJson",
              ParDo.of(new JsonValidatorFn())
                  .withOutputTags(Util.successJsonTag, TupleTagList.of(Util.failedJsonTag)))
          .get(Util.successJsonTag);

  return validJson.apply("JsonToRow", JsonToRow.withSchema(Util.transactionSchema));
}