in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/OrderedProber.java [136:191]
protected ApiFuture<String> publish(
Publisher publisher, PubsubMessage message, boolean filteredOut) {
if (keyChoiceStrategy == KeyChoiceStrategy.ROUND_ROBIN) {
lastOrderingKey = (lastOrderingKey + 1) % orderingKeyCount;
} else {
lastOrderingKey = r.nextInt(orderingKeyCount);
}
PublishInfo publishInfo =
perKeyPublishInfo.compute(lastOrderingKey, (k, v) -> v == null ? new PublishInfo() : v);
synchronized (publishInfo) {
int publishGeneration = publishInfo.publishGeneration;
PubsubMessage.Builder messageBuilder =
message.toBuilder().setOrderingKey(Long.toString(lastOrderingKey));
if (!filteredOut) {
long nextSequenceNumber = ++publishInfo.lastPublishedSequenceNum;
messageBuilder.putAttributes(
ORDERING_KEY_SEQUENCE_NUMBER_KEY, Long.toString(nextSequenceNumber));
}
PubsubMessage messageToPublish = messageBuilder.build();
ApiFuture<String> publishFuture = publisher.publish(messageToPublish);
publishFuture.addListener(
() -> {
long sequenceNum =
Long.parseLong(
messageToPublish.getAttributes().get(ORDERING_KEY_SEQUENCE_NUMBER_KEY));
synchronized (publishInfo) {
try {
publishFuture.get();
publishInfo.lastSuccessfullyPublishedSequenceNum =
max(publishInfo.lastSuccessfullyPublishedSequenceNum, sequenceNum);
} catch (InterruptedException | ExecutionException e) {
logger.log(
Level.WARNING,
String.format(
"Publish failed for ordering key %s with ordering key sequence number %s and"
+ " sequence number %s. Resetting publisher and republishing messages"
+ " starting after last successfully published sequence number.",
messageToPublish.getOrderingKey(),
messageToPublish.getAttributes().get(ORDERING_KEY_SEQUENCE_NUMBER_KEY),
messageToPublish.getAttributes().get(MESSAGE_SEQUENCE_NUMBER_KEY)),
e);
if (publishInfo.publishGeneration == publishGeneration) {
// If the generation number is the same, then this round of failures has not yet
// been processed, so process it now.
publishInfo.lastPublishedSequenceNum =
publishInfo.lastSuccessfullyPublishedSequenceNum;
++publishInfo.publishGeneration;
publisher.resumePublish(messageToPublish.getOrderingKey());
}
}
}
},
executor);
return publishFuture;
}
}