in ingestion-beam/src/main/java/com/mozilla/telemetry/republisher/RepublishPerChannel.java [26:53]
/**
 * Republishes a sample of the input to one output per configured channel.
 *
 * <p>Every entry of {@code baseOptions.getPerChannelSampleRatios()} is turned into a
 * {@link Destination}. The input is split by {@link PartitionFn} into one partition per
 * destination plus one extra trailing partition, which is not consumed here. Each destination's
 * partition is filtered with {@link RandomSampler#filterBySampleIdOrRandomNumber} using the
 * message's {@code sample_id} attribute and the destination's ratio. It is then written via the
 * configured output type to {@code getPerChannelDestination()}, with {@code ${channel}} replaced
 * by the destination's channel.
 *
 * @param input messages to route, sample, and republish
 * @return a {@link PDone} for the input's pipeline
 */
public PDone expand(PCollection<PubsubMessage> input) {
  List<Destination> destinations = baseOptions.getPerChannelSampleRatios() //
      .entrySet() //
      .stream() //
      .map(Destination::new) //
      .collect(Collectors.toList());

  // One partition per destination; the trailing extra partition is intentionally left unused.
  PCollectionList<PubsubMessage> partitions = input.apply("PartitionByChannel",
      Partition.of(destinations.size() + 1, new PartitionFn(destinations)));

  int partitionIndex = 0;
  for (Destination destination : destinations) {
    // NOTE(review): PipelineOptions.as() appears to return a view over the same underlying
    // options, so setOutput likely overwrites baseOptions' output on every iteration. This is
    // only safe if write(...) reads the output eagerly -- confirm in OutputType.
    RepublisherOptions.Parsed channelOptions = baseOptions.as(RepublisherOptions.Parsed.class);
    String channelOutput = baseOptions.getPerChannelDestination() //
        .replace("${channel}", destination.channel);
    channelOptions.setOutput(channelOutput);

    String channelLabel = destination.getCapitalizedChannel();
    partitions.get(partitionIndex) //
        .apply("Sample" + channelLabel + "BySampleIdOrRandomNumber",
            Filter.by(message -> RandomSampler.filterBySampleIdOrRandomNumber(
                PubsubConstraints.ensureNonNull(message).getAttribute("sample_id"),
                destination.ratio)))
        .apply("Republish" + channelLabel + "Sample",
            channelOptions.getOutputType().write(channelOptions));
    partitionIndex++;
  }
  return PDone.in(input.getPipeline());
}