public void batch()

in hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreSessionImpl.java [212:321]


    /**
     * Handles a batch write request.
     *
     * <p>The method works in three steps:
     * <ol>
     *   <li>Read-only guard: when a managed graph exists for the requested name
     *       and the name ends with {@code "/g"}, the cached graph state is
     *       inspected. If the cache says read-only, the state is re-read from pd;
     *       if pd also reports read-only, the request is answered with
     *       {@code RES_CODE_EXCESS} and nothing is written. Otherwise the local
     *       cache is refreshed with the state returned by pd.</li>
     *   <li>Grouping: entries are grouped by partition id. An entry whose start
     *       key code equals {@code HgStoreConst.SCAN_ALL_PARTITIONS_ID} is added
     *       to every partition id returned by
     *       {@code storeService.getGraphLeaderPartitionIds(graph)}.</li>
     *   <li>Dispatch: one raft task is submitted per partition group, and the
     *       response is produced once the shared closure finishes or the
     *       configured raft RPC timeout is reached.</li>
     * </ol>
     *
     * <p>Any {@code PDException} raised during the read-only guard (including
     * failures of the pd lookup itself) is reported to the caller with
     * {@code RES_CODE_EXCESS} and the exception message.
     *
     * @param request  the batch request; its header supplies the graph name and
     *                 its write request supplies the entries
     * @param observer the stream used to deliver the single {@code FeedbackRes}
     */
    public void batch(BatchReq request, StreamObserver<FeedbackRes> observer) {
        String graph = request.getHeader().getGraph();
        List<BatchEntry> list = request.getWriteReq().getEntryList();
        PdProvider pd = getPD();
        try {
            GraphManager graphManager = pd.getGraphManager();
            Graph managerGraph = graphManager.getGraph(graph);
            if (managerGraph != null && graph.endsWith("/g")) {
                Metapb.Graph g = managerGraph.getProtoObj();
                if (g == null || g.getGraphState() == null) {
                    // No usable state in the local cache: fetch it from pd and cache it.
                    g = pd.getPDClient().getGraphWithOutException(graph);
                    managerGraph.setGraph(g);
                }
                if (g != null) {
                    Metapb.GraphState graphState = g.getGraphState();
                    if (graphState != null) {
                        GraphMode graphMode = graphState.getMode();
                        if (graphMode != null &&
                            graphMode.getNumber() == GraphMode.ReadOnly_VALUE) {
                            // The cached state is read-only: fetch the latest graph
                            // state from pd before rejecting, since the cached copy
                            // may be stale.
                            Metapb.Graph pdGraph =
                                    pd.getPDClient().getGraph(graph);
                            Metapb.GraphState pdGraphState =
                                    pdGraph.getGraphState();
                            if (pdGraphState != null &&
                                pdGraphState.getMode() != null &&
                                pdGraphState.getMode().getNumber() ==
                                GraphMode.ReadOnly_VALUE) {
                                // pd confirms the graph is read-only, so inserting
                                // data is not allowed.
                                throw new PDException(-1,
                                                      "the graph space size " +
                                                      "has " +
                                                      "reached the threshold");
                            }
                            // pd disagrees with the local cache: update the local
                            // cache to the state held by pd.
                            managerGraph.setProtoObj(pdGraph);
                        }
                    }
                }
            }
        } catch (PDException e) {
            ResStatus status = ResStatus.newBuilder()
                                        .setCode(ResCode.RES_CODE_EXCESS)
                                        .setMsg(e.getMessage())
                                        .build();
            FeedbackRes feedbackRes = FeedbackRes.newBuilder()
                                                 .setStatus(status)
                                                 .build();
            observer.onNext(feedbackRes);
            observer.onCompleted();
            return;
        }

        // Split data by partition
        Map<Integer, List<BatchEntry>> groups = new HashMap<>();
        for (BatchEntry entry : list) {
            Key startKey = entry.getStartKey();
            if (startKey.getCode() == HgStoreConst.SCAN_ALL_PARTITIONS_ID) {
                // The entry targets all leader partitions of the graph
                List<Integer> ids =
                        storeService.getGraphLeaderPartitionIds(graph);
                for (Integer id : ids) {
                    groups.computeIfAbsent(id, k -> new LinkedList<>())
                          .add(entry);
                }
            } else {
                // Look up the owning partition by key code and group by its id
                Integer partitionId =
                        pd.getPartitionByCode(graph, startKey.getCode())
                          .getId();
                groups.computeIfAbsent(partitionId, k -> new LinkedList<>())
                      .add(entry);
            }
        }

        // Send each group to its partition's raft group; the closure is sized
        // to the number of groups so it can track one completion per task.
        BatchGrpcClosure<FeedbackRes> closure =
                new BatchGrpcClosure<>(groups.size());
        groups.forEach((partition, entries) -> {
            storeService.addRaftTask(HgStoreNodeService.BATCH_OP, graph,
                                     partition,
                                     BatchReq.newBuilder()
                                             .setHeader(request.getHeader())
                                             .setWriteReq(
                                                     BatchWriteReq.newBuilder()
                                                                  .addAllEntry(
                                                                          entries))
                                             .build(),
                                     closure.newRaftClosure());
        });

        // Wait only when at least one raft task was submitted. The test is on
        // the partition groups, not the graph name: an empty group map means
        // no raft task exists to wait for.
        if (!groups.isEmpty()) {
            log.debug(" batch: waiting raft...");
            // Wait for the return result
            closure.waitFinish(observer, r -> closure.selectError(r),
                               appConfig.getRaft().getRpcTimeOut());
            log.debug(" batch: ended waiting");
        } else {
            log.info(" batch: there is none of raft leader, graph = {}.",
                     graph);
            observer.onNext(
                    FeedbackRes.newBuilder().setStatus(HgGrpc.success())
                               .build());
            observer.onCompleted();
        }
    }