in spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java [140:191]
public boolean hasNext() throws DorisException {
if (partition.getLimit() > 0 && readCount >= partition.getLimit()) {
return false;
}
boolean hasNext = false;
if (isAsync && asyncThread != null && asyncThread.isAlive()) {
if (rowBatch == null || !rowBatch.hasNext()) {
while (!endOfStream.get() || !rowBatchQueue.isEmpty()) {
if (!rowBatchQueue.isEmpty()) {
try {
rowBatch = rowBatchQueue.take();
hasNext = true;
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
} else {
hasNext = true;
}
} else {
if (!endOfStream.get() && (rowBatch == null || !rowBatch.hasNext())) {
if (rowBatch != null) {
offset += rowBatch.getReadRowCount();
rowBatch.close();
}
TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
nextBatchParams.setContextId(contextId);
nextBatchParams.setOffset(offset);
TScanBatchResult nextResult = lockClient(backend -> {
try {
return backend.getNext(nextBatchParams);
} catch (DorisException e) {
throw new RuntimeException("backend get next failed", e);
}
});
endOfStream.set(nextResult.isEos());
if (!endOfStream.get()) {
rowBatch = new RowBatch(nextResult, dorisSchema, datetimeJava8ApiEnabled);
}
}
hasNext = !endOfStream.get();
}
return hasNext;
}