in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/Prober.java [688:741]
/**
 * Starts the periodic publish load, unless {@code noPublish} is set.
 *
 * <p>A task is scheduled on {@code executor} at a fixed rate of {@code publishFrequency}
 * microseconds, and the resulting handle is stored in {@code generatePublishesFuture}. Each run
 * of the task:
 *
 * <ul>
 *   <li>builds a message with a random payload ({@code messageSize} bytes, or a random size when
 *       {@code messageSize <= 0}) tagged with a sequence number, a "filtered" flag and the
 *       instance id;
 *   <li>records the send time in {@code messageSendTime} for messages that are not filtered out;
 *   <li>publishes the message and, once the publish future completes, logs the publish latency
 *       and bumps {@code publishCount}.
 * </ul>
 *
 * <p>If the publish fails, either synchronously or through the returned future, the failure is
 * logged and the recorded send time for that sequence number is removed.
 */
private void generatePublishLoad() {
  if (noPublish) {
    return;
  }
  logger.log(Level.INFO, "Beginning publishing");
  generatePublishesFuture =
      executor.scheduleAtFixedRate(
          () -> {
            // Runtime failures are caught and logged here so that they do not escape the
            // scheduled task.
            try {
              DateTime sendTime = DateTime.now();
              String messageSequenceNumber = Long.toString(publishedMessageCount++);
              boolean filteredOut = r.nextDouble() < messageFilteredProbability;
              // The maximum message size allowed by the service is 10 MB.
              int nextMessageSize =
                  messageSize <= 0 ? max(1, (int) (10000000 * r.nextDouble())) : messageSize;
              byte[] bytes = new byte[nextMessageSize];
              r.nextBytes(bytes);
              PubsubMessage message =
                  PubsubMessage.newBuilder()
                      .setData(ByteString.copyFrom(bytes))
                      .putAttributes(MESSAGE_SEQUENCE_NUMBER_KEY, messageSequenceNumber)
                      .putAttributes(FILTERED_ATTRIBUTE, Boolean.toString(filteredOut))
                      .putAttributes(INSTANCE_ATTRIBUTE, instanceId)
                      .build();
              // Record the send time before publishing so the entry exists by the time the
              // publish call is made.
              if (!filteredOut) {
                messageSendTime.put(messageSequenceNumber, sendTime);
              }
              ApiFuture<String> publishFuture;
              try {
                publishFuture = publish(publisher, message, filteredOut);
              } catch (RuntimeException e) {
                // Synchronous publish failure: undo the bookkeeping exactly like the
                // asynchronous failure path does, then let the outer handler log it.
                messageSendTime.remove(messageSequenceNumber);
                throw e;
              }
              publishFuture.addListener(
                  () -> {
                    try {
                      DateTime publishAckTime = DateTime.now();
                      Duration publishLatency = new Duration(sendTime, publishAckTime);
                      String messageId = publishFuture.get();
                      logger.fine(
                          "Published " + messageId + " in " + publishLatency.getMillis() + "ms");
                      long currentPublishCount = publishCount.incrementAndGet();
                      if (currentPublishCount % 1000 == 0) {
                        logger.info(
                            String.format(
                                "Successfully published %d messages.", currentPublishCount));
                      }
                    } catch (InterruptedException | ExecutionException e) {
                      if (e instanceof InterruptedException) {
                        // Restore the interrupt flag so the executor thread can observe it.
                        Thread.currentThread().interrupt();
                      }
                      logger.log(Level.WARNING, "Failed to publish", e);
                      messageSendTime.remove(messageSequenceNumber);
                    }
                  },
                  executor);
            } catch (RuntimeException e) {
              logger.log(Level.WARNING, "Failed to publish", e);
            }
          },
          0,
          publishFrequency,
          MICROSECONDS);
}