public static PipelineResult run()

in ingestion-beam/src/main/java/com/mozilla/telemetry/IpPrivacyDecoder.java [57:87]


  /**
   * Assembles the IP privacy decoder pipeline described by {@code options} and runs it.
   *
   * <p>The main flow reads from the configured input, applies {@code ParseUri}, keeps only messages
   * whose {@code Attribute.DOCUMENT_TYPE} attribute equals {@code "main"}, then applies
   * {@code ParseProxy}, {@code ParseIp}, {@code GeoCityLookup}, {@code DecompressPayload},
   * {@code ExtractClientIdAndDropPayload}, {@code HashClientInfo}, {@code NormalizeAttributes} and
   * {@code RemoveAttributes} in that order before writing to the configured output.
   *
   * <p>Failures reported by {@code ParseUri}, {@code ExtractClientIdAndDropPayload} and the output
   * write are gathered into one list, flattened, and written to the configured error output.
   *
   * @param options parsed options supplying the input, output and error output types, the geo city
   *        database and filter, the payload decompression flag, and the client id / client ip
   *        hash keys
   * @return the result of calling {@link Pipeline#run()} on the assembled pipeline
   */
  public static PipelineResult run(IpPrivacyDecoderOptions.Parsed options) {
    final Pipeline pipeline = Pipeline.create(options);

    // Each stage that exposes failures appends its failure collection to this list.
    final List<PCollection<PubsubMessage>> failureCollections = new ArrayList<>();

    // Main flow, written as a single chain rooted directly at the pipeline.
    pipeline //
        // Read input and parse the URI; ParseUri failures go to the error list.
        .apply(options.getInputType().read(options)) //
        .apply(ParseUri.of()).failuresTo(failureCollections) //
        // Only messages whose document type attribute is "main" continue past this point.
        .apply("RestrictToMainPings",
            Filter.by((msg) -> "main".equals(msg.getAttribute(Attribute.DOCUMENT_TYPE)))) //
        // NOTE(review): ParseProxy is given a null argument; its meaning is not visible here.
        .apply(ParseProxy.of(null)) //
        .apply(ParseIp.of()) //
        .apply(GeoCityLookup.of(options.getGeoCityDatabase(), options.getGeoCityFilter())) //
        .apply(DecompressPayload.enabled(options.getDecompressInputPayloads())) //
        .apply(ExtractClientIdAndDropPayload.of()).failuresTo(failureCollections) //
        .apply(HashClientInfo.of(options.getClientIdHashKey(), options.getClientIpHashKey())) //
        .apply(NormalizeAttributes.of()) //
        .apply(RemoveAttributes.of()) //
        // Write the main output; write failures also go to the error list.
        .apply(options.getOutputType().write(options)).failuresTo(failureCollections);

    // This must stay after the main chain so every failure collection is already registered
    // when the list is flattened and written to the error output.
    PCollectionList.of(failureCollections) //
        .apply("FlattenErrorCollections", Flatten.pCollections()) //
        .apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
        .output();

    return pipeline.run();
  }