private void doPullIteration(int subscriberIndex)

in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/Prober.java [581:630]


  /**
   * Issues one asynchronous pull for the subscriber at {@code subscriberIndex} and, once the pull
   * future completes, processes the response and starts the next pull.
   *
   * <p>The completion listener runs on {@code executor}. Every received message is handed to
   * {@code checkAndProcessMessage}; messages for which it returns {@code true} are acknowledged in
   * one batch, and the remaining ones are sent in one batch with an ack deadline of 0 seconds.
   *
   * <p>The method re-invokes itself from the listener, so the pull loop for this subscriber keeps
   * running. A failed pull, a failed acknowledge call, or a failed modify-ack-deadline call is
   * logged at {@code WARNING} and does not stop the loop.
   *
   * @param subscriberIndex index into {@code pullSubscribers} of the subscriber to pull with; also
   *     forwarded to {@code checkAndProcessMessage}
   */
  private void doPullIteration(int subscriberIndex) {
    PullRequest pullRequest =
        PullRequest.newBuilder()
            .setSubscription(fullSubscriptionName.toString())
            .setMaxMessages(maxPullMessages)
            .build();
    ApiFuture<PullResponse> pullResponseFuture =
        pullSubscribers[subscriberIndex].pullCallable().futureCall(pullRequest);
    pullResponseFuture.addListener(
        () -> {
          PullResponse pullResponse;
          try {
            pullResponse = pullResponseFuture.get();
          } catch (InterruptedException e) {
            // get() cleared the interrupt status when it threw; restore it so the owner of this
            // thread can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.log(Level.WARNING, "Could not get pull result.", e);
            doPullIteration(subscriberIndex);
            return;
          } catch (ExecutionException e) {
            // NOTE(review): the retry is immediate, with no backoff — confirm this is intended
            // for persistent pull failures.
            logger.log(Level.WARNING, "Could not get pull result.", e);
            doPullIteration(subscriberIndex);
            return;
          }
          List<String> messagesToAck = new ArrayList<>();
          List<String> messagesToNack = new ArrayList<>();
          for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
            boolean ack = checkAndProcessMessage(message.getMessage(), subscriberIndex);
            if (ack) {
              messagesToAck.add(message.getAckId());
            } else {
              messagesToNack.add(message.getAckId());
            }
          }
          if (!messagesToAck.isEmpty()) {
            AcknowledgeRequest acknowledgeRequest =
                AcknowledgeRequest.newBuilder()
                    .setSubscription(fullSubscriptionName.toString())
                    .addAllAckIds(messagesToAck)
                    .build();
            try {
              pullSubscribers[subscriberIndex].acknowledgeCallable().call(acknowledgeRequest);
            } catch (RuntimeException e) {
              // A failed ack must not escape the listener: that would skip the nack batch and the
              // next doPullIteration call below, ending this subscriber's pull loop.
              logger.log(Level.WARNING, "Could not acknowledge messages.", e);
            }
          }
          if (!messagesToNack.isEmpty()) {
            ModifyAckDeadlineRequest modAckRequest =
                ModifyAckDeadlineRequest.newBuilder()
                    .setSubscription(fullSubscriptionName.toString())
                    .setAckDeadlineSeconds(0)
                    .addAllAckIds(messagesToNack)
                    .build();
            try {
              pullSubscribers[subscriberIndex].modifyAckDeadlineCallable().call(modAckRequest);
            } catch (RuntimeException e) {
              // Same reasoning as for the ack call: log and keep the pull loop running.
              logger.log(Level.WARNING, "Could not modify ack deadline for messages.", e);
            }
          }
          doPullIteration(subscriberIndex);
        },
        executor);
  }