in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/OrderedProber.java [194:325]
/**
 * Handles one received message: delegates to the superclass and then checks the message against
 * the per-ordering-key delivery bookkeeping kept by this prober.
 *
 * <p>After the superclass has processed the message, the ordering key is parsed as an integer and
 * the attribute stored under {@code ORDERING_KEY_SEQUENCE_NUMBER_KEY} is parsed as a long. If
 * either cannot be parsed, a warning is logged for each bad value and no ordering checks are
 * performed. Otherwise the method, in this order:
 *
 * <ol>
 *   <li>records which subscriber received the key and logs a change of subscriber;
 *   <li>updates the largest sequence number seen for the key, logging at SEVERE when the new
 *       maximum skips ahead by more than one;
 *   <li>appends the delivery to the bounded per-key history and records it as the last delivery
 *       for the key;
 *   <li>classifies the delivery relative to the previous one for the key (redelivery, suspected
 *       duplicate publish, leftover from an affinity change, or out-of-order delivery) and logs
 *       accordingly.
 * </ol>
 *
 * @param message the received message
 * @param subscriberIndex index of the subscriber that received {@code message}
 * @return whatever {@code super.processMessage(message, subscriberIndex)} returned; the ordering
 *     checks never change this value
 */
protected boolean processMessage(PubsubMessage message, int subscriberIndex) {
  final boolean ack = super.processMessage(message, subscriberIndex);
  final String messageId = message.getMessageId();

  // Parse the ordering key. A failure here does not short-circuit: the sequence number below is
  // still parsed so that both problems get reported for the same message.
  final String rawKey = message.getOrderingKey();
  Integer key = null;
  try {
    key = Integer.valueOf(rawKey);
  } catch (NumberFormatException badKey) {
    logger.log(
        Level.WARNING,
        String.format(
            "Received message with bad ordering key %s with sequence number %s and ID %s",
            rawKey, message.getAttributes().get(MESSAGE_SEQUENCE_NUMBER_KEY), messageId),
        badKey);
  }

  // Parse the per-key sequence number carried in the message attributes.
  final String rawSeq = message.getAttributes().get(ORDERING_KEY_SEQUENCE_NUMBER_KEY);
  Long seq = null;
  try {
    seq = Long.valueOf(rawSeq);
  } catch (NumberFormatException badSeq) {
    logger.log(
        Level.WARNING,
        String.format(
            "Received message with bad ordering key sequence number %s with sequence number %s and"
                + " ID %s",
            rawSeq, message.getAttributes().get(MESSAGE_SEQUENCE_NUMBER_KEY), messageId),
        badSeq);
  }

  // Without both values there is nothing to check.
  if (key == null || seq == null) {
    return ack;
  }

  // Affinity changes are expected to happen, but it is still useful to track when they happen.
  final Integer formerSubscriber = orderingKeyAffinity.put(key, subscriberIndex);
  if (formerSubscriber != null && formerSubscriber.intValue() != subscriberIndex) {
    logger.info(
        String.format(
            "Affinity change for key %d from %d to %d", key, formerSubscriber, subscriberIndex));
  }

  // Track the highest sequence number seen for this key; a jump of more than one is reported.
  final Long highestSoFar = largestReceivedMessagePerKey.get(key);
  if (highestSoFar == null) {
    largestReceivedMessagePerKey.put(key, seq);
  } else if (seq > highestSoFar) {
    largestReceivedMessagePerKey.put(key, seq);
    if (seq > highestSoFar + 1) {
      logger.log(
          Level.SEVERE,
          String.format(
              "Out-of-order largest sequence number for ordering key %d: max so far %d, got %d"
                  + " (messageID: %s)",
              key, highestSoFar, seq, messageId));
    }
  }

  // Record this delivery in the bounded history and as the most recent delivery for the key.
  // Note that the history already contains the current delivery when messageIdsMatch is called.
  final EvictingQueue<DeliveredMessage> history =
      lastMessagesPerKey.computeIfAbsent(
          key, (Integer unusedKey) -> EvictingQueue.create(deliveryHistoryCount));
  final DeliveredMessage current = new DeliveredMessage(seq, messageId, subscriberIndex);
  history.add(current);
  final DeliveredMessage previous = lastReceivedMessagePerKey.put(key, current);

  // First delivery recorded for this key: nothing to compare against.
  if (previous == null) {
    return ack;
  }

  final String duplicatePublishFormat =
      "Unexpected message ID %s for ordering key %d with sequence number %d. This is likely"
          + " a message published twice; skipping message.";

  // Sequence number did not advance past the previous delivery.
  if (seq <= previous.sequenceNum) {
    if (messageIdsMatch(seq, messageId, history)) {
      logger.info(
          String.format(
              "Redelivery of message %d for ordering key %d (messageID: %s)",
              seq, key, messageId));
    } else {
      // If the messageID does not match the ID already seen for this sequence number, it was a
      // duplicate publish and therefore we should not reset, so put back the previous sequence
      // number.
      lastReceivedMessagePerKey.put(key, previous);
      logger.info(String.format(duplicatePublishFormat, messageId, key, seq));
    }
    return ack;
  }

  // Exactly the next sequence number: the in-order case, nothing to report.
  final long expectedSeq = previous.sequenceNum + 1;
  if (seq <= expectedSeq) {
    return ack;
  }

  // The sequence number skipped ahead; work out why before deciding how loudly to log.
  final boolean sameMessageId = messageIdsMatch(seq, messageId, history);
  final boolean sameSubscriber = subscriberIndex == previous.subscriberIndex;

  if (!sameMessageId) {
    // If the messageID does not match the ID already seen for this sequence number, it was a
    // duplicate publish and should not be considered an out-of-order delivery.
    logger.info(String.format(duplicatePublishFormat, messageId, key, seq));
  } else if (!sameSubscriber) {
    // The subscriber index does not match. This means the out-of-order message is left over
    // from delivery when the server lost track of outstanding messages and shifted affinity.
    // It is expected that this could happen and should not be considered an out-of-order
    // delivery.
    logger.info(
        String.format(
            "Affinity change on key %d resulted in unexpected message: expected %d, got %d"
                + " (messageID: %s) ",
            key, expectedSeq, seq, messageId));
  } else {
    logger.log(
        Level.SEVERE,
        String.format(
            "Out-of-order delivery for ordering key %d: expected %d, got %d (messageID: %s)",
            key, expectedSeq, seq, messageId));
    // Ensure that the list of messages appears in the log without being inter-mingled with
    // other lists that could print out at the same time.
    synchronized (logger) {
      logger.info(
          String.format("Last %d messages for ordering key %s", deliveryHistoryCount, key));
      for (DeliveredMessage delivered : history) {
        logger.info(String.format(" %s", delivered));
      }
    }
  }
  return ack;
}