in hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/ParallelScanIterator.java [322:373]
public void scanKV() {
boolean canNext = true;
ArrayList<KV> dataList = new ArrayList<>(batchSize);
dataList.ensureCapacity(batchSize);
iteratorLock.lock();
try {
long entriesSize = 0, bodySize = 0;
while (canNext && !closed) {
iterator = this.getIterator();
if (iterator == null) {
break;
}
while (iterator.hasNext() && entriesSize < batchSize &&
bodySize < maxBodySize &&
counter < limit && !closed) {
KV kv = KV.of(iterator.next());
dataList.add(orderVertex ? kv.setNo(query.getSerialNo()) : kv);
bodySize += kv.size();
entriesSize++;
counter++;
}
if ((entriesSize >= batchSize || bodySize >= maxBodySize) ||
(orderEdge && bodySize >= maxBodySize / 2)) {
if (orderEdge) {
// Sort the edges, ensure all edges of one point are consecutive, prevent other points from inserting.
canNext = putData(dataList, iterator != null && iterator.hasNext());
} else {
canNext = putData(dataList);
}
dataList = new ArrayList<>(batchSize);
dataList.ensureCapacity(batchSize);
entriesSize = bodySize = 0;
}
}
if (!dataList.isEmpty()) {
if (orderEdge) {
putData(dataList, false);
} else {
putData(dataList);
}
}
} catch (Exception e) {
log.error("exception {}", e);
} finally {
iteratorLock.unlock();
if (iterator != null && counter < limit && !closed) {
suspendScanner(this);
} else {
quitScanner(this);
}
}
}