in src/main/java/com/google/solutions/df/log/aggregations/common/JsonToRowValidationTransform.java [39:69]
/**
 * Validates incoming JSON strings, converts the accepted ones to {@link Row}s and overwrites
 * their {@code startTime} / {@code endTime} fields.
 *
 * <p>The pipeline built here has three named steps:
 *
 * <ol>
 *   <li>{@code "Validated Json"} - runs {@code JsonValidatorFn} with {@code Util.successTag} as
 *       the main output and {@code Util.failureTag} as an additional output.
 *   <li>{@code "Convert To Row"} - applies {@code JsonToRow} with {@code Util.networkLogSchema}
 *       to the elements of the main output.
 *   <li>{@code "ModifiedRow"} - copies each row, replacing {@code startTime} with {@code
 *       Util.currentStartTime()} and {@code endTime} with {@code Util.currentEndTime()}.
 * </ol>
 *
 * <p>Only the {@code Util.successTag} output is read in this method; the collection tagged with
 * {@code Util.failureTag} is declared but not consumed here.
 *
 * @param input the JSON strings to process
 * @return the converted rows, carrying {@code Util.networkLogSchema} as their row schema
 */
public PCollection<Row> expand(PCollection<String> input) {
  // Multi-output validation step: main output = successTag, side output = failureTag.
  PCollectionTuple validated =
      input.apply(
          "Validated Json",
          ParDo.of(new JsonValidatorFn())
              .withOutputTags(Util.successTag, TupleTagList.of(Util.failureTag)));

  // Continue with the main (success) output only.
  PCollection<String> validJson = validated.get(Util.successTag);
  PCollection<Row> parsedRows =
      validJson
          .apply("Convert To Row", JsonToRow.withSchema(Util.networkLogSchema))
          .setRowSchema(Util.networkLogSchema);

  // Per-element function that rebuilds the row with fresh startTime / endTime values.
  DoFn<Row, Row> overwriteTimeFields =
      new DoFn<Row, Row>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          Row incoming = c.element();
          // Row.fromRow copies every field; only the two time fields are replaced.
          c.output(
              Row.fromRow(incoming)
                  .withFieldValue("startTime", Util.currentStartTime())
                  .withFieldValue("endTime", Util.currentEndTime())
                  .build());
        }
      };

  PCollection<Row> stampedRows = parsedRows.apply("ModifiedRow", ParDo.of(overwriteTimeFields));
  // setRowSchema returns the same collection with the schema attached.
  return stampedRows.setRowSchema(Util.networkLogSchema);
}