in ingestion-core/src/main/java/com/mozilla/telemetry/ingestion/core/transform/PubsubMessageToObjectNode.java [453:547]
/**
 * Transforms a single payload field so that it matches the given BigQuery schema field and
 * records the result on {@code parent} via {@code updateParent}.
 *
 * <p>The handling depends on the shape of {@code field} and {@code value}: JSON nulls are
 * passed through, map types and nested list types are expanded, records (plain, tuple, and
 * repeated) are transformed recursively, and everything else goes through
 * {@code coerceToBqType}. Content that does not fit the schema is stored under
 * {@code jsonFieldName} in {@code additionalProperties} when a collector is supplied.
 *
 * @param jsonFieldName name of the field in the JSON payload; also the key used when writing
 *     to {@code additionalProperties}
 * @param field BigQuery schema field describing the expected type and mode
 * @param value the JSON node found for this field (may be a JSON null node)
 * @param parent the object holding {@code value}; updated with the transformed result
 * @param additionalProperties collector for content that does not fit the schema; may be
 *     {@code null}, in which case nothing is collected
 */
private void processField(String jsonFieldName, Field field, JsonNode value, ObjectNode parent,
    ObjectNode additionalProperties) {
  final String bqFieldName = field.getName();
  // null is valid for any type except an element of a list
  if (value.isNull()) {
    updateParent(parent, jsonFieldName, bqFieldName, value);
    // A record of key and value indicates we need to transform a map to an array.
  } else if (isMapType(field)) {
    if (value.isObject()) {
      // If the value is not an ObjectNode, we're dealing with a duplicate normalized field name
      // whose value has already been expanded to an array. We don't need to process it again.
      expandMapType(jsonFieldName, (ObjectNode) value, field, parent, additionalProperties);
    }
    // A record with a single "list" field and a list value should be expanded appropriately.
  } else if (isNestedListType(field, value)) {
    expandNestedListType(jsonFieldName, (ArrayNode) value, field, parent, additionalProperties);
    // We need to recursively call transformForBqSchema on any normal record type.
  } else if (field.getType() == LegacySQLTypeName.RECORD
      && field.getMode() != Field.Mode.REPEATED) {
    // An array signifies a fixed length tuple which should be given anonymous field names.
    if (value.isArray()) {
      updateParent(parent, jsonFieldName, bqFieldName, processTupleField(jsonFieldName,
          field.getSubFields(), (ArrayNode) value, additionalProperties));
    } else {
      // Only objects are transformed in place; any other value is handed to updateParent as is.
      if (value.isObject()) {
        // props stays null when no collector was supplied, so the write below is skipped.
        final ObjectNode props = (additionalProperties == null) ? null
            : Json.createObjectNode();
        transformForBqSchema((ObjectNode) value, field.getSubFields(), props);
        if (!Json.isNullOrEmpty(props)) {
          additionalProperties.set(jsonFieldName, props);
        }
      }
      updateParent(parent, jsonFieldName, bqFieldName, value);
    }
    // Likewise, we need to recursively call transformForBqSchema on repeated record types.
  } else if (field.getType() == LegacySQLTypeName.RECORD
      && field.getMode() == Field.Mode.REPEATED) {
    // Holds one entry per element of the repeated field, so positions stay aligned.
    ArrayNode repeatedAdditionalProperties = Json.createArrayNode();
    if (Streams.stream(value).allMatch(JsonNode::isArray)) {
      // Tuples cannot be transformed in place, instead each element of the parent
      // array will need to reference a new transformed object.
      ArrayNode records = (ArrayNode) value;
      for (int i = 0; i < records.size(); i++) {
        final ObjectNode props = additionalProperties == null ? null : Json.createObjectNode();
        records.set(i, processTupleField(jsonFieldName, field.getSubFields(),
            (ArrayNode) records.get(i), props));
        if (!Json.isNullOrEmpty(props)) {
          repeatedAdditionalProperties.add(props.get(jsonFieldName));
        } else {
          repeatedAdditionalProperties.addObject();
        }
      }
    } else {
      ArrayNode filteredValue = Json.createArrayNode();
      for (JsonNode record : value) {
        final ObjectNode props = additionalProperties == null ? null : Json.createObjectNode();
        if (record.isObject()) {
          filteredValue.add(record);
          transformForBqSchema((ObjectNode) record, field.getSubFields(), props);
          if (!Json.isNullOrEmpty(props)) {
            repeatedAdditionalProperties.add(props);
          } else {
            repeatedAdditionalProperties.addObject();
          }
        } else {
          // BigQuery only allows objects in this array, so we insert an empty object instead.
          filteredValue.addObject();
          repeatedAdditionalProperties.add(record);
        }
      }
      value = filteredValue;
    }
    // The collector is optional. A non-object element above lands in
    // repeatedAdditionalProperties even when no collector was supplied, so the null check is
    // required here to avoid a NullPointerException.
    if (additionalProperties != null
        && !Streams.stream(repeatedAdditionalProperties).allMatch(EMPTY_OBJECT::equals)) {
      additionalProperties.set(jsonFieldName, repeatedAdditionalProperties);
    }
    updateParent(parent, jsonFieldName, bqFieldName, value);
    // If we've made it here, we have a basic type or a list of basic types.
  } else {
    final Optional<JsonNode> coerced = coerceToBqType(value, field);
    // use coerced.orElse(null) to remove the field via updateParent if necessary
    updateParent(parent, jsonFieldName, bqFieldName, coerced.orElse(null));
    // If coerced is not present that means the actual type didn't match expected and we don't
    // define a coercion. We put the value to additional_properties instead.
    if (!coerced.isPresent() && additionalProperties != null) {
      additionalProperties.set(jsonFieldName, value);
    }
  }
}