in src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java [101:124]
protected StreamGraphWithSources getStreamGraph() {
DataStreamSource<RecordSource.Record> source =
env.fromSource(
new RecordSource(Integer.MAX_VALUE, (int) getRecordSize().getBytes()),
noWatermarks(),
RecordSource.class.getName());
source.slotSharingGroup("source")
.rebalance()
.map((MapFunction<RecordSource.Record, RecordSource.Record>) value -> value)
.slotSharingGroup("map")
.rebalance()
.addSink(new SlowDiscardSink<>())
.slotSharingGroup("sink");
final StreamGraph streamGraph = env.getStreamGraph(false);
final JobVertexID sourceId =
streamGraph
.getJobGraph()
.getVerticesSortedTopologicallyFromSources()
.get(0)
.getID();
return new StreamGraphWithSources(streamGraph, Collections.singletonList(sourceId));
}