public void processElement()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParsePayload.java [115:252]


    public void processElement(@Element PubsubMessage message, MultiOutputReceiver out) {
      // Parses, scrubs and validates one submission, then emits it to outputTag, possibly split
      // into several messages according to the docType's pipeline metadata. An IOException,
      // SchemaNotFoundException, ValidationException, MessageScrubberException or
      // DeprecatedMessageException routes the message to failureTag instead, while a
      // MessageShouldBeDroppedException from the scrubber drops it without any output.
      // Per-docType counters record each outcome.
      try {
        // The parameter is reassigned on purpose: the failure output in the outer catch reports
        // the value returned by ensureNonNull, not the raw input.
        message = PubsubConstraints.ensureNonNull(message);
        Map<String, String> attributes = new HashMap<>(message.getAttributeMap());

        final int submissionBytes = message.getPayload().length;

        ObjectNode json;
        try {
          json = parseTimed(message.getPayload());
        } catch (IOException e) {
          // Only attribute the failure to a docType we recognize; null attributes will cause
          // docType to show up as "unknown_doc_type" in metrics.
          Map<String, String> attrs = schemaStore.docTypeExists(message.getAttributeMap())
              ? message.getAttributeMap()
              : null;
          incrementErrorCounters(attrs, "error_json_parse", submissionBytes);
          throw e;
        }

        // In case this message is being replayed from an error output where AddMetadata has already
        // been applied, we strip out any existing metadata fields and put them into attributes.
        NestedMetadata.stripPayloadMetadataToAttributes(attributes, json);

        // Check the contents of the message, potentially throwing an exception that causes the
        // message to be dropped or routed to error output; may also alter the payload to
        // redact sensitive fields.
        try {
          MessageScrubber.scrub(attributes, json);
        } catch (MessageShouldBeDroppedException e) {
          // This message should go to no output, so we return immediately without writing to output
          // receiver.
          return;
        }

        if (!schemaStore.docTypeExists(attributes)) {
          // Unknown docTypes are counted under null attributes ("unknown_doc_type").
          incrementErrorCounters(null, "error_invalid_doc_type", submissionBytes);
          throw new SchemaNotFoundException(String.format("No such docType: %s/%s",
              attributes.get("document_namespace"), attributes.get("document_type")));
        }

        // If no "document_version" attribute was parsed from the URI, this element must be from the
        // /submit/telemetry endpoint and we now need to grab version from the payload.
        if (!attributes.containsKey(Attribute.DOCUMENT_VERSION)) {
          try {
            String version = getVersionFromTelemetryPayload(json);
            attributes.put(Attribute.DOCUMENT_VERSION, version);
          } catch (SchemaNotFoundException e) {
            incrementErrorCounters(attributes, "error_missing_version", submissionBytes);
            throw e;
          }
        }

        // Throws SchemaNotFoundException if there's no schema
        Schema schema;
        try {
          schema = schemaStore.getSchema(attributes);
        } catch (SchemaNotFoundException e) {
          incrementErrorCounters(attributes, "error_schema_not_found", submissionBytes);
          throw e;
        }

        try {
          validateTimed(schema, json);
        } catch (ValidationException e) {
          incrementErrorCounters(attributes, "error_schema_validation", submissionBytes);
          throw e;
        }

        final PipelineMetadataStore.PipelineMetadata meta = metadataStore.getSchema(attributes);

        try {
          deprecationCheck(attributes, meta);
        } catch (DeprecatedMessageException e) {
          incrementErrorCounters(attributes, "error_deprecated_message", submissionBytes);
          throw e;
        }

        addAttributesFromPayload(attributes, json, meta);

        // Optionally split message into multiple messages based on pipeline metadata
        final PipelineMetadataStore.SplitConfig splitConfig = meta.split_config();
        final List<PubsubMessage> messages = new ArrayList<>();
        if (splitConfig == null || splitConfig.preserve_original()) {
          // https://github.com/mozilla/gcp-ingestion/issues/780
          // We need to be careful to consistently use our util methods (which use Jackson) for
          // serializing and de-serializing JSON to reduce the possibility of introducing encoding
          // issues. We previously called json.toString().getBytes() here without specifying a
          // charset.
          // The original is serialized here, before the subset loop below passes json to
          // jsonMove (which presumably removes the moved fields from it — confirm).
          messages.add(new PubsubMessage(Json.asBytes(json), attributes));
        }

        if (splitConfig != null) {
          for (PipelineMetadataStore.SplitConfigTarget subsetConfig : splitConfig.subsets()) {
            final Map<String, String> subsetAttributes = withDocumentType(attributes,
                subsetConfig.document_namespace(), subsetConfig.document_type(),
                subsetConfig.document_version());
            final ObjectNode subsetJson = Json.createObjectNode();
            final ObjectSchema subsetSchema;
            try {
              // NOTE(review): this cast throws ClassCastException (not caught below, so it
              // escapes processElement) if a subset schema is not an ObjectSchema — confirm the
              // pipeline metadata guarantees object-typed subset schemas.
              subsetSchema = (ObjectSchema) schemaStore.getSchema(subsetAttributes);
            } catch (SchemaNotFoundException e) {
              // Counted against the parent docType, since the whole submission is rejected.
              incrementErrorCounters(attributes, "error_schema_not_found", submissionBytes);
              throw e;
            }
            jsonMove(json, subsetJson, subsetSchema, true);
            messages.add(new PubsubMessage(Json.asBytes(subsetJson), subsetAttributes));
          }

          if (splitConfig.remainder() != null) {
            // Whatever is left in json after the subset moves is emitted under the remainder
            // docType.
            final Map<String, String> remainderAttributes = withDocumentType(attributes,
                splitConfig.remainder().document_namespace(),
                splitConfig.remainder().document_type(),
                splitConfig.remainder().document_version());
            messages.add(new PubsubMessage(Json.asBytes(json), remainderAttributes));
          }
        }

        PerDocTypeCounter.inc(attributes, "valid_submission");
        PerDocTypeCounter.inc(attributes, "valid_submission_bytes", submissionBytes);
        messages.forEach(out.get(outputTag)::output);
      } catch (IOException | SchemaNotFoundException | ValidationException
          | MessageScrubberException | DeprecatedMessageException e) {
        out.get(failureTag)
            .output(FailureMessage.of(ParsePayload.class.getSimpleName(), message, e));
      }
    }

    /**
     * Records a rejected submission by incrementing {@code errorName} and then the shared
     * {@code error_submission_bytes} counter for the given docType attributes.
     *
     * @param attributes docType attributes to count against; {@code null} counts the error
     *     under an unknown docType
     * @param errorName name of the specific error counter to increment
     * @param submissionBytes payload size of the rejected submission
     */
    private void incrementErrorCounters(Map<String, String> attributes, String errorName,
        int submissionBytes) {
      PerDocTypeCounter.inc(attributes, errorName);
      PerDocTypeCounter.inc(attributes, "error_submission_bytes", submissionBytes);
    }

    /**
     * Returns a mutable copy of {@code attributes} with the document namespace, type and version
     * replaced by the given values; the input map is not modified.
     */
    private Map<String, String> withDocumentType(Map<String, String> attributes, String namespace,
        String type, String version) {
      final Map<String, String> copy = new HashMap<>(attributes);
      copy.put(Attribute.DOCUMENT_NAMESPACE, namespace);
      copy.put(Attribute.DOCUMENT_TYPE, type);
      copy.put(Attribute.DOCUMENT_VERSION, version);
      return copy;
    }