private void addAttributesFromPayload()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParsePayload.java [271:333]


  private void addAttributesFromPayload(Map<String, String> attributes, ObjectNode json,
      PipelineMetadataStore.PipelineMetadata meta) {
    // Try to get glean-style client_info object.
    JsonNode gleanClientInfo = getGleanClientInfo(json);

    // Try to get "common ping"-style os object.
    JsonNode commonPingOs = json.path("environment").path("system").path("os");

    if (gleanClientInfo.isObject()) {
      // See glean ping structure in:
      // https://github.com/mozilla-services/mozilla-pipeline-schemas/blob/da4a1446efd948399eb9eade22f6fcbc5557f588/schemas/glean/baseline/baseline.1.schema.json
      Optional.ofNullable(gleanClientInfo.path("app_channel").textValue()) //
          .filter(v -> !Strings.isNullOrEmpty(v)) //
          .ifPresent(v -> attributes.put(Attribute.APP_UPDATE_CHANNEL, v));
      Optional.ofNullable(gleanClientInfo.path(Attribute.OS).textValue()) //
          .filter(v -> !Strings.isNullOrEmpty(v)) //
          .ifPresent(v -> attributes.put(Attribute.OS, v));
      Optional.ofNullable(gleanClientInfo.path(Attribute.OS_VERSION).textValue()) //
          .filter(v -> !Strings.isNullOrEmpty(v))
          .ifPresent(v -> attributes.put(Attribute.OS_VERSION, v));
    } else if (commonPingOs.isObject()) {
      // See common ping structure in:
      // https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/common-ping.html
      Optional.ofNullable(commonPingOs.path("name").textValue()) //
          .filter(v -> !Strings.isNullOrEmpty(v)) //
          .ifPresent(v -> attributes.put(Attribute.OS, v));
      Optional.ofNullable(commonPingOs.path("version").textValue()) //
          .filter(v -> !Strings.isNullOrEmpty(v)) //
          .ifPresent(v -> attributes.put(Attribute.OS_VERSION, v));
    } else {
      // Try to extract "activity-stream"-style values.
      Optional.ofNullable(json.path("release_channel").textValue()) //
          .filter(v -> !Strings.isNullOrEmpty(v)) //
          .ifPresent(v -> attributes.put(Attribute.APP_UPDATE_CHANNEL, v));

      // Try to extract "core ping"-style values; see
      // https://github.com/mozilla-services/mozilla-pipeline-schemas/blob/da4a1446efd948399eb9eade22f6fcbc5557f588/schemas/telemetry/core/core.10.schema.json
      Optional.ofNullable(json.path(Attribute.OS).textValue()) //
          .filter(v -> !Strings.isNullOrEmpty(v)) //
          .ifPresent(v -> attributes.put(Attribute.OS, v));
      Optional.ofNullable(json.path("osversion")) //
          .map(JsonNode::textValue) //
          .filter(v -> !Strings.isNullOrEmpty(v)) //
          .ifPresent(v -> attributes.put(Attribute.OS_VERSION, v));
    }

    addClientIdFromPayload(attributes, json);

    // Add sample_id, by hashing an attribute or payload path that contains a UUID, ignore any
    // attribute or path that is not a valid UUID, and when both are available prefer the attribute
    String uuidAttribute = meta.sample_id_source_uuid_attribute();
    List<String> uuidPayloadPath = meta.sample_id_source_uuid_payload_path();
    if (uuidAttribute == null && uuidPayloadPath == null) {
      // default to using client_id
      uuidAttribute = Attribute.CLIENT_ID;
    }
    Stream.of(uuidAttribute == null ? null : attributes.get(uuidAttribute), //
        uuidPayloadPath == null ? null : jsonPathText(json, uuidPayloadPath)) //
        .map(ParsePayload::normalizeUuid) //
        .filter(Objects::nonNull) //
        .findFirst() //
        .ifPresent(v -> attributes.put(Attribute.SAMPLE_ID, Long.toString(calculateSampleId(v))));
  }