in ingestion-beam/src/main/java/com/mozilla/telemetry/amplitude/ParseAmplitudeEvents.java [261:287]
static List<ObjectNode> extractEvents(ObjectNode payload, String namespace, String docType)
throws IOException {
ObjectMapper mapper = new ObjectMapper();
return StreamSupport.stream(payload.path("events").spliterator(), false).map(event -> {
final ObjectNode result = mapper.createObjectNode();
final JsonNode eventCategory = event.get("category");
final JsonNode eventName = event.get("name");
String eventType = "";
if (eventCategory != null && !eventCategory.isNull()) {
eventType = eventCategory.asText();
if (eventName != null && !eventName.isNull()) {
eventType += "." + eventName.asText();
}
}
if (isEventAllowed(namespace, docType, eventCategory, eventName)) {
result.put("event_type", eventType);
result.put("event_extras", event.get("extra"));
result.put("timestamp", event.get(Attribute.TIMESTAMP));
return result;
}
return null;
}).filter(result -> result != null).collect(Collectors.toList());
}