in src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java [147:184]
public void sortedMultiInput(SortingInputContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
KeyedStream<Integer, Object> elements1 =
env.fromParallelCollection(
new InputGenerator(RECORDS_PER_INVOCATION / 3),
BasicTypeInfo.INT_TYPE_INFO)
.keyBy(el -> el);
KeyedStream<Integer, Object> elements2 =
env.fromParallelCollection(
new InputGenerator(RECORDS_PER_INVOCATION / 3),
BasicTypeInfo.INT_TYPE_INFO)
.keyBy(el -> el);
KeyedStream<Integer, Object> elements3 =
env.fromParallelCollection(
new InputGenerator(RECORDS_PER_INVOCATION / 3),
BasicTypeInfo.INT_TYPE_INFO)
.keyBy(el -> el);
KeyedMultipleInputTransformation<Long> assertingTransformation =
new KeyedMultipleInputTransformation<>(
"Asserting operator",
new AssertingThreeInputOperatorFactory(),
BasicTypeInfo.LONG_TYPE_INFO,
-1,
BasicTypeInfo.INT_TYPE_INFO);
assertingTransformation.addInput(elements1.getTransformation(), elements1.getKeySelector());
assertingTransformation.addInput(elements2.getTransformation(), elements2.getKeySelector());
assertingTransformation.addInput(elements3.getTransformation(), elements3.getKeySelector());
env.addOperator(assertingTransformation);
DataStream<Long> counts = new DataStream<>(env, assertingTransformation);
counts.addSink(new DiscardingSink<>());
context.execute();
}