in src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java [97:116]
public void run(SourceContext<TimestampedFileInputSplit> ctx) {
while (isRunning && count < SPLITS_PER_INVOCATION) {
count++;
synchronized (ctx.getCheckpointLock()) {
ctx.collect(SPLIT);
}
}
while (isRunning) {
try {
TARGET_COUNT_REACHED_LATCH.await(100, TimeUnit.MILLISECONDS);
return;
} catch (InterruptedException e) {
if (!isRunning) {
Thread.currentThread().interrupt();
}
} catch (TimeoutException e) {
// continue waiting
}
}
}