in hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/ScanBatchResponse.java [183:232]
/**
 * Pushes pending scan results to the client through {@code sender}.
 *
 * <p>Batches are sent while all of the following hold: the state is not
 * {@code DONE}, the iterator has more data, the number of unacknowledged
 * batches ({@code seqNo - clientSeqNo}) is below {@code maxInFlightCount},
 * and fewer than {@code limit} entries have been counted. Note that
 * {@code limit} is checked per batch, so the final batch may push
 * {@code count} past {@code limit}.
 *
 * <p>When the iterator is exhausted, the limit is reached, or the state is
 * {@code DONE}, the iterator is closed, a terminating {@code over} message
 * is sent and the state is set to done. Otherwise the state is set to idle.
 *
 * <p>Any {@code Throwable} raised while sending is logged and reported to
 * the client via {@code sender.onError}, unless the state is already
 * {@code DONE}.
 */
private void sendEntries() {
    // Cheap pre-check without the lock; it is repeated under the lock below
    // because the state/iterator may change in between.
    if (state == State.DONE || iterator == null) {
        setStateIdle();
        return;
    }
    iteratorLock.lock();
    try {
        if (state == State.DONE || iterator == null) {
            setStateIdle();
            return;
        }
        KvStream.Builder dataBuilder = KvStream.newBuilder().setVersion(1);
        while (state != State.DONE && iterator.hasNext()
               && (seqNo - clientSeqNo < maxInFlightCount)
               && this.count < limit) {
            KVByteBuffer buffer = new KVByteBuffer(alloc.get());
            // Until the completion callback is registered, nobody else will
            // hand the buffer back to the allocator, so this method must do
            // it if anything fails while the batch is being filled.
            boolean releaseDelegated = false;
            try {
                List<ParallelScanIterator.KV> dataList = iterator.next();
                for (ParallelScanIterator.KV kv : dataList) {
                    kv.write(buffer);
                    this.count++;
                }
                dataBuilder.setStream(buffer.flip().getBuffer());
                dataBuilder.setSeqNo(seqNo++);
                dataBuilder.complete(e -> alloc.release(buffer.getBuffer()));
                releaseDelegated = true;
            } finally {
                if (!releaseDelegated) {
                    alloc.release(buffer.getBuffer());
                }
            }
            this.sender.onNext(dataBuilder.build());
            this.activeTime = System.currentTimeMillis();
        }
        if (!iterator.hasNext() || this.count >= limit || state == State.DONE) {
            // Nothing more to send: close the scan and tell the client.
            closeIter();
            this.sender.onNext(KvStream.newBuilder().setOver(true).build());
            setStateDone();
        } else {
            // Stopped only because the in-flight window is full.
            setStateIdle();
        }
    } catch (Throwable e) {
        if (this.state != State.DONE) {
            log.error(" send data exception: ", e);
            setStateIdle();
            if (this.sender != null) {
                try {
                    this.sender.onError(e);
                } catch (Exception ex) {
                    // Best effort: the stream may already be closed or
                    // cancelled. Record the failure instead of hiding it.
                    log.warn("failed to notify client of send error: ", ex);
                }
            }
        }
    } finally {
        iteratorLock.unlock();
    }
}