in src/main/java/org/apache/flink/benchmark/MultiInputCheckpointingTimeBenchmark.java [88:131]
protected StreamGraphWithSources getStreamGraph() {
DataStream<Record> source1 =
env.fromSource(
new RecordSource(
Integer.MAX_VALUE, (int) SMALL_RECORD_SIZE.getBytes()),
noWatermarks(),
RecordSource.class.getName())
.slotSharingGroup("source-small-records")
.rebalance();
DataStream<Record> source2 =
env.fromSource(
new RecordSource(
Integer.MAX_VALUE, (int) BIG_RECORD_SIZE.getBytes()),
noWatermarks(),
RecordSource.class.getName())
.slotSharingGroup("source-big-records")
.rebalance();
source1.connect(source2)
.map(
new CoMapFunction<Record, Record, Record>() {
@Override
public Record map1(Record record) throws Exception {
return record;
}
@Override
public Record map2(Record record) throws Exception {
return record;
}
})
.name("co-map")
.slotSharingGroup("map-and-sink")
.addSink(new SlowDiscardSink<>())
.slotSharingGroup("map-and-sink");
final StreamGraph streamGraph = env.getStreamGraph(false);
final JobGraph jobGraph = streamGraph.getJobGraph();
final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
return new StreamGraphWithSources(
streamGraph, Arrays.asList(vertices.get(0).getID(), vertices.get(1).getID()));
}