in src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java [83:102]
/**
 * Builds and runs a two-input streaming job: two {@code QueuingLongSource} inputs are connected,
 * passed through a {@code MultiplyByTwoCoStreamMap} operator and written to a
 * {@code DiscardingSink}.
 *
 * <p>The first source is created with {@code (1, ONE_IDLE_RECORDS_PER_INVOCATION - 1)} and the
 * second with {@code (2, 1)}. NOTE(review): presumably the second constructor argument is the
 * number of records the source emits, so the two inputs together account for
 * {@code ONE_IDLE_RECORDS_PER_INVOCATION} records -- confirm against {@code QueuingLongSource}.
 *
 * @param context benchmark context whose {@code env} field provides the execution environment
 * @throws Exception if configuring or executing the job fails
 */
public void twoInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exception {
    final StreamExecutionEnvironment environment = context.env;
    environment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
    environment.setParallelism(1);

    // The static reset is invoked before any source instance for this run is created.
    QueuingLongSource.reset();

    final DataStreamSource<Long> firstInput =
            environment.addSource(
                    new QueuingLongSource(1, ONE_IDLE_RECORDS_PER_INVOCATION - 1));
    final DataStreamSource<Long> secondInput =
            environment.addSource(new QueuingLongSource(2, 1));

    final TypeInformation<Long> outputType = TypeInformation.of(Long.class);

    // Both inputs feed one co-operator; its output is dropped by the sink.
    firstInput
            .connect(secondInput)
            .transform("custom operator", outputType, new MultiplyByTwoCoStreamMap())
            .addSink(new DiscardingSink<>());

    environment.execute();
}