public PCollection expand()

in src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/ReadTransactionTransform.java [69:90]


  /**
   * Builds the transaction input by merging a file-based source with a Pub/Sub source.
   *
   * <p>Lines are read from files matching {@code filePattern()}, watching for new files every
   * {@code pollInterval()} with {@code Growth.never()} as the termination condition, and each
   * line is stamped with {@code Instant.now()} evaluated per record. Strings read from the
   * {@code subscriber()} subscription are flattened together with those lines. The merged strings
   * go through {@code JsonValidatorFn}; only the {@code Util.successJsonTag} output is converted
   * to rows using {@code Util.transactionSchema}. The {@code Util.failedJsonTag} output is not
   * consumed by this method.
   *
   * @param input the pipeline start that both sources are applied to
   * @return rows converted from the strings emitted on the success tag
   */
  public PCollection<Row> expand(PBegin input) {
    // File-based source: watch the pattern for newly arriving files.
    TextIO.Read gcsSource =
        TextIO.read().from(filePattern()).watchForNewFiles(pollInterval(), Growth.never());
    PCollection<String> gcsLines = input.apply("ReadFromGCS", gcsSource);

    // File lines get the wall-clock time at which this function runs as their timestamp.
    PCollection<String> timestampedLines =
        gcsLines.apply("AssignEventTimestamp", WithTimestamps.of((String line) -> Instant.now()));

    // Streaming source: message payloads read as strings from the subscription.
    PCollection<String> pubsubLines =
        input.apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(subscriber()));

    // Merge both sources into a single collection before validation.
    PCollectionList<String> sources = PCollectionList.of(timestampedLines).and(pubsubLines);
    PCollection<String> mergedLines = sources.apply(Flatten.<String>pCollections());

    // Only the success-tagged output of the validator continues downstream.
    PCollection<String> validJson =
        mergedLines
            .apply(
                "ValidateJson",
                ParDo.of(new JsonValidatorFn())
                    .withOutputTags(Util.successJsonTag, TupleTagList.of(Util.failedJsonTag)))
            .get(Util.successJsonTag);

    return validJson.apply("JsonToRow", JsonToRow.withSchema(Util.transactionSchema));
  }