private void processField()

in ingestion-core/src/main/java/com/mozilla/telemetry/ingestion/core/transform/PubsubMessageToObjectNode.java [453:547]


    /**
     * Transforms a single JSON value so that it matches the given BigQuery field and writes the
     * result back to {@code parent} via {@code updateParent}.
     *
     * <p>Cases are checked in this order: JSON null, map types, nested list types, non-repeated
     * records (including tuples given as arrays), repeated records, and finally basic types or
     * lists of basic types.
     *
     * @param jsonFieldName name of the field in the JSON payload; also the key used when writing
     *     to {@code additionalProperties}
     * @param field BigQuery schema field the value is matched against
     * @param value JSON value to transform; must not be a Java {@code null} (a JSON null is
     *     expected to arrive as a null node)
     * @param parent object passed to {@code updateParent} to receive the transformed value
     * @param additionalProperties collector for content that does not fit the schema, or
     *     {@code null} when such content should not be collected
     */
    private void processField(String jsonFieldName, Field field, JsonNode value, ObjectNode parent,
        ObjectNode additionalProperties) {
      final String bqFieldName = field.getName();

      // null is valid for any type except an element of a list
      if (value.isNull()) {
        updateParent(parent, jsonFieldName, bqFieldName, value);

        // A record of key and value indicates we need to transform a map to an array.
      } else if (isMapType(field)) {
        // If the value is not an ObjectNode, we're dealing with a duplicate normalized field name
        // whose value has already been expanded to an array. We don't need to process it again.
        if (value.isObject()) {
          expandMapType(jsonFieldName, (ObjectNode) value, field, parent, additionalProperties);
        }

        // A record with a single "list" field and a list value should be expanded appropriately.
      } else if (isNestedListType(field, value)) {
        expandNestedListType(jsonFieldName, (ArrayNode) value, field, parent, additionalProperties);

        // We need to recursively call transformForBqSchema on any normal record type.
      } else if (field.getType() == LegacySQLTypeName.RECORD
          && field.getMode() != Field.Mode.REPEATED) {

        // An array signifies a fixed length tuple which should be given anonymous field names.
        if (value.isArray()) {
          updateParent(parent, jsonFieldName, bqFieldName, processTupleField(jsonFieldName,
              field.getSubFields(), (ArrayNode) value, additionalProperties));
        } else {
          // Only objects can be transformed; any other value is passed through unchanged.
          if (value.isObject()) {
            // Collect nested leftovers separately so they can be stored under this field's name.
            final ObjectNode props = (additionalProperties == null) ? null
                : Json.createObjectNode();
            transformForBqSchema((ObjectNode) value, field.getSubFields(), props);
            if (!Json.isNullOrEmpty(props)) {
              additionalProperties.set(jsonFieldName, props);
            }
          }
          updateParent(parent, jsonFieldName, bqFieldName, value);
        }

        // Likewise, we need to recursively call transformForBqSchema on repeated record types.
      } else if (field.getType() == LegacySQLTypeName.RECORD
          && field.getMode() == Field.Mode.REPEATED) {
        // Holds one entry per element of the repeated value, so leftovers stay aligned by index.
        final ArrayNode repeatedAdditionalProperties = Json.createArrayNode();
        // NOTE(review): a value with no child elements (presumably including scalars) passes
        // allMatch vacuously, so a non-array value would fail the ArrayNode cast below - confirm
        // that callers only reach this branch with arrays.
        if (Streams.stream(value).allMatch(JsonNode::isArray)) {
          // Tuples cannot be transformed in place, instead each element of the parent
          // array will need to reference a new transformed object.
          ArrayNode records = (ArrayNode) value;
          for (int i = 0; i < records.size(); i++) {
            final ObjectNode props = additionalProperties == null ? null : Json.createObjectNode();
            records.set(i, processTupleField(jsonFieldName, field.getSubFields(),
                (ArrayNode) records.get(i), props));
            if (!Json.isNullOrEmpty(props)) {
              repeatedAdditionalProperties.add(props.get(jsonFieldName));
            } else {
              repeatedAdditionalProperties.addObject();
            }
          }
        } else {
          ArrayNode filteredValue = Json.createArrayNode();
          for (JsonNode record : value) {
            final ObjectNode props = additionalProperties == null ? null : Json.createObjectNode();
            if (record.isObject()) {
              filteredValue.add(record);
              transformForBqSchema((ObjectNode) record, field.getSubFields(), props);
              if (!Json.isNullOrEmpty(props)) {
                repeatedAdditionalProperties.add(props);
              } else {
                repeatedAdditionalProperties.addObject();
              }
            } else {
              // BigQuery only allows objects in this array, so we insert an empty object instead.
              filteredValue.addObject();
              repeatedAdditionalProperties.add(record);
            }
          }
          value = filteredValue;
        }
        // additionalProperties may be null (collection disabled). A non-object element is added
        // to repeatedAdditionalProperties even then, so the null check is required here to avoid
        // a NullPointerException.
        if (additionalProperties != null
            && !Streams.stream(repeatedAdditionalProperties).allMatch(EMPTY_OBJECT::equals)) {
          additionalProperties.set(jsonFieldName, repeatedAdditionalProperties);
        }
        updateParent(parent, jsonFieldName, bqFieldName, value);

        // If we've made it here, we have a basic type or a list of basic types.
      } else {
        final Optional<JsonNode> coerced = coerceToBqType(value, field);
        // use coerced.orElse(null) to remove the field via updateParent if necessary
        updateParent(parent, jsonFieldName, bqFieldName, coerced.orElse(null));
        // If coerced is not present that means the actual type didn't match expected and we don't
        // define a coercion. We put the value to additional_properties instead.
        if (!coerced.isPresent() && additionalProperties != null) {
          additionalProperties.set(jsonFieldName, value);
        }
      }
    }