public void sortedMultiInput()

in src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java [147:184]


    public void sortedMultiInput(SortingInputContext context) throws Exception {
        StreamExecutionEnvironment env = context.env;

        KeyedStream<Integer, Object> elements1 =
                env.fromParallelCollection(
                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
                                BasicTypeInfo.INT_TYPE_INFO)
                        .keyBy(el -> el);

        KeyedStream<Integer, Object> elements2 =
                env.fromParallelCollection(
                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
                                BasicTypeInfo.INT_TYPE_INFO)
                        .keyBy(el -> el);

        KeyedStream<Integer, Object> elements3 =
                env.fromParallelCollection(
                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
                                BasicTypeInfo.INT_TYPE_INFO)
                        .keyBy(el -> el);

        KeyedMultipleInputTransformation<Long> assertingTransformation =
                new KeyedMultipleInputTransformation<>(
                        "Asserting operator",
                        new AssertingThreeInputOperatorFactory(),
                        BasicTypeInfo.LONG_TYPE_INFO,
                        -1,
                        BasicTypeInfo.INT_TYPE_INFO);
        assertingTransformation.addInput(elements1.getTransformation(), elements1.getKeySelector());
        assertingTransformation.addInput(elements2.getTransformation(), elements2.getKeySelector());
        assertingTransformation.addInput(elements3.getTransformation(), elements3.getKeySelector());

        env.addOperator(assertingTransformation);
        DataStream<Long> counts = new DataStream<>(env, assertingTransformation);

        counts.addSink(new DiscardingSink<>());
        context.execute();
    }