in src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java [119:149]
public void multiInputChainedIdleSource(FlinkEnvironmentContext context) throws Exception {
    // Assembles a two-input job whose multiple-input operator is chained with both of its
    // sources (HEAD_WITH_SOURCES), attaches a single sink and runs the job through the
    // benchmark context. Object reuse is switched on for the whole job.
    final StreamExecutionEnvironment environment = context.env;
    environment.getConfig().enableObjectReuse();

    // First input: the number sequence 1..RECORDS_PER_INVOCATION, no watermarks.
    final DataStream<Long> sequenceInput =
            environment.fromSource(
                    new NumberSequenceSource(1L, RECORDS_PER_INVOCATION),
                    WatermarkStrategy.noWatermarks(),
                    "source-1");

    // Second input: an IdlingSource, no watermarks.
    // NOTE(review): judging by the names, this source stays idle until the sink below
    // closes it - confirm against IdlingSource / SinkClosingIdlingSource.
    final DataStreamSource<Integer> idleInput =
            environment.fromSource(
                    new IdlingSource(1), WatermarkStrategy.noWatermarks(), "source-2");

    // Multiple-input transformation producing Long records with parallelism 1.
    final MultipleInputTransformation<Long> multiInput =
            new MultipleInputTransformation<>(
                    "custom operator",
                    new MultiplyByTwoOperatorFactory(),
                    BasicTypeInfo.LONG_TYPE_INFO,
                    1);

    // Inputs are registered in the same order as before: sequence first, idle second.
    multiInput.addInput(sequenceInput.getTransformation());
    multiInput.addInput(idleInput.getTransformation());
    multiInput.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);

    // The transformation is registered with the environment explicitly before it is
    // wrapped into a stream.
    environment.addOperator(multiInput);

    final DataStream<Long> operatorOutput =
            new MultipleConnectedStreams(environment).transform(multiInput);
    operatorOutput.addSink(new SinkClosingIdlingSource()).setParallelism(1);

    context.execute();
}