in src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java [120:143]
public void sortedTwoInput(SortingInputContext context) throws Exception {
    // Two-input variant: two independent sources are connected, keyed, and run
    // through AssertingTwoInputOperator before the output is discarded.

    // Each source gets its own InputGenerator sized to half of
    // RECORDS_PER_INVOCATION, so the two inputs together account for the
    // full per-invocation record count.
    DataStreamSource<Integer> firstInput =
            context.env.fromParallelCollection(
                    new InputGenerator(RECORDS_PER_INVOCATION / 2), BasicTypeInfo.INT_TYPE_INFO);
    DataStreamSource<Integer> secondInput =
            context.env.fromParallelCollection(
                    new InputGenerator(RECORDS_PER_INVOCATION / 2), BasicTypeInfo.INT_TYPE_INFO);

    // Both sides are keyed by the record value itself (identity key selectors).
    // NOTE(review): what exactly AssertingTwoInputOperator checks is defined
    // outside this method - presumably per-key ordering of the inputs; confirm.
    SingleOutputStreamOperator<Long> operatorOutput =
            firstInput
                    .connect(secondInput)
                    .keyBy(value -> value, value -> value)
                    .transform(
                            "Asserting operator",
                            BasicTypeInfo.LONG_TYPE_INFO,
                            new AssertingTwoInputOperator());

    // The operator's Long output is not inspected here; it is simply dropped.
    operatorOutput.addSink(new DiscardingSink<>());

    // Submission of the assembled job is delegated to the benchmark context.
    context.execute();
}