in flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java [213:271]
/**
 * Reports whether another row is available, loading the next row batch when the current one
 * is missing or exhausted.
 *
 * <p>Returns {@code false} immediately once the configured row limit (if any) has been
 * reached. When asynchronous deserialization is enabled and its thread has been started, the
 * next batch is taken from {@code rowBatchBlockingQueue}; otherwise the next batch is fetched
 * through {@code client.getNext} while holding {@code clientLock}.
 *
 * @return {@code true} if the current or a newly loaded row batch can supply a row;
 *     {@code false} if the row limit was reached or the scan hit end-of-stream
 * @throws DorisRuntimeException if the calling thread is interrupted while waiting for a
 *     batch to appear in the queue; the thread's interrupt flag is restored before throwing
 */
public boolean hasNext() {
    if (readOptions.getRowLimit() != null && readRowCount >= readOptions.getRowLimit()) {
        return false;
    }
    boolean hasNext = false;
    if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
        // support deserialize Arrow to RowBatch asynchronously
        if (rowBatch == null || !rowBatch.hasNext()) {
            // Keep polling until a batch shows up, or until end-of-stream is flagged and the
            // queue has been fully drained.
            while (!eos.get() || !rowBatchBlockingQueue.isEmpty()) {
                if (!rowBatchBlockingQueue.isEmpty()) {
                    try {
                        rowBatch = rowBatchBlockingQueue.take();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so callers further up can observe it.
                        Thread.currentThread().interrupt();
                        throw new DorisRuntimeException(e);
                    }
                    hasNext = true;
                    break;
                } else {
                    // wait for rowBatch put in queue or eos change
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: silently continuing would hide the
                        // cancellation request and could keep this loop polling forever.
                        // Fail the same way as the take() path above.
                        Thread.currentThread().interrupt();
                        throw new DorisRuntimeException(e);
                    }
                }
            }
        } else {
            hasNext = true;
        }
    } else {
        clientLock.lock();
        try {
            // Arrow data was acquired synchronously during the iterative process
            if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
                if (rowBatch != null) {
                    // Advance the scan offset past the exhausted batch before releasing it.
                    offset += rowBatch.getReadRowCount();
                    rowBatch.close();
                }
                TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
                nextBatchParams.setContextId(contextId);
                nextBatchParams.setOffset(offset);
                TScanBatchResult nextResult = client.getNext(nextBatchParams);
                eos.set(nextResult.isEos());
                if (!eos.get()) {
                    rowBatch = new RowBatch(nextResult, schema).readArrow();
                } else {
                    LOG.info(
                            "Scan finished, tablets: {}, offset: {}",
                            partition.getTabletIds(),
                            offset);
                }
            }
            hasNext = !eos.get();
        } finally {
            clientLock.unlock();
        }
    }
    return hasNext;
}