in src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java [55:79]
public void twoInputMapSink(FlinkEnvironmentContext context) throws Exception {
    // Builds and runs a job on the environment supplied by the benchmark context:
    // two LongSource inputs are connected, passed through a two-input operator
    // (MultiplyByTwoCoStreamMap) and drained into a DiscardingSink.
    final StreamExecutionEnvironment streamEnv = context.env;
    streamEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
    streamEnv.setParallelism(1);

    // Stability workaround: with a buffer timeout other than 1ms, some JVM forks turned out
    // much slower than others, which made the twoInputMapSink results unstable and unreliable.
    streamEnv.setBufferTimeout(1);

    // Each of the two sources is constructed with half of RECORDS_PER_INVOCATION.
    final long recordsPerSource = RECORDS_PER_INVOCATION / 2;
    final DataStreamSource<Long> firstInput =
            streamEnv.addSource(new LongSource(recordsPerSource));

    // The second source is created inline, still after the first one, so the order in which
    // the sources are registered with the environment is unchanged.
    firstInput
            .connect(streamEnv.addSource(new LongSource(recordsPerSource)))
            .transform(
                    "custom operator",
                    TypeInformation.of(Long.class),
                    new MultiplyByTwoCoStreamMap())
            .addSink(new DiscardingSink<>());

    // Submits the assembled job; any failure propagates to the caller as an exception.
    streamEnv.execute();
}