in src/main/java/org/example/ToUpperCaseTransformProvider.java [133:156]
/**
 * Builds the {@link DoFn} that upper-cases a single string field of each input row.
 *
 * <p>For every element, the returned function copies the input row, replaces the value of
 * {@code field} with its upper-cased form and emits the result under the {@code successValues}
 * tag. Any exception raised while doing so (for example a null field value) is either converted
 * into an error record and emitted under the {@code errorValues} tag, or rethrown, depending on
 * {@code handleErrors}.
 *
 * @param field name of the string field to upper-case
 * @param handleErrors when {@code true}, failures are emitted as error records instead of being
 *     thrown
 * @param errorSchema schema handed to {@code ErrorHandling.errorRecord} when building error
 *     records; only used when {@code handleErrors} is {@code true}
 * @return a {@code DoFn} emitting transformed rows and, optionally, error records
 */
private static DoFn<Row, Row> createDoFn(String field, boolean handleErrors, Schema errorSchema) {
  return new DoFn<Row, Row>() {
    @ProcessElement
    public void processElement(@Element Row inputRow, MultiOutputReceiver out) {
      try {
        // The message is built lazily so the per-element happy path pays no string
        // concatenation cost, while a null value still yields a descriptive error.
        String value =
            Objects.requireNonNull(
                inputRow.getString(field),
                () -> "Value of field '" + field + "' must not be null");

        // Locale.ROOT keeps the result independent of the JVM's default locale; the no-arg
        // toUpperCase() would, e.g., map "i" to a dotted capital under a Turkish locale.
        Row output =
            Row.fromRow(inputRow)
                .withFieldValue(field, value.toUpperCase(java.util.Locale.ROOT))
                .build();
        out.get(successValues).output(output);
      } catch (Exception e) {
        if (handleErrors) {
          // Error handling was requested: route the failing row and its cause to the error output.
          out.get(errorValues).output(ErrorHandling.errorRecord(errorSchema, inputRow, e));
        } else {
          // No error handling configured: fail the element, preserving the original cause.
          throw new RuntimeException(e);
        }
      }
    }
  };
}