protected boolean processMessage()

in ordering-keys-prober/src/main/java/com/google/cloud/pubsub/prober/OrderedProber.java [194:325]


  protected boolean processMessage(PubsubMessage message, int subscriberIndex) {
    boolean ack = super.processMessage(message, subscriberIndex);

    Integer orderingKey = null;
    try {
      orderingKey = Integer.parseInt(message.getOrderingKey());
    } catch (NumberFormatException e) {
      logger.log(
          Level.WARNING,
          String.format(
              "Received message with bad ordering key %s with sequence number %s and ID %s",
              message.getOrderingKey(),
              message.getAttributes().get(MESSAGE_SEQUENCE_NUMBER_KEY),
              message.getMessageId()),
          e);
    }
    Long sequenceNum = null;
    try {
      sequenceNum = Long.parseLong(message.getAttributes().get(ORDERING_KEY_SEQUENCE_NUMBER_KEY));
    } catch (NumberFormatException e) {
      logger.log(
          Level.WARNING,
          String.format(
              "Received message with bad ordering key sequence number %s with sequence number %s and"
                  + " ID %s",
              message.getAttributes().get(ORDERING_KEY_SEQUENCE_NUMBER_KEY),
              message.getAttributes().get(MESSAGE_SEQUENCE_NUMBER_KEY),
              message.getMessageId()),
          e);
    }
    if (orderingKey == null || sequenceNum == null) {
      return ack;
    }

    // Affinity changes are expected to happen, but it is still useful to track when they happen.
    Integer previousSubscriber = orderingKeyAffinity.put(orderingKey, subscriberIndex);
    if (previousSubscriber != null && !previousSubscriber.equals(subscriberIndex)) {
      logger.info(
          String.format(
              "Affinity change for key %d from %d to %d",
              orderingKey, previousSubscriber, subscriberIndex));
    }

    Long largestSequenceNum = largestReceivedMessagePerKey.get(orderingKey);
    if (largestSequenceNum == null || sequenceNum > largestSequenceNum) {
      largestReceivedMessagePerKey.put(orderingKey, sequenceNum);
      if (largestSequenceNum != null && sequenceNum > largestSequenceNum + 1) {
        logger.log(
            Level.SEVERE,
            String.format(
                "Out-of-order largest sequence number for ordering key %d: max so far %d, got %d"
                    + " (messageID: %s)",
                orderingKey, largestSequenceNum, sequenceNum, message.getMessageId()));
      }
    }

    EvictingQueue<DeliveredMessage> lastMessages =
        lastMessagesPerKey.computeIfAbsent(
            orderingKey, (Integer k) -> EvictingQueue.create(deliveryHistoryCount));
    DeliveredMessage deliveredMessage =
        new DeliveredMessage(sequenceNum, message.getMessageId(), subscriberIndex);
    lastMessages.add(deliveredMessage);

    DeliveredMessage previousDeliveredMessage =
        lastReceivedMessagePerKey.put(orderingKey, deliveredMessage);
    if (previousDeliveredMessage != null && sequenceNum <= previousDeliveredMessage.sequenceNum) {
      if (messageIdsMatch(sequenceNum, message.getMessageId(), lastMessages)) {
        logger.info(
            String.format(
                "Redelivery of message %d for ordering key %d (messageID: %s)",
                sequenceNum, orderingKey, message.getMessageId()));
      } else {
        // If the messageID does not match the ID already seen for this sequence number, it was a
        // duplicate publish and therefore we should not reset, so put back the previous sequence
        // number.
        lastReceivedMessagePerKey.put(orderingKey, previousDeliveredMessage);
        logger.info(
            String.format(
                "Unexpected message ID %s for ordering key %d with sequence number %d. This is likely"
                    + " a message published twice; skipping message.",
                message.getMessageId(), orderingKey, sequenceNum));
      }
    } else if (previousDeliveredMessage != null
        && sequenceNum > previousDeliveredMessage.sequenceNum + 1) {
      boolean messageIdsMatch = messageIdsMatch(sequenceNum, message.getMessageId(), lastMessages);
      boolean subscribersMatch = subscriberIndex == previousDeliveredMessage.subscriberIndex;
      if (messageIdsMatch && subscribersMatch) {
        logger.log(
            Level.SEVERE,
            String.format(
                "Out-of-order delivery for ordering key %d: expected %d, got %d (messageID: %s)",
                orderingKey,
                previousDeliveredMessage.sequenceNum + 1,
                sequenceNum,
                message.getMessageId()));

        // Ensure that the list of messages appears in the log without being inter-mingled with
        // other lists that could print out at the same time.
        synchronized (logger) {
          logger.info(
              String.format(
                  "Last %d messages for ordering key %s", deliveryHistoryCount, orderingKey));
          for (DeliveredMessage dm : lastMessages) {
            logger.info(String.format("    %s", dm));
          }
        }
      } else if (!messageIdsMatch) {
        // If the messageID does not match the ID already seen for this sequence number, it was a
        // duplicate publish and should not be considered an out-of-order delivery.
        logger.info(
            String.format(
                "Unexpected message ID %s for ordering key %d with sequence number %d. This is likely"
                    + " a message published twice; skipping message.",
                message.getMessageId(), orderingKey, sequenceNum));
      } else {
        // The subscriber index does not match. This means the out-of-order message is left over
        // from delivery when the server lost track of outstanding messages and shifted affinity.
        // It is expected that this could happen and should not be considered an out-of-order
        // delivery.
        logger.info(
            String.format(
                "Affinity change on key %d resulted in unexpected message: expected %d, got %d"
                    + " (messageID: %s) ",
                orderingKey,
                previousDeliveredMessage.sequenceNum + 1,
                sequenceNum,
                message.getMessageId()));
      }
    }

    return ack;
  }