public static PipelineResult run()

in ingestion-beam/src/main/java/com/mozilla/telemetry/Decoder.java [54:110]


  /**
   * Builds the Decoder pipeline from {@code options} and runs it.
   *
   * <p>Stages, in application order:
   * <ol>
   *   <li>read the configured input;</li>
   *   <li>ParseUri (failures deferred), ParseProxy, GeoIspLookup, GeoCityLookup, and
   *       DecompressPayload (toggled by {@code getDecompressInputPayloads()});</li>
   *   <li>only when {@code getLogIngestionEnabled()} is true: ExtractIpFromLogEntry, a second
   *       GeoIspLookup/GeoCityLookup pass, and ParseLogEntry;</li>
   *   <li>routing of the deferred ParseUri failures;</li>
   *   <li>LimitPayloadSize, ParsePayload, ParseUserAgent, NormalizeAttributes,
   *       SanitizeAttributes, AddMetadata, and the configured output.</li>
   * </ol>
   *
   * <p>Every stage that can fail appends its failed messages to a shared list. That list is
   * flattened and written to the configured error output.
   *
   * @param options parsed Decoder options
   * @return the result of {@link Pipeline#run()}
   */
  public static PipelineResult run(DecoderOptions.Parsed options) {
    final Pipeline pipeline = Pipeline.create(options);
    // Stages that can route messages to error output append their failures here. All of them
    // are flattened and written together at the end, so this list must be fully populated
    // before the error output is built below.
    final List<PCollection<PubsubMessage>> failureCollections = new ArrayList<>();

    // Input.
    // ParseUri is applied without failures here, and its failures are added later, so that
    // ParseProxy can use pipeline metadata to adjust what IP to use for geo lookups.
    // ParseProxy, GeoIspLookup and GeoCityLookup are applied before any message can be routed
    // to error output, so that the IP address is already removed by then; see
    // https://github.com/mozilla/gcp-ingestion/issues/1096
    PCollection<PubsubMessage> messages = pipeline //
        .apply(options.getInputType().read(options)) //
        .apply("ParseUri", ParseUri.withoutFailures()) //
        .apply(ParseProxy.of(options.getSchemasLocation())) //
        .apply(GeoIspLookup.of(options.getGeoIspDatabase())) //
        .apply(GeoCityLookup.of(options.getGeoCityDatabase(), options.getGeoCityFilter())) //
        .apply(DecompressPayload.enabled(options.getDecompressInputPayloads())
            .withClientCompressionRecorded());

    // Special case: parsing structured telemetry pings submitted from Cloud Logging.
    if (options.getLogIngestionEnabled()) {
      messages = messages //
          // First extract the IP address from the log entry and remove it from the payload,
          // for consistency with the standard flow.
          .apply(ExtractIpFromLogEntry.of()) //
          .apply(GeoIspLookup.of(options.getGeoIspDatabase())) //
          .apply(GeoCityLookup.of(options.getGeoCityDatabase(), options.getGeoCityFilter())) //
          // Only now parse the log entry and route failures to error output.
          .apply(ParseLogEntry.of()) //
          .failuresTo(failureCollections);
    }

    // Add ParseUri failures separately so that they don't prevent geo lookups.
    messages = messages //
        .apply("ParseUriAddFailures", ParseUri.addFailures()) //
        .failuresTo(failureCollections);

    // Main output.
    messages //
        // See discussion in https://github.com/mozilla/gcp-ingestion/issues/776
        .apply("LimitPayloadSize", LimitPayloadSize.toMB(8)).failuresTo(failureCollections) //
        .apply("ParsePayload", ParsePayload.of(options.getSchemasLocation())) //
        .failuresTo(failureCollections) //
        .apply(ParseUserAgent.of()) //
        .apply(NormalizeAttributes.of()) //
        .apply(SanitizeAttributes.of(options.getSchemasLocation())) //
        .apply("AddMetadata", AddMetadata.of()).failuresTo(failureCollections) //
        .apply(options.getOutputType().write(options)).failuresTo(failureCollections);

    // Error output: everything collected in failureCollections above.
    PCollectionList.of(failureCollections) //
        .apply("FlattenFailureCollections", Flatten.pCollections()) //
        .apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
        .output();

    return pipeline.run();
  }