in flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java [108:137]
public RowBatch readArrow() {
try {
this.root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != schema.size()) {
logger.error("Schema size '{}' is not equal to arrow field size '{}'.",
fieldVectors.size(), schema.size());
throw new DorisException("Load Doris data failed, schema size of fetch data is wrong.");
}
if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
logger.debug("One batch in arrow has no data.");
continue;
}
rowCountInOneBatch = root.getRowCount();
// init the rowBatch
for (int i = 0; i < rowCountInOneBatch; ++i) {
rowBatch.add(new Row(fieldVectors.size()));
}
convertArrowToRowBatch();
readRowCount += root.getRowCount();
}
return this;
} catch (Exception e) {
logger.error("Read Doris Data failed because: ", e);
throw new DorisRuntimeException(e.getMessage());
} finally {
close();
}
}