in flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java [67:86]
/**
 * Fetches the next batch of data points for the currently assigned split.
 *
 * <p>If no split is assigned, {@code null} is returned. Otherwise the call blocks on the
 * ingestion queue's availability future and then polls the queue once. When the poll yields
 * nothing, a record container with no added elements is returned for the split.
 *
 * @return the records polled for the current split, already prepared for reading, or
 *     {@code null} if no split is assigned
 * @throws IOException if waiting on the availability future is interrupted or the future
 *     completes exceptionally; the original exception is attached as the cause
 */
public RecordsWithSplitIds<DataPoint> fetch() throws IOException {
    if (this.split == null) {
        // Kept as null (rather than an empty result) so existing callers see no change.
        return null;
    }
    final InfluxDBSplitRecords recordsBySplits = new InfluxDBSplitRecords(this.split.splitId());
    try {
        // Block until the queue signals that it is available.
        this.ingestionQueue.getAvailabilityFuture().get();
    } catch (final InterruptedException exception) {
        // Restore the interrupt flag so code further up the stack can still observe it;
        // wrapping the exception alone would silently clear the interruption.
        Thread.currentThread().interrupt();
        throw new IOException("An exception occurred during fetch", exception);
    } catch (final ExecutionException exception) {
        throw new IOException("An exception occurred during fetch", exception);
    }
    final List<DataPoint> requests = this.ingestionQueue.poll();
    if (requests != null) {
        recordsBySplits.addAll(requests);
    }
    // Prepared for reading whether or not anything was polled.
    recordsBySplits.prepareForRead();
    return recordsBySplits;
}