in hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreSessionImpl.java [212:321]
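/**
 * Handles a batch write: rejects the request if the graph is marked read-only in PD,
 * groups the entries by their target partition, and submits one Raft task per partition,
 * replying to the client once every partition's closure has completed.
 */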
public void batch(BatchReq request, StreamObserver<FeedbackRes> observer) {
String graph = request.getHeader().getGraph();
List<BatchEntry> list = request.getWriteReq().getEntryList();
PdProvider pd = getPD();
try {
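// Gate the write on the graph's read-only flag. The check only applies to graph names
// ending in "/g"; it consults the locally cached graph state first and falls back to PD
// when the cache has no state or still reports read-only.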
GraphManager graphManager = pd.getGraphManager();
Graph managerGraph = graphManager.getGraph(graph);
if (managerGraph != null && graph.endsWith("/g")) {
Metapb.Graph g = managerGraph.getProtoObj();
if (g == null || g.getGraphState() == null) {
g = pd.getPDClient().getGraphWithOutException(graph);
managerGraph.setGraph(g);
}
if (g != null) {
Metapb.GraphState graphState = g.getGraphState();
if (graphState != null) {
GraphMode graphMode = graphState.getMode();
if (graphMode != null &&
graphMode.getNumber() == GraphMode.ReadOnly_VALUE) {
// The cached state says read-only: fetch the latest graph state from PD, because the read-only flag is refreshed through PD's notifications and the local cache may be stale.
Metapb.Graph pdGraph =
pd.getPDClient().getGraph(graph);
Metapb.GraphState pdGraphState =
pdGraph.getGraphState();
if (pdGraphState != null &&
pdGraphState.getMode() != null &&
pdGraphState.getMode().getNumber() ==
GraphMode.ReadOnly_VALUE) {
// PD confirms the graph is read-only, so inserting data is not allowed.
throw new PDException(-1,
"the graph space size has reached the threshold");
}
// PD's state differs from the local cache; refresh the cache with the state from PD.
managerGraph.setProtoObj(pdGraph);
}
}
}
}
} catch (PDException e) {
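// Reaching this catch means the read-only check rejected the write or a PD lookup failed;
// either way the batch is refused with RES_CODE_EXCESS and the stream is completed.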
ResStatus status = ResStatus.newBuilder()
.setCode(ResCode.RES_CODE_EXCESS)
.setMsg(e.getMessage())
.build();
FeedbackRes feedbackRes = FeedbackRes.newBuilder()
.setStatus(status)
.build();
observer.onNext(feedbackRes);
observer.onCompleted();
return;
}
// Split data by partition
Map<Integer, List<BatchEntry>> groups = new HashMap<>();
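// groups maps a partition ID to the entries that must be applied on that partition: an
// entry whose start-key code is SCAN_ALL_PARTITIONS_ID goes to every leader partition of
// the graph, any other entry goes to the single partition owning its key code.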
list.forEach((entry) -> {
Key startKey = entry.getStartKey();
if (startKey.getCode() == HgStoreConst.SCAN_ALL_PARTITIONS_ID) {
// Broadcast the entry to all leader partitions of this graph.
List<Integer> ids =
storeService.getGraphLeaderPartitionIds(graph);
ids.forEach(id -> groups.computeIfAbsent(id, k -> new LinkedList<>())
.add(entry));
} else {
// Look up the partition that owns this key code and group the entry under its partition ID.
Integer partitionId =
pd.getPartitionByCode(graph, startKey.getCode()).getId();
groups.computeIfAbsent(partitionId, k -> new LinkedList<>()).add(entry);
}
});
// Dispatch each group to the Raft group of its partition for execution.
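// The closure is constructed with groups.size() so it can wait for one Raft callback per
// partition group before building the combined gRPC response.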
BatchGrpcClosure<FeedbackRes> closure =
new BatchGrpcClosure<>(groups.size());
groups.forEach((partition, entries) -> {
storeService.addRaftTask(HgStoreNodeService.BATCH_OP, graph, partition,
BatchReq.newBuilder()
.setHeader(request.getHeader())
.setWriteReq(BatchWriteReq.newBuilder().addAllEntry(entries))
.build(),
closure.newRaftClosure());
});
if (!groups.isEmpty()) {
log.debug(" batch: waiting for raft...");
// Wait for the return result
closure.waitFinish(observer, r -> closure.selectError(r),
appConfig.getRaft().getRpcTimeOut());
log.debug(" batch: ended waiting");
} else {
log.info(" batch: there is none of raft leader, graph = {}.",
request.getHeader().getGraph());
observer.onNext(
FeedbackRes.newBuilder().setStatus(HgGrpc.success())
.build());
observer.onCompleted();
}
}