in ingestion-beam/src/main/java/com/mozilla/telemetry/IpPrivacyDecoder.java [57:87]
public static PipelineResult run(IpPrivacyDecoderOptions.Parsed options) {
final Pipeline pipeline = Pipeline.create(options);
final List<PCollection<PubsubMessage>> errorCollections = new ArrayList<>();
// We wrap pipeline in Optional for more convenience in chaining together transforms.
Optional.of(pipeline) //
.map(p -> p //
.apply(options.getInputType().read(options)) //
.apply(ParseUri.of()).failuresTo(errorCollections) //
.apply("RestrictToMainPings",
Filter
.by((message) -> "main".equals(message.getAttribute(Attribute.DOCUMENT_TYPE))))
.apply(ParseProxy.of(null)) //
.apply(ParseIp.of()) //
.apply(GeoCityLookup.of(options.getGeoCityDatabase(), options.getGeoCityFilter())) //
.apply(DecompressPayload.enabled(options.getDecompressInputPayloads())) //
.apply(ExtractClientIdAndDropPayload.of()).failuresTo(errorCollections) //
.apply(HashClientInfo.of(options.getClientIdHashKey(), options.getClientIpHashKey())) //
.apply(NormalizeAttributes.of())) //
.map(p -> p //
.apply(RemoveAttributes.of()) //
.apply(options.getOutputType().write(options)).failuresTo(errorCollections));
// Write error output collections.
PCollectionList.of(errorCollections) //
.apply("FlattenErrorCollections", Flatten.pCollections()) //
.apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
.output();
return pipeline.run();
}