public void sortedTwoInput()

in src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java [120:143]


    /**
     * Builds and runs a two-input keyed pipeline.
     *
     * <p>Two integer sources are created, each backed by an {@code InputGenerator} constructed
     * with {@code RECORDS_PER_INVOCATION / 2}. The sources are connected, keyed on both sides by
     * the element value itself, and passed through an {@code AssertingTwoInputOperator}. The
     * operator's {@code Long} output is sent to a {@code DiscardingSink}, and the job is started
     * through {@code context.execute()}.
     *
     * @param context supplies the {@code StreamExecutionEnvironment} the pipeline is built on and
     *     the {@code execute()} call that runs it
     * @throws Exception propagated from building or executing the pipeline
     */
    public void sortedTwoInput(SortingInputContext context) throws Exception {
        StreamExecutionEnvironment executionEnv = context.env;

        // Each of the two inputs is generated with half of RECORDS_PER_INVOCATION.
        DataStreamSource<Integer> firstInput =
                executionEnv.fromParallelCollection(
                        new InputGenerator(RECORDS_PER_INVOCATION / 2),
                        BasicTypeInfo.INT_TYPE_INFO);
        DataStreamSource<Integer> secondInput =
                executionEnv.fromParallelCollection(
                        new InputGenerator(RECORDS_PER_INVOCATION / 2),
                        BasicTypeInfo.INT_TYPE_INFO);

        // Both sides are keyed by the record value itself (identity key selectors).
        SingleOutputStreamOperator<Long> operatorOutput =
                firstInput
                        .connect(secondInput)
                        .keyBy(left -> left, right -> right)
                        .transform(
                                "Asserting operator",
                                BasicTypeInfo.LONG_TYPE_INFO,
                                new AssertingTwoInputOperator());

        // The operator output is not inspected here; it is simply dropped.
        operatorOutput.addSink(new DiscardingSink<>());

        context.execute();
    }