in ingestion-beam/src/main/java/com/mozilla/telemetry/AmplitudePublisher.java [66:110]
public static PipelineResult run(AmplitudePublisherOptions.Parsed options) {
final Pipeline pipeline = Pipeline.create(options);
final List<PCollection<PubsubMessage>> errorCollections = new ArrayList<>();
PCollection<PubsubMessage> messages = pipeline //
.apply(options.getInputType().read(options)) //
.apply(FilterByDocType.of(options.getAllowedDocTypes(), options.getAllowedNamespaces()));
// Random sample.
if (options.getRandomSampleRatio() != null) {
final Double ratio = options.getRandomSampleRatio();
messages //
.apply("SampleBySampleIdOrRandomNumber", Filter.by(message -> {
message = PubsubConstraints.ensureNonNull(message);
String sampleId = message.getAttribute(Attribute.SAMPLE_ID);
return RandomSampler.filterBySampleIdOrRandomNumber(sampleId, ratio);
})).apply("RepublishRandomSample", options.getOutputType().write(options));
}
final Map<String, String> apiKeys = readAmplitudeApiKeysFromFile(options.getApiKeys());
PCollection<KV<String, Iterable<AmplitudeEvent>>> events = messages
.apply(DecompressPayload.enabled(options.getDecompressInputPayloads())
.withClientCompressionRecorded())
.apply(ParseAmplitudeEvents.of(options.getEventsAllowList())).failuresTo(errorCollections)
.apply(WithKeys.of((AmplitudeEvent event) -> event.getPlatform())) //
.setCoder(KvCoder.of(StringUtf8Coder.of(), AmplitudeEvent.getCoder())) //
// events from same app can be batched and sent to Amplitude in one request
.apply(GroupIntoBatches.<String, AmplitudeEvent>ofSize(options.getMaxEventBatchSize()) //
.withMaxBufferingDuration(Duration.standardSeconds(options.getMaxBufferingDuration())))
.apply(SendRequest.of(apiKeys, options.getReportingEnabled(), //
options.getMaxBatchesPerSecond()))
.failuresTo(errorCollections); //
// Note that there is no write step here for "successes"
// since the purpose of this job is sending to the Amplitude API.
// Write error output collections.
PCollectionList.of(errorCollections) //
.apply("FlattenErrorCollections", Flatten.pCollections()) //
.apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
.output();
return pipeline.run();
}