private JsonNode sanitizeNode()

in pipelines/etl_integration_java/src/main/java/com/google/cloud/dataflow/solutions/transform/TaxiEventProcessor.java [230:278]


        /**
         * Builds a new JSON object that contains exactly the fields declared in {@code schema},
         * each one sanitized according to its schema type.
         *
         * <p>Properties of {@code node} that are not declared in the schema are not copied. Row
         * fields are sanitized recursively, collection fields are sanitized element by element
         * (including collections nested inside collections), and every other field is delegated
         * to {@code sanitizeSingleNode}.
         *
         * <p>Missing (null) row or collection values are not specially handled and generally
         * surface as a {@link NullPointerException}.
         *
         * @param node JSON object the field values are read from; must not be null
         * @param schema schema listing the fields to keep and their types
         * @return a newly created object node with one entry per schema field
         * @throws NullPointerException if a row type has no row schema or a collection type has
         *     no element type
         */
        private JsonNode sanitizeNode(JsonNode node, Schema schema) {
            ObjectNode sanitized = JsonNodeFactory.instance.objectNode();

            for (Field field : schema.getFields()) {
                String fieldName = field.getName();
                sanitized.set(
                        fieldName, sanitizeNodeOfType(node.get(fieldName), field.getType()));
            }

            return sanitized;
        }

        /**
         * Sanitizes a single JSON value according to the given schema field type.
         *
         * <p>Dispatches on the type: rows go back through {@code sanitizeNode}, collections are
         * rebuilt as arrays whose elements are sanitized with the collection's element type, and
         * anything else is handed to {@code sanitizeSingleNode}.
         */
        private JsonNode sanitizeNodeOfType(JsonNode value, FieldType type) {
            TypeName typeName = type.getTypeName();

            if (typeName.isCompositeType()) {
                // Row/Struct: recurse using the nested row schema.
                Schema rowSchema = Objects.requireNonNull(type.getRowSchema());
                return sanitizeNode(value, rowSchema);
            }

            if (typeName.isCollectionType()) {
                // All elements of a collection share the same element type. Recursing through
                // this method (rather than assuming a row schema) is what makes lists of lists
                // work: a collection element type carries no row schema of its own.
                FieldType elementType = Objects.requireNonNull(type.getCollectionElementType());
                assert value.isArray() : "Expected a JSON array for a collection-typed field";
                ArrayNode sanitizedArray = JsonNodeFactory.instance.arrayNode();
                for (JsonNode element : value) {
                    sanitizedArray.add(sanitizeNodeOfType(element, elementType));
                }
                return sanitizedArray;
            }

            // Single value type.
            return sanitizeSingleNode(value, type);
        }