public void twoInputOneIdleMapSink()

in src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java [83:102]


    public void twoInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exception {

        StreamExecutionEnvironment env = context.env;
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        env.setParallelism(1);

        QueuingLongSource.reset();
        DataStreamSource<Long> source1 =
                env.addSource(new QueuingLongSource(1, ONE_IDLE_RECORDS_PER_INVOCATION - 1));
        DataStreamSource<Long> source2 = env.addSource(new QueuingLongSource(2, 1));

        source1.connect(source2)
                .transform(
                        "custom operator",
                        TypeInformation.of(Long.class),
                        new MultiplyByTwoCoStreamMap())
                .addSink(new DiscardingSink<>());

        env.execute();
    }