def fromBytes()

in samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala [100:156]


  /**
   * Deserializes a byte array using the serde registered under the given name.
   *
   * @param bytes the serialized payload handed to the serde's `fromBytes`
   * @param deserializerName the key used to look the serde up in `serdes`
   * @return whatever the selected serde's `fromBytes` returns for `bytes`
   * @throws SamzaException if `serdes` has no entry for `deserializerName`
   */
  def fromBytes(bytes: Array[Byte], deserializerName: String) = serdes.get(deserializerName) match {
    case Some(namedSerde) => namedSerde.fromBytes(bytes)
    case None => throw new SamzaException("No serde defined for %s" format deserializerName)
  }

  /**
   * Deserializes the key and message of an incoming envelope according to the
   * serdes configured for its system stream.
   *
   * Streams that are registered change log streams, or whose name ends with
   * `StorageConfig.ACCESSLOG_STREAM_SUFFIX`, are passed through untouched.
   * Otherwise the first matching serde wins, checked in this order:
   *
   *  - message: intermediate-stream serde, then stream-level serde, then
   *    system-level serde;
   *  - key: control-message key serde (only when the deserialized message is a
   *    `ControlMessage`), then stream-level serde, then system-level serde.
   *
   * When nothing matches, the raw key/message object is used as-is.
   *
   * @param envelope the envelope whose key and message may still be serialized
   * @return the same `envelope` instance when both key and message come back as
   *         the very same references it already holds; otherwise a new
   *         `IncomingMessageEnvelope` carrying the deserialized key and message
   *         with the partition, offset, size, event time and arrival time copied
   *         from the original
   */
  def fromBytes(envelope: IncomingMessageEnvelope) = {
    val stream = envelope.getSystemStreamPartition.getSystemStream

    // Change log and access log streams skip serde entirely; storage engines
    // handle serde for change logs themselves. The same test applies to both
    // the message and the key, so it is evaluated once.
    val bypassSerde = changeLogSystemStreams.contains(stream) ||
      stream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)

    // Raw payloads viewed as byte arrays; only evaluated in branches that deserialize.
    def messageBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
    def keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]

    val message = stream match {
      case _ if bypassSerde =>
        envelope.getMessage
      case _ if intermediateMessageSerdes.contains(stream) =>
        // Intermediate streams use their dedicated message serde.
        intermediateMessageSerdes(stream).fromBytes(messageBytes)
      case _ if systemStreamMessageSerdes.contains(stream) =>
        // A serde configured for this exact stream.
        systemStreamMessageSerdes(stream).fromBytes(messageBytes)
      case _ if systemMessageSerdes.contains(stream.getSystem) =>
        // A serde configured for the whole system.
        systemMessageSerdes(stream.getSystem).fromBytes(messageBytes)
      case _ =>
        // No serde applies: hand back the object unchanged.
        envelope.getMessage
    }

    val key = stream match {
      case _ if bypassSerde =>
        envelope.getKey
      case _ if message.isInstanceOf[ControlMessage] && controlMessageKeySerdes.contains(stream) =>
        // Control messages on this stream have a dedicated key serde.
        controlMessageKeySerdes(stream).fromBytes(keyBytes)
      case _ if systemStreamKeySerdes.contains(stream) =>
        // A serde configured for this exact stream.
        systemStreamKeySerdes(stream).fromBytes(keyBytes)
      case _ if systemKeySerdes.contains(stream.getSystem) =>
        // A serde configured for the whole system.
        systemKeySerdes(stream.getSystem).fromBytes(keyBytes)
      case _ =>
        // No serde applies: hand back the object unchanged.
        envelope.getKey
    }

    // Build a new envelope only if deserialization produced a different object
    // for the key or the message (reference comparison, not equality).
    if ((key ne envelope.getKey) || (message ne envelope.getMessage)) {
      new IncomingMessageEnvelope(
        envelope.getSystemStreamPartition,
        envelope.getOffset,
        key,
        message,
        envelope.getSize,
        envelope.getEventTime(),
        envelope.getArrivalTime)
    } else {
      envelope
    }
  }