def toBytes()

in samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala [39:98]


  def toBytes(obj: Object, serializerName: String) = serdes
    .getOrElse(serializerName, throw new SamzaException("No serde defined for %s" format serializerName))
    .toBytes(obj)

  def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = {
    val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)
      || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
      envelope.getKey
    } else if (envelope.getMessage.isInstanceOf[ControlMessage]
      && controlMessageKeySerdes.contains(envelope.getSystemStream)) {
      // If the message is a control message and the key needs to serialize
      controlMessageKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey.asInstanceOf[String])
    } else if (envelope.getKeySerializerName != null) {
      // If a serde is defined, use it.
      toBytes(envelope.getKey, envelope.getKeySerializerName)
    } else if (systemStreamKeySerdes.contains(envelope.getSystemStream)) {
      // If the stream has a serde defined, use it.
      systemStreamKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey)
    } else if (systemKeySerdes.contains(envelope.getSystemStream.getSystem)) {
      // If the system has a serde defined, use it.
      systemKeySerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getKey)
    } else {
      // Just use the object.
      envelope.getKey
    }

    val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)
      || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
      // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
      envelope.getMessage
    } else if (intermediateMessageSerdes.contains(envelope.getSystemStream)) {
      // If the stream is an intermediate stream, use the intermediate message serde
      intermediateMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
    } else if (envelope.getMessageSerializerName != null) {
      // If a serde is defined, use it.
      toBytes(envelope.getMessage, envelope.getMessageSerializerName)
    } else if (systemStreamMessageSerdes.contains(envelope.getSystemStream)) {
      // If the stream has a serde defined, use it.
      systemStreamMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
    } else if (systemMessageSerdes.contains(envelope.getSystemStream.getSystem)) {
      // If the system has a serde defined, use it.
      systemMessageSerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getMessage)
    } else {
      // Just use the object.
      envelope.getMessage
    }

    if ((key eq envelope.getKey) && (message eq envelope.getMessage)) {
      envelope
    } else {
      new OutgoingMessageEnvelope(
        envelope.getSystemStream,
        null,
        null,
        envelope.getPartitionKey,
        key,
        message)
    }
  }