private void configureSerdes()

in samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java [275:349]


  private void configureSerdes(Map<String, String> configs, Map<String, StreamEdge> inEdges, Map<String, StreamEdge> outEdges,
      List<StoreDescriptor> stores, Collection<String> tables, JobNode jobNode) {
    // collect all key and msg serde instances for streams
    Map<String, Serde> streamKeySerdes = new HashMap<>();
    Map<String, Serde> streamMsgSerdes = new HashMap<>();
    inEdges.keySet().forEach(streamId ->
        addSerdes(jobNode.getInputSerdes(streamId), streamId, streamKeySerdes, streamMsgSerdes));
    outEdges.keySet().forEach(streamId ->
        addSerdes(jobNode.getOutputSerde(streamId), streamId, streamKeySerdes, streamMsgSerdes));

    Map<String, Serde> storeKeySerdes = new HashMap<>();
    Map<String, Serde> storeMsgSerdes = new HashMap<>();
    stores.forEach(storeDescriptor -> {
      storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
      storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
    });

    Map<String, Serde> tableKeySerdes = new HashMap<>();
    Map<String, Serde> tableMsgSerdes = new HashMap<>();
    tables.forEach(tableId -> {
      addSerdes(jobNode.getTableSerdes(tableId), tableId, tableKeySerdes, tableMsgSerdes);
    });

    // for each unique stream or store serde instance, generate a unique name and serialize to config
    HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
    serdes.addAll(streamMsgSerdes.values());
    serdes.addAll(storeKeySerdes.values());
    serdes.addAll(storeMsgSerdes.values());
    serdes.addAll(tableKeySerdes.values());
    serdes.addAll(tableMsgSerdes.values());
    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
    Base64.Encoder base64Encoder = Base64.getEncoder();
    Map<Serde, String> serdeUUIDs = new HashMap<>();
    serdes.forEach(serde -> {
      String serdeName = serdeUUIDs.computeIfAbsent(serde,
        s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
      configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE, serdeName),
          base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
    });

    // set key and msg serdes for streams to the serde names generated above
    streamKeySerdes.forEach((streamId, serde) -> {
      String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX, streamId);
      String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE;
      configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
    });

    streamMsgSerdes.forEach((streamId, serde) -> {
      String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX, streamId);
      String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE;
      configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
    });

    // set key and msg serdes for stores to the serde names generated above
    storeKeySerdes.forEach((storeName, serde) -> {
      String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE, storeName);
      configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
    });

    storeMsgSerdes.forEach((storeName, serde) -> {
      String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE, storeName);
      configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
    });

    // set key and msg serdes for stores to the serde names generated above
    tableKeySerdes.forEach((tableId, serde) -> {
      String keySerdeConfigKey = String.format(JavaTableConfig.STORE_KEY_SERDE, tableId);
      configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
    });

    tableMsgSerdes.forEach((tableId, serde) -> {
      String valueSerdeConfigKey = String.format(JavaTableConfig.STORE_MSG_SERDE, tableId);
      configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
    });
  }