public PCollection expand()

in src/main/java/com/google/solutions/df/log/aggregations/common/JsonToRowValidationTransform.java [39:69]


  public PCollection<Row> expand(PCollection<String> input) {

    PCollectionTuple output =
        input.apply(
            "Validated Json",
            ParDo.of(new JsonValidatorFn())
                .withOutputTags(Util.successTag, TupleTagList.of(Util.failureTag)));
    PCollection<Row> logRow =
        output
            .get(Util.successTag)
            .apply("Convert To Row", JsonToRow.withSchema(Util.networkLogSchema))
            .setRowSchema(Util.networkLogSchema);

    return logRow
        .apply(
            "ModifiedRow",
            ParDo.of(
                new DoFn<Row, Row>() {

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Row modifiedRow =
                        Row.fromRow(c.element())
                            .withFieldValue("startTime", Util.currentStartTime())
                            .withFieldValue("endTime", Util.currentEndTime())
                            .build();
                    c.output(modifiedRow);
                  }
                }))
        .setRowSchema(Util.networkLogSchema);
  }