protected StreamGraphWithSources getStreamGraph()

in src/main/java/org/apache/flink/benchmark/MultiInputCheckpointingTimeBenchmark.java [88:131]


        /**
         * Assembles the two-input topology used by this benchmark and returns it together with
         * the IDs of the first two job vertices in topological order.
         *
         * <p>Two {@link RecordSource} streams are created, one sized by {@code SMALL_RECORD_SIZE}
         * and one by {@code BIG_RECORD_SIZE}, each in its own slot sharing group and rebalanced.
         * They are connected through an identity co-map that feeds a {@link SlowDiscardSink}; the
         * co-map and the sink share the {@code "map-and-sink"} slot sharing group.
         *
         * @return the stream graph paired with the IDs of the first two topologically sorted
         *     vertices (presumably the two sources, since the sort starts from the sources)
         */
        protected StreamGraphWithSources getStreamGraph() {
            // Keep the creation order: small-record source first, big-record source second.
            final DataStream<Record> smallRecords =
                    rebalancedRecordSource(
                            (int) SMALL_RECORD_SIZE.getBytes(), "source-small-records");
            final DataStream<Record> bigRecords =
                    rebalancedRecordSource((int) BIG_RECORD_SIZE.getBytes(), "source-big-records");

            // Identity function on both inputs: records are forwarded unchanged.
            final CoMapFunction<Record, Record, Record> passThrough =
                    new CoMapFunction<Record, Record, Record>() {
                        @Override
                        public Record map1(Record value) throws Exception {
                            return value;
                        }

                        @Override
                        public Record map2(Record value) throws Exception {
                            return value;
                        }
                    };

            smallRecords
                    .connect(bigRecords)
                    .map(passThrough)
                    .name("co-map")
                    .slotSharingGroup("map-and-sink")
                    .addSink(new SlowDiscardSink<>())
                    .slotSharingGroup("map-and-sink");

            final StreamGraph graph = env.getStreamGraph(false);
            final List<JobVertex> sortedVertices =
                    graph.getJobGraph().getVerticesSortedTopologicallyFromSources();

            // The first two entries of the source-first topological order are reported as sources.
            return new StreamGraphWithSources(
                    graph,
                    Arrays.asList(sortedVertices.get(0).getID(), sortedVertices.get(1).getID()));
        }

        /**
         * Creates a rebalanced {@link RecordSource} stream without watermarks.
         *
         * @param recordSize second constructor argument handed to {@link RecordSource}
         * @param slotSharingGroup name of the slot sharing group assigned to the source
         * @return the rebalanced source stream
         */
        private DataStream<Record> rebalancedRecordSource(int recordSize, String slotSharingGroup) {
            return env.fromSource(
                            new RecordSource(Integer.MAX_VALUE, recordSize),
                            noWatermarks(),
                            RecordSource.class.getName())
                    .slotSharingGroup(slotSharingGroup)
                    .rebalance();
        }