protected StreamGraphWithSources getStreamGraph()

in src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java [101:124]


        protected StreamGraphWithSources getStreamGraph() {
            DataStreamSource<RecordSource.Record> source =
                    env.fromSource(
                            new RecordSource(Integer.MAX_VALUE, (int) getRecordSize().getBytes()),
                            noWatermarks(),
                            RecordSource.class.getName());

            source.slotSharingGroup("source")
                    .rebalance()
                    .map((MapFunction<RecordSource.Record, RecordSource.Record>) value -> value)
                    .slotSharingGroup("map")
                    .rebalance()
                    .addSink(new SlowDiscardSink<>())
                    .slotSharingGroup("sink");

            final StreamGraph streamGraph = env.getStreamGraph(false);
            final JobVertexID sourceId =
                    streamGraph
                            .getJobGraph()
                            .getVerticesSortedTopologicallyFromSources()
                            .get(0)
                            .getID();
            return new StreamGraphWithSources(streamGraph, Collections.singletonList(sourceId));
        }