public void twoInputMapSink()

in src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java [55:79]


    /**
     * Builds and runs a two-input streaming job: two {@code LongSource} instances are connected,
     * passed through a {@code MultiplyByTwoCoStreamMap} operator registered under the name
     * "custom operator", and drained into a {@code DiscardingSink}.
     *
     * <p>The job runs with parallelism 1, checkpointing enabled at {@code CHECKPOINT_INTERVAL_MS},
     * and a buffer timeout of 1. Each source is constructed with half of {@code
     * RECORDS_PER_INVOCATION}.
     *
     * @param context benchmark context supplying the {@link StreamExecutionEnvironment} to use
     * @throws Exception if job execution fails
     */
    public void twoInputMapSink(FlinkEnvironmentContext context) throws Exception {
        final StreamExecutionEnvironment environment = context.env;
        environment.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        environment.setParallelism(1);
        // A 1ms buffer timeout is used to stabilise this benchmark: without it, some JVM forks
        // run much slower than others, which makes the results unstable and unreliable.
        environment.setBufferTimeout(1);

        // The total record budget is split evenly between the two inputs.
        final long recordsPerSource = RECORDS_PER_INVOCATION / 2;
        final DataStreamSource<Long> firstInput =
                environment.addSource(new LongSource(recordsPerSource));
        final DataStreamSource<Long> secondInput =
                environment.addSource(new LongSource(recordsPerSource));

        firstInput
                .connect(secondInput)
                .transform(
                        "custom operator",
                        TypeInformation.of(Long.class),
                        new MultiplyByTwoCoStreamMap())
                .addSink(new DiscardingSink<>());

        environment.execute();
    }