src/main/java/com/google/solutions/df/log/aggregations/common/ReadFlowLogTransform.java [60:75]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  @Override
  public PCollection<Row> expand(PBegin input) {

    PCollection<String> fileRow =
        input
            .apply(
                "ReadFromGCS",
                TextIO.read().from(filePattern()).watchForNewFiles(pollInterval(), Growth.never()))
            .apply("AssignEventTimestamp", WithTimestamps.of((String rec) -> Instant.now()));

    PCollection<String> pubsubMessage =
        input.apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(subscriber()));

    return PCollectionList.of(fileRow)
        .and(pubsubMessage)
        .apply(Flatten.<String>pCollections())
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/ReadTransactionTransform.java [68:83]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  @Override
  public PCollection<Row> expand(PBegin input) {

    PCollection<String> fileRow =
        input
            .apply(
                "ReadFromGCS",
                TextIO.read().from(filePattern()).watchForNewFiles(pollInterval(), Growth.never()))
            .apply("AssignEventTimestamp", WithTimestamps.of((String rec) -> Instant.now()));

    PCollection<String> pubsubMessage =
        input.apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(subscriber()));

    return PCollectionList.of(fileRow)
        .and(pubsubMessage)
        .apply(Flatten.<String>pCollections())
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



