private void generatePublishLoad()

in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/Prober.java [688:741]


  /**
   * Starts the periodic publish load, unless publishing is disabled via {@code noPublish}.
   *
   * <p>Schedules a task on {@code executor} that first runs immediately and then repeats every
   * {@code publishFrequency} microseconds. Each run builds one message whose payload is filled by
   * {@code r.nextBytes}, tags it with its sequence number, its filtered flag and the instance id,
   * records the send time in {@code messageSendTime} for messages that are not filtered out, and
   * hands the message to {@code publish}. The handle of the scheduled task is stored in
   * {@code generatePublishesFuture}.
   *
   * <p>When a publish fails, either synchronously or through the returned future, the failure is
   * logged and the message's entry is removed from {@code messageSendTime}.
   */
  private void generatePublishLoad() {
    if (noPublish) {
      return;
    }
    logger.log(Level.INFO, "Beginning publishing");
    generatePublishesFuture =
        executor.scheduleAtFixedRate(
            () -> {
              // Runtime failures are caught and logged here: an exception escaping a
              // scheduleAtFixedRate task suppresses all of its subsequent executions.
              try {
                DateTime sendTime = DateTime.now();
                String messageSequenceNumber = Long.toString(publishedMessageCount++);
                boolean filteredOut = r.nextDouble() < messageFilteredProbability;
                // The maximum message size allowed by the service is 10 MB. A non-positive
                // messageSize selects a random size of at least one byte per message.
                int nextMessageSize =
                    messageSize <= 0 ? max(1, (int) (10000000 * r.nextDouble())) : messageSize;
                byte[] bytes = new byte[nextMessageSize];
                r.nextBytes(bytes);
                PubsubMessage message =
                    PubsubMessage.newBuilder()
                        .setData(ByteString.copyFrom(bytes))
                        .putAttributes(MESSAGE_SEQUENCE_NUMBER_KEY, messageSequenceNumber)
                        .putAttributes(FILTERED_ATTRIBUTE, Boolean.toString(filteredOut))
                        .putAttributes(INSTANCE_ATTRIBUTE, instanceId)
                        .build();
                // Only messages that are not filtered out get a send-time entry.
                if (!filteredOut) {
                  messageSendTime.put(messageSequenceNumber, sendTime);
                }
                ApiFuture<String> publishFuture;
                try {
                  publishFuture = publish(publisher, message, filteredOut);
                } catch (RuntimeException e) {
                  // publish() threw before returning a future, so no listener will ever clean up
                  // the send-time entry; drop it here, as the asynchronous failure path does.
                  messageSendTime.remove(messageSequenceNumber);
                  throw e;
                }
                publishFuture.addListener(
                    () -> {
                      try {
                        DateTime publishAckTime = DateTime.now();
                        Duration publishLatency = new Duration(sendTime, publishAckTime);
                        String messageId = publishFuture.get();
                        logger.fine(
                            "Published " + messageId + " in " + publishLatency.getMillis() + "ms");
                        long currentPublishCount = publishCount.incrementAndGet();
                        // Log progress once per 1000 successful publishes.
                        if (currentPublishCount % 1000 == 0) {
                          logger.info(
                              String.format(
                                  "Successfully published %d messages.", currentPublishCount));
                        }
                      } catch (InterruptedException | ExecutionException e) {
                        if (e instanceof InterruptedException) {
                          // Restore the interrupt status so it is not silently lost.
                          Thread.currentThread().interrupt();
                        }
                        logger.log(Level.WARNING, "Failed to publish", e);
                        messageSendTime.remove(messageSequenceNumber);
                      }
                    },
                    executor);
              } catch (RuntimeException e) {
                logger.log(Level.WARNING, "Failed to publish", e);
              }
            },
            0,
            publishFrequency,
            MICROSECONDS);
  }