in pipelines/etl_integration_java/src/main/java/com/google/cloud/dataflow/solutions/transform/TaxiEventProcessor.java [106:122]
public ParsingOutput<T> expand(PCollection<String> input) {
PCollectionRowTuple allRows =
input.apply("Convert to Row", new JsonStringParser(schema()));
PCollection<Row> rows = allRows.get(JsonStringParser.RESULTS_TAG);
PCollection<Row> errorRows = allRows.get(JsonStringParser.ERRORS_TAG);
PCollection<ParsingError> errorMessages =
errorRows.apply("Convert to ErrorMessage", new RowToError());
// Convert row objects to input type
PCollection<T> taxiRides =
rows.apply(
String.format("Convert to %s", clz().getSimpleName()),
Convert.fromRows(clz()));
return new ParsingOutput<>(input.getPipeline(), taxiRides, errorMessages);
}