in src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java [33:44]
public void run(SourceContext<Record> ctx) throws Exception {
    // Emits one record per sequence number, starting at 0, until either
    // numberOfElements records have been emitted or `running` is observed to be
    // false at the top of an iteration.
    for (long sequence = 0; running && sequence < numberOfElements; sequence++) {
        // Each emission happens while holding the lock returned by the context.
        synchronized (ctx.getCheckpointLock()) {
            // First argument: sequence number folded into [0, numberOfKeys) and
            // narrowed to int. The sequence number itself is both the second
            // argument of the record and the timestamp attached to it.
            final int keyIndex = (int) (sequence % numberOfKeys);
            ctx.collectWithTimestamp(Record.of(keyIndex, sequence), sequence);
        }
    }
    // Clear the flag once the loop has ended, whichever condition stopped it.
    running = false;
}