def buildOutgoingMessageEnvelope[T <: Checkpoint]()

in samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala [346:384]


  def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
    checkpoint match {
      case checkpointV1: CheckpointV1 => {
        val key = new KafkaCheckpointLogKey(
          KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE, taskName, expectedGrouperFactory)
        val keyBytes = try {
          checkpointKeySerde.toBytes(key)
        } catch {
          case e: Exception =>
            throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
        }
        val msgBytes = try {
          checkpointV1MsgSerde.toBytes(checkpointV1)
        } catch {
          case e: Exception =>
            throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
        }
        new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
      }
      case checkpointV2: CheckpointV2 => {
        val key = new KafkaCheckpointLogKey(
          KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE, taskName, expectedGrouperFactory)
        val keyBytes = try {
          checkpointKeySerde.toBytes(key)
        } catch {
          case e: Exception =>
            throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
        }
        val msgBytes = try {
          checkpointV2MsgSerde.toBytes(checkpointV2)
        } catch {
          case e: Exception =>
            throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
        }
        new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
      }
      case _ => throw new SamzaException("Unknown checkpoint version: " + checkpoint.getVersion)
    }
  }