in spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java [85:119]
public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
this.schema = schema;
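        // Back the deserialized vectors with an effectively unbounded Arrow
        // allocator and wrap this batch's serialized rows in a stream reader.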
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
this.arrowStreamReader = new ArrowStreamReader(
new ByteArrayInputStream(nextResult.getRows()),
rootAllocator
);
try {
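            // The schema root is reused: each loadNextBatch() call repopulates
            // its field vectors in place.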
VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != schema.size()) {
logger.error("Schema size '{}' is not equal to arrow field size '{}'.",
fieldVectors.size(), schema.size());
throw new DorisException("Load Doris data failed, schema size of fetch data is wrong.");
}
                if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
logger.debug("One batch in arrow has no data.");
continue;
}
rowCountInOneBatch = root.getRowCount();
                // Pre-allocate one Row per record in this batch; convertArrowToRowBatch()
                // then fills them column by column from the Arrow field vectors.
for (int i = 0; i < rowCountInOneBatch; ++i) {
rowBatch.add(new Row(fieldVectors.size()));
}
convertArrowToRowBatch();
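                // Accumulate the total number of rows read across all batches.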
readRowCount += root.getRowCount();
}
        } catch (Exception e) {
            logger.error("Read Doris data failed.", e);
            throw new DorisException(e.getMessage());
} finally {
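            // Release the Arrow reader and allocator in every case; the rows
            // have already been copied into rowBatch.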
close();
}
}
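
For context, a minimal consumption sketch, not part of the file above: it assumes RowBatch exposes the hasNext()/next() pair of this serialization package, that the sketch lives alongside RowBatch in org.apache.doris.spark.serialization, and that the import paths match this connector layout (adjust if your version relocates the thrift classes). The class and method names below are hypothetical.

package org.apache.doris.spark.serialization;

import java.util.List;

import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.thrift.TScanBatchResult;

public final class RowBatchUsageSketch {
    // Drain one scan batch, handing each row to the caller in turn.
    static void drain(TScanBatchResult nextResult, Schema schema) throws DorisException {
        RowBatch batch = new RowBatch(nextResult, schema);
        while (batch.hasNext()) {
            // One row's column values, in schema order.
            List<Object> row = batch.next();
            // Hand the row off here, e.g. to a Spark InternalRow converter.
        }
    }
}

Because the constructor's finally block calls close(), the Arrow reader and its buffers are released before next() is ever invoked; the constructor eagerly copies every batch into plain Java objects on the heap, so iteration is purely in-memory.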