public static PipelineResult run()

in ingestion-beam/src/main/java/com/mozilla/telemetry/ContextualServicesReporter.java [60:124]


  /**
   * Builds and launches the Contextual Services reporting pipeline.
   *
   * <p>Incoming messages are filtered by doc type and namespace. Their metadata is verified, the
   * payload is decompressed when enabled, and the reporting URL is parsed into a
   * {@code SponsoredInteraction}. The interactions are then split three ways:
   * {@code topsites-click} interactions are labeled per context_id when the windowed click count
   * passes the click spike threshold. {@code topsites-impression} interactions are labeled the
   * same way using the impression threshold/window and are then aggregated. All other doc types
   * are forwarded without aggregation. The three branches are flattened and handed to
   * {@code SendRequest}.
   *
   * <p>There is no sink for successes because the purpose of the job is calling an external API.
   * Every failure collection gathered along the way is flattened and written to the configured
   * error output.
   *
   * @param options parsed pipeline options controlling input, filtering, thresholds, windows,
   *     reporting and error output
   * @return the result of {@code pipeline.run()}
   */
  public static PipelineResult run(ContextualServicesReporterOptions.Parsed options) {
    final Pipeline pipeline = Pipeline.create(options);
    final List<PCollection<PubsubMessage>> errorCollections = new ArrayList<>();

    PCollection<SponsoredInteraction> requests = pipeline //
        .apply(options.getInputType().read(options)) //
        .apply(FilterByDocType.of(options.getAllowedDocTypes(), options.getAllowedNamespaces(),
            options.getLimitLegacyDesktopVersion())) //
        .apply(VerifyMetadata.of()) //
        .failuresTo(errorCollections) //
        .apply(DecompressPayload.enabled(options.getDecompressInputPayloads())) //
        .apply(ParseReportingUrl.of(options.getUrlAllowList())) //
        .failuresTo(errorCollections) //
        .apply(EmitCounters.of());

    Set<String> individualImpressions = ImmutableSet.of("topsites-impression");
    Set<String> individualClicks = ImmutableSet.of("topsites-click");

    // Doc types handled by a dedicated (click or impression) branch below; everything else is
    // sent unaggregated. This set is captured by a Beam lambda that must be serialized, and
    // Collectors.toSet() makes no serializability guarantee, so copy it into an ImmutableSet.
    Set<String> unionedDocTypes = ImmutableSet.copyOf(Stream
        .concat(individualImpressions.stream(), individualClicks.stream())
        .collect(Collectors.toSet()));

    // Perform windowed click counting per context_id, adding a click-status to the reporting URL
    // if the count passes a threshold.
    PCollection<SponsoredInteraction> clicksCountedByContextId = requests
        .apply("FilterClicksPerContextIdDocTypes", Filter.by((interaction) -> individualClicks //
            .contains(interaction.getDerivedDocumentType())))
        .apply(LabelSpikes.perContextId(options.getClickSpikeThreshold(),
            Time.parseDuration(options.getClickSpikeWindowDuration()), TelemetryEventType.CLICK));

    // Perform windowed impression counting per context_id, adding an impression-status to the
    // reporting URL if the count passes a threshold.
    PCollection<SponsoredInteraction> impressionsCountedByContextId = requests
        .apply("FilterImpressionsPerContextIdDocTypes",
            Filter.by((interaction) -> individualImpressions //
                .contains(interaction.getDerivedDocumentType())))
        .apply(LabelSpikes.perContextId(options.getImpressionSpikeThreshold(),
            Time.parseDuration(options.getImpressionSpikeWindowDuration()),
            TelemetryEventType.IMPRESSION));

    // Aggregate impressions.
    PCollection<SponsoredInteraction> aggregatedImpressions = impressionsCountedByContextId
        .apply(AggregateImpressions.of(options.getAggregationWindowDuration()));

    // Anything not handled by the click/impression branches is forwarded as-is.
    PCollection<SponsoredInteraction> unaggregated = requests.apply("FilterUnaggregatedDocTypes",
        Filter.by((interaction) -> !unionedDocTypes //
            .contains(interaction.getDerivedDocumentType())));

    PCollectionList.of(aggregatedImpressions).and(clicksCountedByContextId).and(unaggregated)
        .apply(Flatten.pCollections())
        .apply(SendRequest.of(options.getReportingEnabled(), options.getLogReportingUrls()))
        .failuresTo(errorCollections);

    // Note that there is no write step here for "successes"
    // since the purpose of this job is sending to an external API.

    // Write error output collections.
    PCollectionList.of(errorCollections) //
        .apply("FlattenErrorCollections", Flatten.pCollections()) //
        .apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
        .output();

    return pipeline.run();
  }