public static PipelineResult run()

in ingestion-beam/src/main/java/com/mozilla/telemetry/AmplitudePublisher.java [66:110]


  /**
   * Assemble and execute the Amplitude publishing pipeline.
   *
   * <p>The pipeline reads messages from the configured input, keeps only the allowed document
   * types and namespaces, optionally republishes a random sample to the configured output,
   * parses the messages into {@link AmplitudeEvent}s, keys them by platform, groups them into
   * batches and hands each batch to {@code SendRequest}. Failures reported by the parse and send
   * steps are flattened together and written to the configured error output.
   *
   * @param options parsed pipeline options controlling input, output, sampling and batching
   * @return the result of {@link Pipeline#run()}
   */
  public static PipelineResult run(AmplitudePublisherOptions.Parsed options) {
    final Pipeline pipeline = Pipeline.create(options);
    // Failure collections registered by individual steps; flattened and written at the end.
    final List<PCollection<PubsubMessage>> errorCollections = new ArrayList<>();

    final PCollection<PubsubMessage> messages = pipeline //
        .apply(options.getInputType().read(options)) //
        .apply(FilterByDocType.of(options.getAllowedDocTypes(), options.getAllowedNamespaces()));

    // Random sample.
    // NOTE(review): the sampled collection is only written to the configured output; `messages`
    // itself is not narrowed, so the Amplitude path below still receives every message. The
    // value returned by the write is also discarded, so any failures it may report are not added
    // to errorCollections. Confirm both are intended.
    final Double ratio = options.getRandomSampleRatio();
    if (ratio != null) {
      messages //
          .apply("SampleBySampleIdOrRandomNumber", Filter.by(message -> {
            message = PubsubConstraints.ensureNonNull(message);
            String sampleId = message.getAttribute(Attribute.SAMPLE_ID);
            return RandomSampler.filterBySampleIdOrRandomNumber(sampleId, ratio);
          })).apply("RepublishRandomSample", options.getOutputType().write(options));
    }

    final Map<String, String> apiKeys = readAmplitudeApiKeysFromFile(options.getApiKeys());

    // The result of this chain is intentionally not kept: applying the transforms registers them
    // on the pipeline, and failures are captured through failuresTo(errorCollections).
    messages
        .apply(DecompressPayload.enabled(options.getDecompressInputPayloads())
            .withClientCompressionRecorded())
        .apply(ParseAmplitudeEvents.of(options.getEventsAllowList())).failuresTo(errorCollections)
        .apply(WithKeys.of((AmplitudeEvent event) -> event.getPlatform())) //
        .setCoder(KvCoder.of(StringUtf8Coder.of(), AmplitudeEvent.getCoder())) //
        // events from same app can be batched and sent to Amplitude in one request
        .apply(GroupIntoBatches.<String, AmplitudeEvent>ofSize(options.getMaxEventBatchSize()) //
            .withMaxBufferingDuration(Duration.standardSeconds(options.getMaxBufferingDuration())))
        .apply(SendRequest.of(apiKeys, options.getReportingEnabled(), //
            options.getMaxBatchesPerSecond()))
        .failuresTo(errorCollections);

    // Note that there is no write step here for "successes"
    // since the purpose of this job is sending to the Amplitude API.

    // Write error output collections.
    PCollectionList.of(errorCollections) //
        .apply("FlattenErrorCollections", Flatten.pCollections()) //
        .apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
        .output();

    return pipeline.run();
  }