in ingestion-core/src/main/java/com/mozilla/telemetry/ingestion/core/transform/PubsubMessageToObjectNode.java [389:428]
/**
 * Walks the immediate fields of {@code parent} and reconciles each one with the given BigQuery
 * fields.
 *
 * <p>For every JSON field, a target BigQuery field name is chosen in this order:
 * <ol>
 * <li>the JSON name with a {@code "2"} suffix, when that suffixed name exists in
 * {@code bqFields} and the JSON name is listed in {@code BUG_1737656_METRIC_NAMES};</li>
 * <li>the JSON name itself, when it exists in {@code bqFields};</li>
 * <li>otherwise, the result of {@code getAndCacheBqName} for the JSON name.</li>
 * </ol>
 *
 * <p>Unless the JSON name matched directly (case 2), the field is removed from {@code parent}
 * before further handling. If the chosen name is present in {@code bqFields}, the value is
 * handed to {@code processField}; if not, the value is stored under its original JSON name in
 * {@code additionalProperties} when that node is non-null, and is otherwise dropped.
 *
 * @param parent the object node whose fields are inspected and modified in place
 * @param bqFields the BigQuery fields expected at this level; names must be unique because
 *     they are collected with {@code Collectors.toMap}
 * @param additionalProperties receiver for fields with no matching BigQuery field; may be
 *     {@code null}, in which case such fields are discarded
 */
void transformForBqSchema(ObjectNode parent, List<Field> bqFields,
    ObjectNode additionalProperties) {
  final Map<String, Field> fieldsByName = bqFields.stream()
      .collect(Collectors.toMap(Field::getName, Function.identity()));

  // Iterate over a snapshot of the names, since the loop body mutates parent.
  final List<String> jsonFieldNames = Lists.newArrayList(parent.fieldNames());
  for (String jsonFieldName : jsonFieldNames) {
    final JsonNode value = parent.get(jsonFieldName);
    final String suffixedName = jsonFieldName + "2";

    // For Glean pings defined before November 2021, we had deployed incorrect types to
    // BigQuery tables under the metrics struct for types url, text, jwe, and labeled_rate;
    // those are renamed in schemas to url2, text2, jwe2, and labeled_rate2, so the payload
    // has to be altered here to match the expected schema.
    final boolean needsSuffixRename = fieldsByName.containsKey(suffixedName)
        && BUG_1737656_METRIC_NAMES.contains(jsonFieldName);
    // Only consulted when the suffix rename does not apply, mirroring an else-if.
    final boolean matchesDirectly = !needsSuffixRename
        && fieldsByName.containsKey(jsonFieldName);

    // Anything that is not a direct match leaves the parent under its JSON name.
    if (!matchesDirectly) {
      parent.remove(jsonFieldName);
    }

    // Determine the equivalent BQ field name for this JSON field.
    final String bqFieldName;
    if (needsSuffixRename) {
      bqFieldName = suffixedName;
    } else if (matchesDirectly) {
      bqFieldName = jsonFieldName;
    } else {
      // Try cleaning the name to match our BQ conventions.
      bqFieldName = getAndCacheBqName(jsonFieldName);
    }

    // Collectors.toMap never stores null values, so a null lookup means "no such BQ field".
    final Field bqField = fieldsByName.get(bqFieldName);
    if (bqField != null) {
      processField(jsonFieldName, bqField, value, parent, additionalProperties);
    } else if (additionalProperties != null) {
      // No BQ counterpart: keep the value under its original, un-renamed JSON name.
      additionalProperties.set(jsonFieldName, value);
    }
  }
}