in pipelines/etl_integration_java/src/main/java/com/google/cloud/dataflow/solutions/transform/TaxiEventProcessor.java [230:278]
/**
 * Rebuilds {@code node} as a new JSON object containing exactly one entry per field of {@code
 * schema}. Properties of {@code node} that are not declared in the schema are not copied.
 *
 * <p>Each field value is sanitized according to its Beam type: row (composite) fields are
 * sanitized recursively against their nested schema, collection fields are sanitized element by
 * element (including nested collections such as lists of lists), and every other field is
 * delegated to {@link #sanitizeSingleNode}.
 *
 * @param node the JSON object to sanitize; must not be {@code null}
 * @param schema the Beam schema describing the expected shape of {@code node}
 * @return a newly created object node holding the sanitized field values
 * @throws IllegalArgumentException if a collection-typed field holds a non-null value that is
 *     not a JSON array
 */
private JsonNode sanitizeNode(JsonNode node, Schema schema) {
    ObjectNode sanitizedRow = JsonNodeFactory.instance.objectNode();
    for (Field field : schema.getFields()) {
        String fieldName = field.getName();
        // node.get returns null when the property is absent; sanitizeFieldValue handles that.
        JsonNode sanitized = sanitizeFieldValue(fieldName, node.get(fieldName), field.getType());
        sanitizedRow.set(fieldName, sanitized);
    }
    return sanitizedRow;
}

/**
 * Sanitizes one JSON value according to the Beam type it is expected to conform to.
 *
 * <p>For row and collection types, a Java {@code null}, JSON {@code null} or missing value is
 * returned as JSON {@code null} rather than being dereferenced. Values of all other types are
 * handed to {@link #sanitizeSingleNode} as-is, including {@code null}.
 *
 * @param fieldName name of the enclosing schema field; used only in error messages
 * @param value the JSON value to sanitize; may be {@code null} when the property is absent
 * @param type the Beam type describing {@code value}
 * @return the sanitized value
 * @throws IllegalArgumentException if a collection-typed value is present but is not a JSON
 *     array
 */
private JsonNode sanitizeFieldValue(String fieldName, JsonNode value, FieldType type) {
    TypeName typeName = type.getTypeName();
    boolean isContainer = typeName.isCompositeType() || typeName.isCollectionType();
    if (isContainer && (value == null || value.isNull() || value.isMissingNode())) {
        // Propagate absent nested rows/arrays as JSON null instead of failing with an NPE.
        return JsonNodeFactory.instance.nullNode();
    }

    if (typeName.isCompositeType()) {
        // Nested row: sanitize recursively against its own schema.
        Schema rowSchema = Objects.requireNonNull(type.getRowSchema());
        return sanitizeNode(value, rowSchema);
    }

    if (typeName.isCollectionType()) {
        // An explicit check is needed because `assert` is skipped unless the JVM runs with -ea;
        // iterating a non-array node would silently yield [] or an object's values.
        if (!value.isArray()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Field '%s' has collection type %s but its JSON value is %s, not an array",
                            fieldName, type, value.getNodeType()));
        }
        // All elements are assumed to share the collection's element type. Recursing on that
        // type handles rows, single values and nested collections (lists of lists) alike; a
        // collection element type has no row schema, so it must not be treated as a row.
        FieldType elementType = Objects.requireNonNull(type.getCollectionElementType());
        Stream<JsonNode> elements = StreamSupport.stream(value.spliterator(), false);
        List<JsonNode> sanitizedElements =
                elements.map(element -> sanitizeFieldValue(fieldName, element, elementType))
                        .collect(Collectors.toList());
        ArrayNode sanitizedArray = JsonNodeFactory.instance.arrayNode();
        for (JsonNode element : sanitizedElements) {
            // add(JsonNode) stores a null element as JSON null.
            sanitizedArray.add(element);
        }
        return sanitizedArray;
    }

    // Single-valued field.
    return sanitizeSingleNode(value, type);
}