in src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java [168:185]
public SourceReader<Integer, MockSourceSplit> createReader(
        SourceReaderContext readerContext) {
    // The reader context is not consulted; the returned reader is driven solely by
    // the enclosing scope's canFinish future.
    final MockSourceReader reader =
            new MockSourceReader(true, true) {
                /**
                 * Exposes {@code canFinish} itself as the availability future, so the
                 * reader is reported available exactly when that future is done.
                 */
                @Override
                public synchronized CompletableFuture<Void> isAvailable() {
                    return canFinish;
                }

                /**
                 * Never writes to {@code output}. Signals end of input only after
                 * {@code canFinish} has completed normally; while it is still pending,
                 * or if it completed exceptionally, reports that nothing is available.
                 */
                @Override
                public InputStatus pollNext(ReaderOutput<Integer> output) {
                    if (!canFinish.isDone() || canFinish.isCompletedExceptionally()) {
                        return InputStatus.NOTHING_AVAILABLE;
                    }
                    return InputStatus.END_OF_INPUT;
                }
            };
    return reader;
}