in ingestion-beam/src/main/java/com/mozilla/telemetry/ContextualServicesReporter.java [60:124]
/**
 * Builds and runs the contextual-services reporting pipeline.
 *
 * <p>Messages are read from the configured input and filtered by doc type and namespace
 * (optionally limited by legacy desktop version). Their metadata is verified, payloads are
 * decompressed when enabled, and the reporting URL is parsed into {@link SponsoredInteraction}s.
 *
 * <p>The interactions are then handled by doc type:
 * <ul>
 *   <li>Click doc types go through per-context_id spike labeling.</li>
 *   <li>Impression doc types go through spike labeling and then aggregation.</li>
 *   <li>All other doc types are forwarded unchanged.</li>
 * </ul>
 *
 * <p>The three streams are flattened and passed to {@link SendRequest}. Failures from every
 * failable step are collected, flattened, and written to the configured error output.
 *
 * @param options parsed pipeline options controlling the input, doc-type filtering, spike
 *     thresholds and windows, aggregation window, and reporting behavior
 * @return the result of {@code pipeline.run()}
 */
public static PipelineResult run(ContextualServicesReporterOptions.Parsed options) {
  final Pipeline pipeline = Pipeline.create(options);
  final List<PCollection<PubsubMessage>> errorCollections = new ArrayList<>();

  PCollection<SponsoredInteraction> requests = pipeline //
      .apply(options.getInputType().read(options)) //
      .apply(FilterByDocType.of(options.getAllowedDocTypes(), options.getAllowedNamespaces(),
          options.getLimitLegacyDesktopVersion())) //
      .apply(VerifyMetadata.of()) //
      .failuresTo(errorCollections) //
      .apply(DecompressPayload.enabled(options.getDecompressInputPayloads())) //
      .apply(ParseReportingUrl.of(options.getUrlAllowList())) //
      .failuresTo(errorCollections) //
      .apply(EmitCounters.of());

  final Set<String> individualImpressions = ImmutableSet.of("topsites-impression");
  final Set<String> individualClicks = ImmutableSet.of("topsites-click");
  // These sets are captured by Filter.by lambdas, which Beam serializes to ship to workers.
  // Collectors.toSet() makes no guarantee that its result is serializable, so copy the union
  // into an ImmutableSet, which is serializable and cannot be mutated after construction.
  final Set<String> unionedDocTypes = ImmutableSet.copyOf(Stream
      .concat(individualImpressions.stream(), individualClicks.stream())
      .collect(Collectors.toSet()));

  // Perform windowed click counting per context_id, adding a click-status to the reporting URL
  // if the count passes a threshold.
  PCollection<SponsoredInteraction> clicksCountedByContextId = requests
      .apply("FilterClicksPerContextIdDocTypes", Filter.by((interaction) -> individualClicks //
          .contains(interaction.getDerivedDocumentType())))
      .apply(LabelSpikes.perContextId(options.getClickSpikeThreshold(),
          Time.parseDuration(options.getClickSpikeWindowDuration()), TelemetryEventType.CLICK));

  // Perform windowed impression counting per context_id, adding an impression-status to the
  // reporting URL if the count passes a threshold.
  PCollection<SponsoredInteraction> impressionsCountedByContextId = requests
      .apply("FilterImpressionsPerContextIdDocTypes",
          Filter.by((interaction) -> individualImpressions //
              .contains(interaction.getDerivedDocumentType())))
      .apply(LabelSpikes.perContextId(options.getImpressionSpikeThreshold(),
          Time.parseDuration(options.getImpressionSpikeWindowDuration()),
          TelemetryEventType.IMPRESSION));

  // Aggregate the spike-labeled impressions.
  PCollection<SponsoredInteraction> aggregatedImpressions = impressionsCountedByContextId
      .apply(AggregateImpressions.of(options.getAggregationWindowDuration()));

  // Everything that is neither an individual impression nor an individual click is forwarded
  // without spike labeling or aggregation.
  PCollection<SponsoredInteraction> unaggregated = requests.apply("FilterUnaggregatedDocTypes",
      Filter.by((interaction) -> !unionedDocTypes //
          .contains(interaction.getDerivedDocumentType())));

  PCollectionList.of(aggregatedImpressions).and(clicksCountedByContextId).and(unaggregated)
      .apply(Flatten.pCollections())
      .apply(SendRequest.of(options.getReportingEnabled(), options.getLogReportingUrls()))
      .failuresTo(errorCollections);

  // Note that there is no write step here for "successes"
  // since the purpose of this job is sending to an external API.

  // Write error output collections.
  PCollectionList.of(errorCollections) //
      .apply("FlattenErrorCollections", Flatten.pCollections()) //
      .apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
      .output();

  return pipeline.run();
}