in samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala [246:323]
/**
 * Reads every message in the checkpoint topic partition (`checkpointSsp`) and returns the
 * checkpoint selected for each task.
 *
 * For each message, in order:
 *  - a null key always fails with a [[SamzaException]];
 *  - the key is deserialized with `checkpointKeySerde`; a failure throws when `validateCheckpoint`
 *    is set, otherwise it is logged and the message is skipped;
 *  - a grouper factory that differs from `expectedGrouperFactory` is logged, and throws when
 *    `validateCheckpoint` is set;
 *  - keys whose type maps to a version not in `checkpointReadVersions` are logged and skipped
 *    (kept non-fatal for forwards compatibility);
 *  - otherwise the message is deserialized and stored for its task if the task has no checkpoint
 *    yet, or if `shouldOverrideCheckpoint` prefers it over the one already stored. Failures here
 *    throw when `validateCheckpoint` is set, otherwise they are logged and the message is skipped.
 *
 * @return the selected checkpoint for every task that had at least one accepted message
 * @throws SamzaException on a null message key, or (only when `validateCheckpoint` is set) on a
 *                        key/message deserialization failure or a grouper-factory mismatch
 */
private def readCheckpoints(): Map[TaskName, Checkpoint] = {
  val checkpoints = mutable.Map[TaskName, Checkpoint]()
  // Event time of the envelope whose checkpoint is currently stored in `checkpoints`; it is handed
  // to shouldOverrideCheckpoint when a later message for the same task is read.
  val checkpointAppendTime = mutable.Map[TaskName, Long]()
  val iterator = new SystemStreamPartitionIterator(systemConsumer, checkpointSsp)
  var numMessagesRead = 0
  while (iterator.hasNext) {
    val checkpointEnvelope: IncomingMessageEnvelope = iterator.next
    // Kafka log append time for the checkpoint message
    val checkpointEnvelopeTs = checkpointEnvelope.getEventTime
    val offset = checkpointEnvelope.getOffset
    numMessagesRead += 1
    if (numMessagesRead % 1000 == 0) {
      info(s"Read $numMessagesRead from topic: $checkpointTopic. Current offset: $offset")
    }
    val keyBytes = checkpointEnvelope.getKey.asInstanceOf[Array[Byte]]
    if (keyBytes == null) {
      throw new SamzaException(s"Encountered a checkpoint message with null key. Topic:$checkpointTopic " +
        s"Offset:$offset")
    }
    // None when the key cannot be deserialized (and validation is off) or the serde yields null;
    // such messages are skipped.
    val checkpointKeyOpt = try {
      Option(checkpointKeySerde.fromBytes(keyBytes))
    } catch {
      case e: Exception =>
        if (validateCheckpoint) {
          throw new SamzaException(s"Exception while deserializing checkpoint-key. " +
            s"Topic: $checkpointTopic Offset: $offset", e)
        } else {
          warn(s"Ignoring exception while deserializing checkpoint-key. Topic: $checkpointTopic Offset: $offset", e)
          None
        }
    }
    checkpointKeyOpt.foreach { checkpointKey =>
      // A grouper mismatch is always logged; it is fatal only when checkpoint validation is enabled.
      val actualGrouperFactory = checkpointKey.getGrouperFactoryClassName
      if (!expectedGrouperFactory.equals(actualGrouperFactory)) {
        warn(s"Grouper mismatch. Configured: $expectedGrouperFactory Actual: $actualGrouperFactory ")
        if (validateCheckpoint) {
          throw new SamzaException("SSPGrouperFactory in the checkpoint topic does not match the configured value. " +
            s"Configured value: $expectedGrouperFactory; Actual value: $actualGrouperFactory Offset: $offset")
        }
      }
      val msgBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
      try {
        // if checkpoint key version does not match configured checkpoint version to read, skip the message.
        if (checkpointReadVersions.contains(
          KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(checkpointKey.getType))) {
          val taskName = checkpointKey.getTaskName
          if (!checkpoints.contains(taskName) ||
            shouldOverrideCheckpoint(checkpoints.get(taskName), checkpointKey, checkpointAppendTime.get(taskName),
              checkpointEnvelopeTs)) {
            checkpoints.put(taskName, deserializeCheckpoint(checkpointKey, msgBytes))
            checkpointAppendTime.put(taskName, checkpointEnvelopeTs)
          } // else ignore the de-prioritized checkpoint
        } else {
          // Ignore and skip the unknown checkpoint key type. We do not want to throw any exceptions for this case
          // for forwards compatibility with new checkpoints versions in the checkpoint topic
          warn(s"Ignoring unknown checkpoint key type for checkpoint key: $checkpointKey")
        }
      } catch {
        case e: Exception =>
          if (validateCheckpoint) {
            throw new SamzaException(s"Exception while deserializing checkpoint-message. " +
              s"Topic: $checkpointTopic Offset: $offset", e)
          } else {
            warn(s"Ignoring exception while deserializing checkpoint-msg. Topic: $checkpointTopic Offset: $offset", e)
          }
      }
    }
  }
  info(s"Read $numMessagesRead messages from system:$checkpointSystem topic:$checkpointTopic")
  checkpoints.toMap
}