in samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java [275:349]
/**
 * Writes the serde-related entries for this job node into {@code configs}.
 *
 * <p>The method works in three phases:
 * <ol>
 *   <li>Gather the key and message serde instances for every input stream, output stream,
 *       store and table. Stream and table serdes are obtained from {@code jobNode} and handed
 *       to {@code addSerdes} together with the two target maps; store serdes are read
 *       directly from each {@link StoreDescriptor}.</li>
 *   <li>For each distinct serde instance, generate a name of the form
 *       {@code <SimpleClassName>-<random UUID>}, serialize the instance with
 *       {@link SerializableSerde}, Base64-encode it, and store it under the
 *       {@code SerializerConfig.SERDE_SERIALIZED_INSTANCE} key for that name (only if that key
 *       is not already present).</li>
 *   <li>Point the key/msg serde config of every stream, store and table at the generated
 *       name of its serde.</li>
 * </ol>
 *
 * <p>Writes in the third phase use {@code put}, so a later write to the same config key
 * replaces an earlier one; the order is streams, then stores, then tables.
 *
 * @param configs  mutable config map that receives all generated entries
 * @param inEdges  input stream edges; only the stream ids (map keys) are used here
 * @param outEdges output stream edges; only the stream ids (map keys) are used here
 * @param stores   descriptors of the stores whose key/msg serdes should be configured
 * @param tables   ids of the tables whose key/msg serdes should be configured
 * @param jobNode  source of the stream and table serdes
 */
private void configureSerdes(Map<String, String> configs, Map<String, StreamEdge> inEdges, Map<String, StreamEdge> outEdges,
    List<StoreDescriptor> stores, Collection<String> tables, JobNode jobNode) {
  // ---- Phase 1: gather serde instances, keyed by the id of the entity that uses them ----

  // streams (inputs first, then outputs, sharing the same pair of maps)
  Map<String, Serde> keySerdeByStream = new HashMap<>();
  Map<String, Serde> msgSerdeByStream = new HashMap<>();
  for (String inputStreamId : inEdges.keySet()) {
    addSerdes(jobNode.getInputSerdes(inputStreamId), inputStreamId, keySerdeByStream, msgSerdeByStream);
  }
  for (String outputStreamId : outEdges.keySet()) {
    addSerdes(jobNode.getOutputSerde(outputStreamId), outputStreamId, keySerdeByStream, msgSerdeByStream);
  }

  // stores (serdes come straight from the descriptor)
  Map<String, Serde> keySerdeByStore = new HashMap<>();
  Map<String, Serde> msgSerdeByStore = new HashMap<>();
  for (StoreDescriptor descriptor : stores) {
    String name = descriptor.getStoreName();
    keySerdeByStore.put(name, descriptor.getKeySerde());
    msgSerdeByStore.put(name, descriptor.getMsgSerde());
  }

  // tables
  Map<String, Serde> keySerdeByTable = new HashMap<>();
  Map<String, Serde> msgSerdeByTable = new HashMap<>();
  for (String tableId : tables) {
    addSerdes(jobNode.getTableSerdes(tableId), tableId, keySerdeByTable, msgSerdeByTable);
  }

  // ---- Phase 2: name and serialize each distinct stream, store or table serde instance ----

  HashSet<Serde> distinctSerdes = new HashSet<>(keySerdeByStream.values());
  distinctSerdes.addAll(msgSerdeByStream.values());
  distinctSerdes.addAll(keySerdeByStore.values());
  distinctSerdes.addAll(msgSerdeByStore.values());
  distinctSerdes.addAll(keySerdeByTable.values());
  distinctSerdes.addAll(msgSerdeByTable.values());

  SerializableSerde<Serde> serdeSerializer = new SerializableSerde<>();
  Base64.Encoder encoder = Base64.getEncoder();
  Map<Serde, String> nameBySerde = new HashMap<>();
  for (Serde serde : distinctSerdes) {
    // name = simple class name + "-" + fresh random UUID
    String generatedName = nameBySerde.computeIfAbsent(serde,
        key -> key.getClass().getSimpleName() + "-" + UUID.randomUUID());
    String instanceConfigKey = String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE, generatedName);
    String encodedInstance = encoder.encodeToString(serdeSerializer.toBytes(serde));
    configs.putIfAbsent(instanceConfigKey, encodedInstance);
  }

  // ---- Phase 3: reference the generated serde names from each entity's config ----

  // streams: key serdes, then msg serdes
  for (Map.Entry<String, Serde> entry : keySerdeByStream.entrySet()) {
    String prefix = String.format(StreamConfig.STREAM_ID_PREFIX, entry.getKey());
    configs.put(prefix + StreamConfig.KEY_SERDE, nameBySerde.get(entry.getValue()));
  }
  for (Map.Entry<String, Serde> entry : msgSerdeByStream.entrySet()) {
    String prefix = String.format(StreamConfig.STREAM_ID_PREFIX, entry.getKey());
    configs.put(prefix + StreamConfig.MSG_SERDE, nameBySerde.get(entry.getValue()));
  }

  // stores: key serdes, then msg serdes
  for (Map.Entry<String, Serde> entry : keySerdeByStore.entrySet()) {
    configs.put(String.format(StorageConfig.KEY_SERDE, entry.getKey()), nameBySerde.get(entry.getValue()));
  }
  for (Map.Entry<String, Serde> entry : msgSerdeByStore.entrySet()) {
    configs.put(String.format(StorageConfig.MSG_SERDE, entry.getKey()), nameBySerde.get(entry.getValue()));
  }

  // tables: key serdes, then msg serdes (written last, after the store entries)
  for (Map.Entry<String, Serde> entry : keySerdeByTable.entrySet()) {
    configs.put(String.format(JavaTableConfig.STORE_KEY_SERDE, entry.getKey()), nameBySerde.get(entry.getValue()));
  }
  for (Map.Entry<String, Serde> entry : msgSerdeByTable.entrySet()) {
    configs.put(String.format(JavaTableConfig.STORE_MSG_SERDE, entry.getKey()), nameBySerde.get(entry.getValue()));
  }
}