public PDone expand()

in ingestion-beam/src/main/java/com/mozilla/telemetry/republisher/RepublishPerChannel.java [26:53]


  /**
   * Republishes a sample of each configured channel to that channel's own destination.
   *
   * <p>One {@code Destination} is built for each entry of
   * {@code baseOptions.getPerChannelSampleRatios()}. The input is split into one partition per
   * destination plus one extra trailing partition; {@code PartitionFn} decides which partition each
   * message lands in. The trailing partition is never written, so any message routed there is
   * dropped by this transform (presumably messages whose channel is not configured; confirm
   * against {@code PartitionFn}).
   *
   * <p>Each destination's partition is filtered with
   * {@code RandomSampler.filterBySampleIdOrRandomNumber}, using the message's {@code sample_id}
   * attribute and the destination's ratio. It is then written with the configured output type to
   * {@code getPerChannelDestination()}, with the literal {@code ${channel}} replaced by the
   * destination's channel name.
   *
   * @param input messages to sample and republish per channel
   * @return {@link PDone} for the input's pipeline
   */
  public PDone expand(PCollection<PubsubMessage> input) {
    List<Destination> destinations = baseOptions.getPerChannelSampleRatios().entrySet().stream() //
        .map(Destination::new) //
        .collect(Collectors.toList());
    int numDestinations = destinations.size();
    // One partition per destination plus a trailing one that is intentionally never written below.
    int numPartitions = numDestinations + 1;
    PCollectionList<PubsubMessage> partitioned = input.apply("PartitionByChannel",
        Partition.of(numPartitions, new PartitionFn(destinations)));

    // Loop-invariant: fetch the options view and destination template once. Per Beam's
    // PipelineOptions docs, a view from as() shares property storage with baseOptions, so
    // setOutput below overwrites the same shared value on every iteration and the last channel's
    // output stays set on baseOptions after this method returns.
    // NOTE(review): this is only correct if write(opts) reads getOutput() while it is being
    // applied; if an output type reads it lazily, every channel would see the last value.
    // Consider giving each destination an independent copy of the options.
    RepublisherOptions.Parsed opts = baseOptions.as(RepublisherOptions.Parsed.class);
    String destinationTemplate = baseOptions.getPerChannelDestination();

    for (int i = 0; i < numDestinations; i++) {
      Destination destination = destinations.get(i);
      String capitalizedChannel = destination.getCapitalizedChannel();
      opts.setOutput(destinationTemplate.replace("${channel}", destination.channel));

      partitioned.get(i) //
          .apply("Sample" + capitalizedChannel + "BySampleIdOrRandomNumber",
              Filter.by(message -> {
                PubsubMessage nonNullMessage = PubsubConstraints.ensureNonNull(message);
                String sampleId = nonNullMessage.getAttribute("sample_id");
                return RandomSampler.filterBySampleIdOrRandomNumber(sampleId, destination.ratio);
              }))
          .apply("Republish" + capitalizedChannel + "Sample",
              opts.getOutputType().write(opts));
    }

    return PDone.in(input.getPipeline());
  }