in samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala [346:384]
/**
 * Builds the [[OutgoingMessageEnvelope]] that carries a task's checkpoint, addressed to `checkpointSsp`.
 *
 * The envelope key is a serialized [[KafkaCheckpointLogKey]] built from a version-specific key type,
 * the task name and `expectedGrouperFactory`. The envelope message is the checkpoint itself,
 * serialized with the serde that matches its version.
 *
 * @param taskName   task the checkpoint belongs to; it becomes part of the log key
 * @param checkpoint checkpoint to serialize; handled when it is a [[CheckpointV1]] or a [[CheckpointV2]]
 * @tparam T concrete checkpoint type
 * @return envelope holding the serialized key and the serialized checkpoint
 * @throws SamzaException if the checkpoint is neither a V1 nor a V2 checkpoint, or if serializing
 *                        the key or the checkpoint throws an `Exception`
 */
def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
  // Only the key type and the message serde differ between versions, so resolve those two pieces
  // up front. The message serialization is wrapped in a thunk so that it still runs after the key
  // has been serialized, exactly as before.
  val (logKeyType, serializeCheckpoint) = checkpoint match {
    case v1: CheckpointV1 =>
      (KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE, () => checkpointV1MsgSerde.toBytes(v1))
    case v2: CheckpointV2 =>
      (KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE, () => checkpointV2MsgSerde.toBytes(v2))
    case _ =>
      throw new SamzaException("Unknown checkpoint version: " + checkpoint.getVersion)
  }

  // Key construction stays outside the try block: only serialization failures are wrapped.
  val logKey = new KafkaCheckpointLogKey(logKeyType, taskName, expectedGrouperFactory)

  val serializedKey =
    try {
      checkpointKeySerde.toBytes(logKey)
    } catch {
      case cause: Exception =>
        throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", cause)
    }

  val serializedCheckpoint =
    try {
      serializeCheckpoint()
    } catch {
      case cause: Exception =>
        throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", cause)
    }

  new OutgoingMessageEnvelope(checkpointSsp, serializedKey, serializedCheckpoint)
}