private void createStreamingPullSubscribers()

in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/Prober.java [539:579]


  /**
   * Builds and starts {@code subscriberCount} streaming-pull subscribers on
   * {@code fullSubscriptionName}.
   *
   * <p>Every received message is passed to {@code checkAndProcessMessage} together with the index
   * of the subscriber that received it; the returned flag is then handed to
   * {@code ackNackMessage}, either right away (when {@code ackDelayMilliseconds} is zero) or via a
   * task scheduled on {@code executor} whose future is recorded in {@code awaitingAckFutures}.
   *
   * <p>A {@link RuntimeException} raised while building or starting one subscriber is logged at
   * WARNING level and does not prevent the remaining subscribers from being created.
   */
  private void createStreamingPullSubscribers() {
    for (int i = 0; i < subscriberCount; ++i) {
      try {
        // Effectively-final copy of the loop counter so the receiver can capture it.
        final int subscriberIndex = i;
        MessageReceiver receiver =
            (message, consumer) -> {
              // Capture the receive time before any processing takes place.
              DateTime receivedAt = DateTime.now();
              boolean shouldAck = checkAndProcessMessage(message, subscriberIndex);
              if (ackDelayMilliseconds != 0) {
                // Defer the ack/nack and keep the future alongside the other pending ones.
                awaitingAckFutures.add(
                    executor.schedule(
                        () -> ackNackMessage(shouldAck, receivedAt, consumer),
                        ackDelayMilliseconds,
                        MILLISECONDS));
                return;
              }
              ackNackMessage(shouldAck, receivedAt, consumer);
            };

        FlowControlSettings flowControl =
            FlowControlSettings.newBuilder()
                .setMaxOutstandingElementCount(subscriberMaxOutstandingMessageCount)
                .setMaxOutstandingRequestBytes(subscriberMaxOutstandingBytes)
                .build();

        Subscriber.Builder baseBuilder =
            Subscriber.newBuilder(fullSubscriptionName, receiver)
                .setParallelPullCount(subscriberStreamCount)
                .setFlowControlSettings(flowControl)
                .setEndpoint(endpoint);

        // Let updateSubscriberBuilder adjust (or replace) the builder before building.
        Subscriber created = updateSubscriberBuilder(baseBuilder).build();

        // The subscriber is stored in its slot before it is started.
        subscribers[i] = created;
        created.startAsync().awaitRunning();
        logger.log(Level.INFO, "Created Subscriber");
      } catch (RuntimeException e) {
        logger.log(Level.WARNING, "Failed to create subscriber for " + fullSubscriptionName, e);
      }
    }
  }