in ingestion-beam/src/main/java/com/mozilla/telemetry/Republisher.java [43:94]
/**
 * Builds the republisher pipeline from {@code options} and runs it.
 *
 * <p>Messages are read once from the configured input and then offered to every destination
 * that {@code options} enables: the debug destination, the random-sample destination, and the
 * per-docType, per-namespace and per-channel destinations. Each destination is independent, so
 * any combination of them may be active.
 *
 * @param options parsed republisher options; this method sets the output Pub/Sub compression on
 *        them to {@code UNCOMPRESSED} before creating the pipeline
 * @return the value returned by {@code Pipeline.run()}
 */
public static PipelineResult run(RepublisherOptions.Parsed options) {
  // Payloads pass through without being decoded, so recompressing them on output is pointless.
  options.setOutputPubsubCompression(Compression.UNCOMPRESSED);

  final Pipeline pipeline = Pipeline.create(options);
  final PCollection<PubsubMessage> messages = pipeline
      .apply(options.getInputType().read(options));

  // Debug destination: forward only messages that carry an "x_debug_id" attribute.
  // NOTE(review): as() looks like it returns a view over the same underlying options, so the
  // setOutput call below may also be visible through `options` -- confirm this is intended.
  if (options.getEnableDebugDestination()) {
    final RepublisherOptions.Parsed debugOptions = options.as(RepublisherOptions.Parsed.class);
    debugOptions.setOutput(options.getDebugDestination());
    // The write transform is built right after the output is set on debugOptions.
    messages
        .apply("FilterDebugMessages",
            Filter.by(message -> PubsubConstraints.ensureNonNull(message)
                .getAttribute("x_debug_id") != null))
        .apply("WriteDebugOutput", debugOptions.getOutputType().write(debugOptions));
  }

  // Random-sample destination: the sampler decides per message from its "sample_id" attribute
  // and the configured ratio.
  if (options.getRandomSampleRatio() != null) {
    final Double sampleRatio = options.getRandomSampleRatio();
    final RepublisherOptions.Parsed sampleOptions = options.as(RepublisherOptions.Parsed.class);
    sampleOptions.setOutput(options.getRandomSampleDestination());
    messages
        .apply("SampleBySampleIdOrRandomNumber",
            Filter.by(message -> RandomSampler.filterBySampleIdOrRandomNumber(
                PubsubConstraints.ensureNonNull(message).getAttribute("sample_id"),
                sampleRatio)))
        .apply("RepublishRandomSample", sampleOptions.getOutputType().write(sampleOptions));
  }

  // Per-docType destinations.
  if (options.getPerDocTypeDestinations() != null) {
    messages.apply(RepublishPerDocType.of(options));
  }

  // Per-namespace destinations.
  if (options.getPerNamespaceDestinations() != null) {
    messages.apply(RepublishPerNamespace.of(options));
  }

  // Sampled per-channel destinations.
  if (options.getPerChannelSampleRatios() != null) {
    messages.apply(RepublishPerChannel.of(options));
  }

  return pipeline.run();
}