protected ApiFuture publish()

in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/OrderedProber.java [136:191]


  /**
   * Publishes {@code message} on the next ordering key and tracks per-key sequence state.
   *
   * <p>The ordering key is advanced round-robin when {@code keyChoiceStrategy} is {@code
   * ROUND_ROBIN} and picked at random otherwise. Unless {@code filteredOut} is set, the message is
   * stamped with the key's next sequence number in the {@code ORDERING_KEY_SEQUENCE_NUMBER_KEY}
   * attribute. A listener on the returned future records the highest successfully published
   * sequence number for the key; on failure it rewinds the key's sequence counter to that number
   * and calls {@code resumePublish} for the key, at most once per publish generation.
   *
   * @param publisher publisher used to send the message and to resume the key after a failure
   * @param message message to publish; a copy carrying the ordering key (and, when not filtered
   *     out, the sequence number attribute) is what is actually sent
   * @param filteredOut when true, no per-key sequence number is assigned to the message
   * @return the future returned by {@code publisher.publish}
   */
  protected ApiFuture<String> publish(
      Publisher publisher, PubsubMessage message, boolean filteredOut) {
    if (keyChoiceStrategy == KeyChoiceStrategy.ROUND_ROBIN) {
      lastOrderingKey = (lastOrderingKey + 1) % orderingKeyCount;
    } else {
      lastOrderingKey = r.nextInt(orderingKeyCount);
    }
    PublishInfo publishInfo =
        perKeyPublishInfo.computeIfAbsent(lastOrderingKey, k -> new PublishInfo());
    synchronized (publishInfo) {
      // Snapshot the generation so the listener can tell whether a later failure on this key
      // has already been handled by another message's listener.
      int publishGeneration = publishInfo.publishGeneration;
      PubsubMessage.Builder messageBuilder =
          message.toBuilder().setOrderingKey(Long.toString(lastOrderingKey));
      // Keep the assigned sequence number in a local instead of re-parsing it from the message
      // attributes in the listener: filtered-out messages carry no such attribute, and parsing
      // the missing value would throw before any failure handling could run.
      final boolean hasSequenceNum = !filteredOut;
      final long sequenceNum = hasSequenceNum ? ++publishInfo.lastPublishedSequenceNum : 0L;
      if (hasSequenceNum) {
        messageBuilder.putAttributes(ORDERING_KEY_SEQUENCE_NUMBER_KEY, Long.toString(sequenceNum));
      }
      PubsubMessage messageToPublish = messageBuilder.build();
      ApiFuture<String> publishFuture = publisher.publish(messageToPublish);
      publishFuture.addListener(
          () -> {
            synchronized (publishInfo) {
              try {
                publishFuture.get();
                if (hasSequenceNum) {
                  publishInfo.lastSuccessfullyPublishedSequenceNum =
                      max(publishInfo.lastSuccessfullyPublishedSequenceNum, sequenceNum);
                }
              } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                  // Preserve the interrupt for whoever owns this executor thread.
                  Thread.currentThread().interrupt();
                }
                logger.log(
                    Level.WARNING,
                    String.format(
                        "Publish failed for ordering key %s with ordering key sequence number %s and"
                            + " sequence number %s. Resetting publisher and republishing messages"
                            + " starting  after last successfully published sequence number.",
                        messageToPublish.getOrderingKey(),
                        messageToPublish.getAttributes().get(ORDERING_KEY_SEQUENCE_NUMBER_KEY),
                        messageToPublish.getAttributes().get(MESSAGE_SEQUENCE_NUMBER_KEY)),
                    e);
                if (publishInfo.publishGeneration == publishGeneration) {
                  // If the generation number is the same, then this round of failures has not yet
                  // been processed, so process it now.
                  publishInfo.lastPublishedSequenceNum =
                      publishInfo.lastSuccessfullyPublishedSequenceNum;
                  ++publishInfo.publishGeneration;
                  publisher.resumePublish(messageToPublish.getOrderingKey());
                }
              }
            }
          },
          executor);
      return publishFuture;
    }
  }