in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/Prober.java [581:630]
private void doPullIteration(int subscriberIndex) {
PullRequest pullRequest =
PullRequest.newBuilder()
.setSubscription(fullSubscriptionName.toString())
.setMaxMessages(maxPullMessages)
.build();
ApiFuture<PullResponse> pullResponseFuture =
pullSubscribers[subscriberIndex].pullCallable().futureCall(pullRequest);
pullResponseFuture.addListener(
() -> {
PullResponse pullResponse = null;
try {
pullResponse = pullResponseFuture.get();
} catch (InterruptedException | ExecutionException e) {
logger.log(Level.WARNING, "Could not get pull result.", e);
doPullIteration(subscriberIndex);
return;
}
List<String> messagesToAck = new ArrayList<>();
List<String> messagesToNack = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
boolean ack = checkAndProcessMessage(message.getMessage(), subscriberIndex);
if (ack) {
messagesToAck.add(message.getAckId());
} else {
messagesToNack.add(message.getAckId());
}
}
if (!messagesToAck.isEmpty()) {
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(fullSubscriptionName.toString())
.addAllAckIds(messagesToAck)
.build();
pullSubscribers[subscriberIndex].acknowledgeCallable().call(acknowledgeRequest);
}
if (!messagesToNack.isEmpty()) {
ModifyAckDeadlineRequest modAckRequest =
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(fullSubscriptionName.toString())
.setAckDeadlineSeconds(0)
.addAllAckIds(messagesToNack)
.build();
pullSubscribers[subscriberIndex].modifyAckDeadlineCallable().call(modAckRequest);
}
doPullIteration(subscriberIndex);
},
executor);
}