in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParsePayload.java [115:252]
public void processElement(@Element PubsubMessage message, MultiOutputReceiver out) {
  // Decodes a single submission: parses the payload as JSON, scrubs it, resolves the docType,
  // version and schema, validates the payload against that schema, rejects deprecated
  // messages, and finally emits one or more messages to outputTag. Any handled failure is
  // counted per docType and routed to failureTag instead; a message that the scrubber marks
  // as "should be dropped" is written to no output at all.
  try {
    message = PubsubConstraints.ensureNonNull(message);
    Map<String, String> attributes = new HashMap<>(message.getAttributeMap());
    final int submissionBytes = message.getPayload().length;

    ObjectNode json;
    try {
      json = parseTimed(message.getPayload());
    } catch (IOException e) {
      // null attributes will cause docType to show up as "unknown_doc_type" in metrics
      Map<String, String> attrs = schemaStore.docTypeExists(message.getAttributeMap())
          ? message.getAttributeMap()
          : null;
      incrementErrorCounters(attrs, "error_json_parse", submissionBytes);
      throw e;
    }

    // In case this message is being replayed from an error output where AddMetadata has already
    // been applied, we strip out any existing metadata fields and put them into attributes.
    NestedMetadata.stripPayloadMetadataToAttributes(attributes, json);

    // Check the contents of the message, potentially throwing an exception that causes the
    // message to be dropped or routed to error output; may also alter the payload to
    // redact sensitive fields.
    try {
      MessageScrubber.scrub(attributes, json);
    } catch (MessageShouldBeDroppedException e) {
      // This message should go to no output, so we return immediately without writing to
      // output receiver.
      return;
    }

    if (!schemaStore.docTypeExists(attributes)) {
      // The docType is not known, so the failure is counted without attributes.
      incrementErrorCounters(null, "error_invalid_doc_type", submissionBytes);
      throw new SchemaNotFoundException(String.format("No such docType: %s/%s",
          attributes.get("document_namespace"), attributes.get("document_type")));
    }

    // If no "document_version" attribute was parsed from the URI, this element must be from the
    // /submit/telemetry endpoint and we now need to grab version from the payload.
    if (!attributes.containsKey(Attribute.DOCUMENT_VERSION)) {
      try {
        attributes.put(Attribute.DOCUMENT_VERSION, getVersionFromTelemetryPayload(json));
      } catch (SchemaNotFoundException e) {
        incrementErrorCounters(attributes, "error_missing_version", submissionBytes);
        throw e;
      }
    }

    // Throws SchemaNotFoundException if there's no schema
    Schema schema;
    try {
      schema = schemaStore.getSchema(attributes);
    } catch (SchemaNotFoundException e) {
      incrementErrorCounters(attributes, "error_schema_not_found", submissionBytes);
      throw e;
    }

    try {
      validateTimed(schema, json);
    } catch (ValidationException e) {
      incrementErrorCounters(attributes, "error_schema_validation", submissionBytes);
      throw e;
    }

    final PipelineMetadataStore.PipelineMetadata meta = metadataStore.getSchema(attributes);
    try {
      deprecationCheck(attributes, meta);
    } catch (DeprecatedMessageException e) {
      incrementErrorCounters(attributes, "error_deprecated_message", submissionBytes);
      throw e;
    }

    addAttributesFromPayload(attributes, json, meta);

    // Optionally split message into multiple messages based on pipeline metadata. All output
    // messages are built before anything is emitted, so a failure while splitting sends only
    // the failure message and no partial output.
    final List<PubsubMessage> messages =
        buildOutputMessages(json, attributes, meta.split_config(), submissionBytes);

    PerDocTypeCounter.inc(attributes, "valid_submission");
    PerDocTypeCounter.inc(attributes, "valid_submission_bytes", submissionBytes);
    messages.forEach(out.get(outputTag)::output);
  } catch (IOException | SchemaNotFoundException | ValidationException
      | MessageScrubberException | DeprecatedMessageException e) {
    out.get(failureTag)
        .output(FailureMessage.of(ParsePayload.class.getSimpleName(), message, e));
  }
}

/**
 * Builds the list of messages to emit for a validated payload, applying the split
 * configuration when one is present.
 *
 * <p>Without a split configuration the result is the single original message. With one, the
 * result contains the original (only if {@code preserve_original()} is set), then one message
 * per configured subset, then the remainder if one is configured.
 *
 * @param json the validated payload
 * @param attributes attributes of the original message; copied, never modified, for each
 *     subset and for the remainder
 * @param splitConfig split configuration from the pipeline metadata, or null for no splitting
 * @param submissionBytes size of the original submission, used only for error metrics
 * @return the messages to write to the main output, in emission order
 * @throws IOException if serializing a payload fails
 * @throws SchemaNotFoundException if a subset's target schema cannot be found
 */
private List<PubsubMessage> buildOutputMessages(ObjectNode json, Map<String, String> attributes,
    PipelineMetadataStore.SplitConfig splitConfig, int submissionBytes)
    throws IOException, SchemaNotFoundException {
  final List<PubsubMessage> messages = new ArrayList<>();
  if (splitConfig == null || splitConfig.preserve_original()) {
    // https://github.com/mozilla/gcp-ingestion/issues/780
    // We need to be careful to consistently use our util methods (which use Jackson) for
    // serializing and de-serializing JSON to reduce the possibility of introducing encoding
    // issues. We previously called json.toString().getBytes() here without specifying a
    // charset.
    messages.add(new PubsubMessage(Json.asBytes(json), attributes));
  }
  if (splitConfig == null) {
    return messages;
  }

  // NOTE(review): jsonMove appears to move the fields described by the subset schema out of
  // json and into subsetJson, which is why the original is serialized above before this loop
  // and the remainder only after it - confirm against jsonMove.
  for (PipelineMetadataStore.SplitConfigTarget subsetConfig : splitConfig.subsets()) {
    final Map<String, String> subsetAttributes = new HashMap<>(attributes);
    subsetAttributes.put(Attribute.DOCUMENT_NAMESPACE, subsetConfig.document_namespace());
    subsetAttributes.put(Attribute.DOCUMENT_TYPE, subsetConfig.document_type());
    subsetAttributes.put(Attribute.DOCUMENT_VERSION, subsetConfig.document_version());

    final ObjectSchema subsetSchema;
    try {
      subsetSchema = (ObjectSchema) schemaStore.getSchema(subsetAttributes);
    } catch (SchemaNotFoundException e) {
      // Counted against the original docType, not the subset's target docType.
      incrementErrorCounters(attributes, "error_schema_not_found", submissionBytes);
      throw e;
    }

    final ObjectNode subsetJson = Json.createObjectNode();
    jsonMove(json, subsetJson, subsetSchema, true);
    messages.add(new PubsubMessage(Json.asBytes(subsetJson), subsetAttributes));
  }

  if (splitConfig.remainder() != null) {
    final Map<String, String> remainderAttributes = new HashMap<>(attributes);
    remainderAttributes.put(Attribute.DOCUMENT_NAMESPACE,
        splitConfig.remainder().document_namespace());
    remainderAttributes.put(Attribute.DOCUMENT_TYPE,
        splitConfig.remainder().document_type());
    remainderAttributes.put(Attribute.DOCUMENT_VERSION,
        splitConfig.remainder().document_version());
    messages.add(new PubsubMessage(Json.asBytes(json), remainderAttributes));
  }
  return messages;
}

/**
 * Records a failed submission in the per-docType metrics: increments the named error counter
 * and adds the submission size to the {@code error_submission_bytes} counter.
 *
 * @param attributes attributes identifying the docType, or null to have the failure show up
 *     as "unknown_doc_type" in metrics
 * @param errorCounter name of the error counter to increment
 * @param submissionBytes size in bytes of the failed submission's payload
 */
private void incrementErrorCounters(Map<String, String> attributes, String errorCounter,
    int submissionBytes) {
  PerDocTypeCounter.inc(attributes, errorCounter);
  PerDocTypeCounter.inc(attributes, "error_submission_bytes", submissionBytes);
}