private def readCheckpoints()

in samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala [246:323]


  /**
   * Reads every message in the checkpoint stream partition (`checkpointSsp`) and selects one
   * checkpoint per task.
   *
   * Each message key is deserialized with `checkpointKeySerde`. Keys whose type maps to a version
   * not contained in `checkpointReadVersions` are skipped with a warning, so that newer checkpoint
   * formats in the topic do not break older readers. When more than one readable checkpoint exists
   * for a task, `shouldOverrideCheckpoint` decides whether a later message replaces the one already
   * recorded.
   *
   * @return the selected checkpoint for every task that has at least one accepted checkpoint message
   * @throws SamzaException if a message has a null key (always). When `validateCheckpoint` is enabled,
   *         also if a key or message cannot be deserialized, or if the key's grouper factory differs
   *         from `expectedGrouperFactory`. Otherwise these cases are logged and the message is skipped.
   */
  private def readCheckpoints(): Map[TaskName, Checkpoint] = {
    val checkpoints = mutable.Map[TaskName, Checkpoint]()
    // Event time of the message that produced the checkpoint currently held in `checkpoints` for a
    // task; handed to shouldOverrideCheckpoint when another checkpoint for that task is read.
    val checkpointAppendTime = mutable.Map[TaskName, Long]()

    val iterator = new SystemStreamPartitionIterator(systemConsumer, checkpointSsp)
    var numMessagesRead = 0

    while (iterator.hasNext) {
      val checkpointEnvelope: IncomingMessageEnvelope = iterator.next()
      // Kafka log append time for the checkpoint message
      val checkpointEnvelopeTs = checkpointEnvelope.getEventTime
      val offset = checkpointEnvelope.getOffset

      numMessagesRead += 1
      if (numMessagesRead % 1000 == 0) {
        info(s"Read $numMessagesRead from topic: $checkpointTopic. Current offset: $offset")
      }

      val keyBytes = checkpointEnvelope.getKey.asInstanceOf[Array[Byte]]
      if (keyBytes == null) {
        // The task name comes from the key, so a keyless message can never be attributed to a task;
        // this is fatal regardless of validateCheckpoint.
        throw new SamzaException(s"Encountered a checkpoint message with null key. Topic:$checkpointTopic " +
          s"Offset:$offset")
      }

      val checkpointKey = try {
        checkpointKeySerde.fromBytes(keyBytes)
      } catch {
        case e: Exception => if (validateCheckpoint) {
          throw new SamzaException(s"Exception while deserializing checkpoint-key. " +
            s"Topic: $checkpointTopic Offset: $offset", e)
        } else {
          warn(s"Ignoring exception while deserializing checkpoint-key. Topic: $checkpointTopic Offset: $offset", e)
          null
        }
      }

      // A null key here means deserialization failed and validation is off: skip the message.
      if (checkpointKey != null) {
        // If the grouper in the key differs from the configured grouper, always warn, and fail only
        // when checkpoint validation is enabled.
        val actualGrouperFactory = checkpointKey.getGrouperFactoryClassName
        if (!expectedGrouperFactory.equals(actualGrouperFactory)) {
          warn(s"Grouper mismatch. Configured: $expectedGrouperFactory Actual: $actualGrouperFactory ")
          if (validateCheckpoint) {
            throw new SamzaException("SSPGrouperFactory in the checkpoint topic does not match the configured value. " +
              s"Configured value: $expectedGrouperFactory; Actual value: $actualGrouperFactory Offset: $offset")
          }
        }

        val msgBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
        try {
          // if checkpoint key version does not match configured checkpoint version to read, skip the message.
          if (checkpointReadVersions.contains(
            KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(checkpointKey.getType))) {
            val taskName = checkpointKey.getTaskName
            if (!checkpoints.contains(taskName) ||
              shouldOverrideCheckpoint(checkpoints.get(taskName), checkpointKey, checkpointAppendTime.get(taskName),
                checkpointEnvelopeTs)) {
              checkpoints.put(taskName, deserializeCheckpoint(checkpointKey, msgBytes))
              checkpointAppendTime.put(taskName, checkpointEnvelopeTs)
            } // else ignore the de-prioritized checkpoint
          } else {
            // Ignore and skip the unknown checkpoint key type. We do not want to throw any exceptions for this case
            // for forwards compatibility with new checkpoints versions in the checkpoint topic
            warn(s"Ignoring unknown checkpoint key type for checkpoint key: $checkpointKey")
          }
        } catch {
          case e: Exception =>
            if (validateCheckpoint) {
              throw new SamzaException(s"Exception while deserializing checkpoint-message. " +
                s"Topic: $checkpointTopic Offset: $offset", e)
            } else {
              warn(s"Ignoring exception while deserializing checkpoint-msg. Topic: $checkpointTopic Offset: $offset", e)
            }
        }
      }
    }
    info(s"Read $numMessagesRead messages from system:$checkpointSystem topic:$checkpointTopic")
    checkpoints.toMap
  }