in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/Prober.java [539:579]
/**
 * Builds and starts {@code subscriberCount} streaming-pull subscribers on
 * {@code fullSubscriptionName}, storing each one in {@code subscribers} at its loop index.
 *
 * <p>Every received message is passed to {@code checkAndProcessMessage} together with the index
 * of the subscriber that received it. The resulting flag is handed to {@code ackNackMessage}
 * either right away (when {@code ackDelayMilliseconds} is zero) or after that many milliseconds
 * via {@code executor}; in the delayed case the scheduled future is added to
 * {@code awaitingAckFutures}.
 *
 * <p>A {@link RuntimeException} thrown while building or starting one subscriber is logged at
 * WARNING level and does not prevent the remaining subscribers from being created.
 */
private void createStreamingPullSubscribers() {
  for (int i = 0; i < subscriberCount; ++i) {
    // The receiver lambda needs an effectively final copy of the loop counter.
    final int subscriberIndex = i;
    try {
      MessageReceiver receiver =
          (PubsubMessage message, AckReplyConsumer consumer) -> {
            // Record the arrival time before the message is processed.
            DateTime receivedAt = DateTime.now();
            boolean shouldAck = checkAndProcessMessage(message, subscriberIndex);

            // No delay configured: respond on the receiving thread.
            if (ackDelayMilliseconds == 0) {
              ackNackMessage(shouldAck, receivedAt, consumer);
              return;
            }

            // Otherwise defer the response and keep the future so it can be tracked.
            awaitingAckFutures.add(
                executor.schedule(
                    () -> ackNackMessage(shouldAck, receivedAt, consumer),
                    ackDelayMilliseconds,
                    MILLISECONDS));
          };

      FlowControlSettings flowControlSettings =
          FlowControlSettings.newBuilder()
              .setMaxOutstandingElementCount(subscriberMaxOutstandingMessageCount)
              .setMaxOutstandingRequestBytes(subscriberMaxOutstandingBytes)
              .build();

      // Apply the common settings first, then let updateSubscriberBuilder adjust the builder.
      Subscriber.Builder builder =
          updateSubscriberBuilder(
              Subscriber.newBuilder(fullSubscriptionName, receiver)
                  .setParallelPullCount(subscriberStreamCount)
                  .setFlowControlSettings(flowControlSettings)
                  .setEndpoint(endpoint));

      Subscriber subscriber = builder.build();
      // Stored before start-up, so the slot is filled even if awaitRunning() throws below.
      subscribers[i] = subscriber;
      subscriber.startAsync().awaitRunning();
      logger.log(Level.INFO, "Created Subscriber");
    } catch (RuntimeException e) {
      logger.log(Level.WARNING, "Failed to create subscriber for " + fullSubscriptionName, e);
    }
  }
}