public static PipelineResult run()

in ingestion-beam/src/main/java/com/mozilla/telemetry/Republisher.java [43:94]


  /**
   * Assembles the republishing pipeline described by {@code options} and runs it.
   *
   * <p>Messages are read once from the configured input and then, depending on which options
   * are set, fanned out to any combination of: the debug destination (messages carrying an
   * {@code x_debug_id} attribute), the random-sample destination, and the per-docType,
   * per-namespace and per-channel destinations.
   *
   * <p>Note that this method modifies {@code options}: it sets the output Pubsub compression to
   * {@code UNCOMPRESSED} before the pipeline is created.
   *
   * @param options parsed republisher options that configure the input and every destination
   * @return the value returned by {@link Pipeline#run()}
   */
  public static PipelineResult run(RepublisherOptions.Parsed options) {
    // Payloads pass through without being decoded, so re-compressing on output is unnecessary.
    options.setOutputPubsubCompression(Compression.UNCOMPRESSED);

    final Pipeline pipeline = Pipeline.create(options);

    // Single shared input; every enabled destination below branches off this collection.
    PCollection<PubsubMessage> input = pipeline.apply(options.getInputType().read(options));

    // Debug destination: forward only messages that carry an x_debug_id attribute.
    if (options.getEnableDebugDestination()) {
      RepublisherOptions.Parsed debugOptions = options.as(RepublisherOptions.Parsed.class);
      // The output must be pointed at the debug destination before the write transform is built.
      debugOptions.setOutput(options.getDebugDestination());
      PCollection<PubsubMessage> debugMessages = input.apply("FilterDebugMessages",
          Filter.by(message -> PubsubConstraints.ensureNonNull(message)
              .getAttribute("x_debug_id") != null));
      debugMessages.apply("WriteDebugOutput", debugOptions.getOutputType().write(debugOptions));
    }

    // Random-sample destination: enabled only when a sample ratio has been configured.
    final Double sampleRatio = options.getRandomSampleRatio();
    if (sampleRatio != null) {
      RepublisherOptions.Parsed sampleOptions = options.as(RepublisherOptions.Parsed.class);
      // As above, redirect the output before the write transform is built.
      sampleOptions.setOutput(options.getRandomSampleDestination());
      PCollection<PubsubMessage> sampled = input.apply("SampleBySampleIdOrRandomNumber",
          Filter.by(message -> RandomSampler.filterBySampleIdOrRandomNumber(
              PubsubConstraints.ensureNonNull(message).getAttribute("sample_id"), sampleRatio)));
      sampled.apply("RepublishRandomSample", sampleOptions.getOutputType().write(sampleOptions));
    }

    // Per-docType destinations.
    if (options.getPerDocTypeDestinations() != null) {
      input.apply(RepublishPerDocType.of(options));
    }

    // Per-namespace destinations.
    if (options.getPerNamespaceDestinations() != null) {
      input.apply(RepublishPerNamespace.of(options));
    }

    // Sampled per-channel destinations.
    if (options.getPerChannelSampleRatios() != null) {
      input.apply(RepublishPerChannel.of(options));
    }

    return pipeline.run();
  }