in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParsePayload.java [271:333]
private void addAttributesFromPayload(Map<String, String> attributes, ObjectNode json,
PipelineMetadataStore.PipelineMetadata meta) {
// Try to get glean-style client_info object.
JsonNode gleanClientInfo = getGleanClientInfo(json);
// Try to get "common ping"-style os object.
JsonNode commonPingOs = json.path("environment").path("system").path("os");
if (gleanClientInfo.isObject()) {
// See glean ping structure in:
// https://github.com/mozilla-services/mozilla-pipeline-schemas/blob/da4a1446efd948399eb9eade22f6fcbc5557f588/schemas/glean/baseline/baseline.1.schema.json
Optional.ofNullable(gleanClientInfo.path("app_channel").textValue()) //
.filter(v -> !Strings.isNullOrEmpty(v)) //
.ifPresent(v -> attributes.put(Attribute.APP_UPDATE_CHANNEL, v));
Optional.ofNullable(gleanClientInfo.path(Attribute.OS).textValue()) //
.filter(v -> !Strings.isNullOrEmpty(v)) //
.ifPresent(v -> attributes.put(Attribute.OS, v));
Optional.ofNullable(gleanClientInfo.path(Attribute.OS_VERSION).textValue()) //
.filter(v -> !Strings.isNullOrEmpty(v))
.ifPresent(v -> attributes.put(Attribute.OS_VERSION, v));
} else if (commonPingOs.isObject()) {
// See common ping structure in:
// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/common-ping.html
Optional.ofNullable(commonPingOs.path("name").textValue()) //
.filter(v -> !Strings.isNullOrEmpty(v)) //
.ifPresent(v -> attributes.put(Attribute.OS, v));
Optional.ofNullable(commonPingOs.path("version").textValue()) //
.filter(v -> !Strings.isNullOrEmpty(v)) //
.ifPresent(v -> attributes.put(Attribute.OS_VERSION, v));
} else {
// Try to extract "activity-stream"-style values.
Optional.ofNullable(json.path("release_channel").textValue()) //
.filter(v -> !Strings.isNullOrEmpty(v)) //
.ifPresent(v -> attributes.put(Attribute.APP_UPDATE_CHANNEL, v));
// Try to extract "core ping"-style values; see
// https://github.com/mozilla-services/mozilla-pipeline-schemas/blob/da4a1446efd948399eb9eade22f6fcbc5557f588/schemas/telemetry/core/core.10.schema.json
Optional.ofNullable(json.path(Attribute.OS).textValue()) //
.filter(v -> !Strings.isNullOrEmpty(v)) //
.ifPresent(v -> attributes.put(Attribute.OS, v));
Optional.ofNullable(json.path("osversion")) //
.map(JsonNode::textValue) //
.filter(v -> !Strings.isNullOrEmpty(v)) //
.ifPresent(v -> attributes.put(Attribute.OS_VERSION, v));
}
addClientIdFromPayload(attributes, json);
// Add sample_id, by hashing an attribute or payload path that contains a UUID, ignore any
// attribute or path that is not a valid UUID, and when both are available prefer the attribute
String uuidAttribute = meta.sample_id_source_uuid_attribute();
List<String> uuidPayloadPath = meta.sample_id_source_uuid_payload_path();
if (uuidAttribute == null && uuidPayloadPath == null) {
// default to using client_id
uuidAttribute = Attribute.CLIENT_ID;
}
Stream.of(uuidAttribute == null ? null : attributes.get(uuidAttribute), //
uuidPayloadPath == null ? null : jsonPathText(json, uuidPayloadPath)) //
.map(ParsePayload::normalizeUuid) //
.filter(Objects::nonNull) //
.findFirst() //
.ifPresent(v -> attributes.put(Attribute.SAMPLE_ID, Long.toString(calculateSampleId(v))));
}