in src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java [146:164]
public void serializerKryoProtobuf(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
env.setParallelism(4);
ExecutionConfig executionConfig = env.getConfig();
SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig();
serializerConfig.setForceKryo(true);
serializerConfig.registerTypeWithKryoSerializer(
org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyPojo.class,
ProtobufSerializer.class);
serializerConfig.registerTypeWithKryoSerializer(
org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyOperation.class,
ProtobufSerializer.class);
env.addSource(new ProtobufPojoSource(RECORDS_PER_INVOCATION, 10))
.rebalance()
.addSink(new DiscardingSink<>());
env.execute();
}