in samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala [100:156]
/**
 * Deserializes `bytes` with the serde registered in `serdes` under `deserializerName`.
 *
 * @param bytes            the serialized payload handed to the serde's `fromBytes`
 * @param deserializerName the key used to look the serde up in `serdes`
 * @return whatever the selected serde's `fromBytes` produces
 * @throws SamzaException if `serdes` has no entry for `deserializerName`
 */
def fromBytes(bytes: Array[Byte], deserializerName: String) =
  serdes.get(deserializerName) match {
    case Some(serde) => serde.fromBytes(bytes)
    case None => throw new SamzaException("No serde defined for %s" format deserializerName)
  }
/**
 * Deserializes the key and message carried by an incoming envelope.
 *
 * Serde selection, in priority order, for the message:
 *   1. change log streams and streams whose name ends with
 *      `StorageConfig.ACCESSLOG_STREAM_SUFFIX` are passed through untouched;
 *   2. the intermediate-stream serde registered for the system stream;
 *   3. the message serde registered for the system stream;
 *   4. the message serde registered for the system;
 *   5. otherwise the message object is used as-is.
 *
 * The key follows the same scheme, except that step 2 is replaced by the control-message
 * key serde, which applies only when the already-deserialized message is a `ControlMessage`.
 *
 * @param envelope the envelope whose key and message should be deserialized
 * @return the very same `envelope` instance when neither the key nor the message reference
 *         changed; otherwise a new `IncomingMessageEnvelope` carrying the deserialized key and
 *         message together with the original partition, offset, size, event time and arrival time
 */
def fromBytes(envelope: IncomingMessageEnvelope) = {
  val stream = envelope.getSystemStreamPartition.getSystemStream

  // Change log / access log streams bypass serde entirely; storage engines handle it themselves.
  val passThrough = changeLogSystemStreams.contains(stream) ||
    stream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)

  // Kept as defs so the cast to Array[Byte] only happens in branches that actually deserialize.
  def rawMessage = envelope.getMessage.asInstanceOf[Array[Byte]]
  def rawKey = envelope.getKey.asInstanceOf[Array[Byte]]

  val message = stream match {
    case _ if passThrough =>
      envelope.getMessage
    case s if intermediateMessageSerdes.contains(s) =>
      // Intermediate stream: use the intermediate message serde.
      intermediateMessageSerdes(s).fromBytes(rawMessage)
    case s if systemStreamMessageSerdes.contains(s) =>
      // A serde is defined for this specific stream.
      systemStreamMessageSerdes(s).fromBytes(rawMessage)
    case s if systemMessageSerdes.contains(s.getSystem) =>
      // Fall back to the serde defined for the whole system.
      systemMessageSerdes(s.getSystem).fromBytes(rawMessage)
    case _ =>
      // No serde configured: hand back the object unchanged.
      envelope.getMessage
  }

  val key = stream match {
    case _ if passThrough =>
      envelope.getKey
    case s if message.isInstanceOf[ControlMessage] && controlMessageKeySerdes.contains(s) =>
      // Control message whose key has a dedicated serde.
      controlMessageKeySerdes(s).fromBytes(rawKey)
    case s if systemStreamKeySerdes.contains(s) =>
      // A key serde is defined for this specific stream.
      systemStreamKeySerdes(s).fromBytes(rawKey)
    case s if systemKeySerdes.contains(s.getSystem) =>
      // Fall back to the key serde defined for the whole system.
      systemKeySerdes(s.getSystem).fromBytes(rawKey)
    case _ =>
      // No serde configured: hand back the object unchanged.
      envelope.getKey
  }

  // Reference equality on both parts means nothing was replaced, so the original envelope is reused.
  val untouched = (key eq envelope.getKey) && (message eq envelope.getMessage)
  if (untouched) {
    envelope
  } else {
    new IncomingMessageEnvelope(
      envelope.getSystemStreamPartition,
      envelope.getOffset,
      key,
      message,
      envelope.getSize,
      envelope.getEventTime(),
      envelope.getArrivalTime)
  }
}