public void multiInputChainedIdleSource()

in src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java [119:149]


    public void multiInputChainedIdleSource(FlinkEnvironmentContext context) throws Exception {
        final StreamExecutionEnvironment env = context.env;
        env.getConfig().enableObjectReuse();

        final DataStream<Long> source1 =
                env.fromSource(
                        new NumberSequenceSource(1L, RECORDS_PER_INVOCATION),
                        WatermarkStrategy.noWatermarks(),
                        "source-1");

        final DataStreamSource<Integer> source2 =
                env.fromSource(new IdlingSource(1), WatermarkStrategy.noWatermarks(), "source-2");

        MultipleInputTransformation<Long> transform =
                new MultipleInputTransformation<>(
                        "custom operator",
                        new MultiplyByTwoOperatorFactory(),
                        BasicTypeInfo.LONG_TYPE_INFO,
                        1);

        transform.addInput(((DataStream<?>) source1).getTransformation());
        transform.addInput(((DataStream<?>) source2).getTransformation());
        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);

        env.addOperator(transform);
        new MultipleConnectedStreams(env)
                .transform(transform)
                .addSink(new SinkClosingIdlingSource())
                .setParallelism(1);
        context.execute();
    }