in samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala [39:98]
def toBytes(obj: Object, serializerName: String) = serdes
.getOrElse(serializerName, throw new SamzaException("No serde defined for %s" format serializerName))
.toBytes(obj)
def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = {
val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)
|| envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getKey
} else if (envelope.getMessage.isInstanceOf[ControlMessage]
&& controlMessageKeySerdes.contains(envelope.getSystemStream)) {
// If the message is a control message and the key needs to serialize
controlMessageKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey.asInstanceOf[String])
} else if (envelope.getKeySerializerName != null) {
// If a serde is defined, use it.
toBytes(envelope.getKey, envelope.getKeySerializerName)
} else if (systemStreamKeySerdes.contains(envelope.getSystemStream)) {
// If the stream has a serde defined, use it.
systemStreamKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey)
} else if (systemKeySerdes.contains(envelope.getSystemStream.getSystem)) {
// If the system has a serde defined, use it.
systemKeySerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getKey)
} else {
// Just use the object.
envelope.getKey
}
val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)
|| envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getMessage
} else if (intermediateMessageSerdes.contains(envelope.getSystemStream)) {
// If the stream is an intermediate stream, use the intermediate message serde
intermediateMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
} else if (envelope.getMessageSerializerName != null) {
// If a serde is defined, use it.
toBytes(envelope.getMessage, envelope.getMessageSerializerName)
} else if (systemStreamMessageSerdes.contains(envelope.getSystemStream)) {
// If the stream has a serde defined, use it.
systemStreamMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
} else if (systemMessageSerdes.contains(envelope.getSystemStream.getSystem)) {
// If the system has a serde defined, use it.
systemMessageSerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getMessage)
} else {
// Just use the object.
envelope.getMessage
}
if ((key eq envelope.getKey) && (message eq envelope.getMessage)) {
envelope
} else {
new OutgoingMessageEnvelope(
envelope.getSystemStream,
null,
null,
envelope.getPartitionKey,
key,
message)
}
}