in src/main/java/com/aliyun/odps/jdbc/utils/InstanceDataIterator.java [87:115]
/**
 * Schedules an asynchronous download of one split of the result set.
 *
 * <p>Does nothing when {@code splitIndex >= splitNum}. Otherwise a fresh queue is installed
 * in {@code queues[splitIndex]} and a task is submitted to {@code executor} that opens a
 * record reader over the split's row range, pushes every record into that queue and finally
 * appends {@code EOF_RECORD} to mark the end of the split.
 *
 * <p>If the task fails, the first failure observed is stored in {@code error} (later failures
 * do not overwrite it) and {@code EOF_RECORD} is still enqueued so the queue is marked as
 * complete.
 *
 * @param splitIndex zero-based index of the split to download
 */
private synchronized void submitNextSplit(int splitIndex) {
    if (splitIndex >= splitNum) {
        return;
    }
    // Widen before multiplying so the product cannot overflow int arithmetic
    // when the split index and split size are both large.
    long consumed = (long) splitIndex * splitSize;
    long start = offset + consumed;
    // The last split may be shorter than splitSize.
    long count = Math.min(splitSize, recordCount - consumed);
    queues[splitIndex] = new LinkedBlockingQueue<>();
    executor.submit(() -> {
        TunnelRecordReader reader = null;
        try {
            reader = downloadSession.openRecordReader(start, count);
            Record record;
            while ((record = reader.read()) != null) {
                queues[splitIndex].put(record);
            }
            queues[splitIndex].put(EOF_RECORD);
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Catching the exception clears the interrupt status; restore it so
                // the executor / enclosing code can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            // Keep only the first failure.
            error.compareAndSet(null, t);
            queues[splitIndex].offer(EOF_RECORD); // Ensure queue is marked as complete
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException ignored) {
                    // Best-effort close: by this point the split has either been fully
                    // enqueued or its failure has already been recorded in `error`.
                }
            }
        }
    });
}