private CompletableFuture processCreateLogRequest()

in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java [298:382]


    private CompletableFuture<Message> processCreateLogRequest(
            MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) {
        LogName name;
        try (AutoCloseableLock writeLock = writeLock()) {
            CreateLogRequestProto createLog = logServiceRequestProto.getCreateLog();
            name = LogServiceProtoUtil.toLogName(createLog.getLogName());
            if(map.containsKey(name)) {
                return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
                        .toCreateLogExceptionReplyProto(new LogAlreadyExistException(name.getName()))
                        .build()
                        .toByteString()));
            }
            // Check that we have at least 3 nodes
            if (avail.size() < 3) {
                return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
                .toCreateLogExceptionReplyProto(new NoEnoughWorkersException(avail.size()))
                        .build()
                        .toByteString()));
            } else {
                List<PeerGroups> peerGroup =
                    IntStream.range(0, 3).mapToObj(i -> avail.poll()).collect(Collectors.toList());
                List<RaftPeer> peersFromGroup =
                    peerGroup.stream().map(obj -> obj.getPeer()).collect(Collectors.toList());
                RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peersFromGroup);
                peerGroup.stream().forEach(pg -> {
                    pg.getGroups().add(raftGroup);
                    avail.add(pg);
                });
                int provisionedPeers = 0;
                Exception originalException = null;
                for (RaftPeer peer : peers) {
                    try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
                        .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
                        client.getGroupManagementApi(peer.getId()).add(raftGroup);
                    } catch (IOException e) {
                        LOG.error("Failed to add Raft group ({}) for new Log({})",
                            raftGroup.getGroupId(), name, e);
                        originalException = e;
                        break;
                    }
                    provisionedPeers++;
                }
                // If we fail to add the group on all three peers, try to remove the group(s) which
                // failed to be added.
                if (provisionedPeers != peers.size()) {
                    int tornDownPeers = 0;
                    for (RaftPeer peer : peers) {
                        if (tornDownPeers >= provisionedPeers) {
                            break;
                        }
                        try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
                            .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
                            client.getGroupManagementApi(peer.getId()).remove(raftGroup.getGroupId(), true, false);
                        } catch (IOException e) {
                            LOG.error("Failed to clean up Raft group ({}) for peer ({}), "
                                + "ignoring exception", raftGroup.getGroupId(), peer, e);
                        }
                        tornDownPeers++;
                    }
                    // Make sure to send the original exception back to the client.
                    return CompletableFuture.completedFuture(Message.valueOf(
                        MetaServiceProtoUtil.toCreateLogExceptionReplyProto(originalException)
                            .build().toByteString()));
                }
                try (RaftClient client = RaftClient.newBuilder().setRaftGroup(currentGroup)
                    .setClientId(ClientId.randomId()).setProperties(properties).build()){
                    client.io().send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder()
                            .setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
                                    .setLogname(LogServiceProtoUtil.toLogNameProto(name))
                                    .setRaftGroup(MetaServiceProtoUtil
                                            .toRaftGroupProto(raftGroup)))
                            .build().toByteString());
                } catch (IOException e) {
                    LOG.error(
                        "Exception while registering raft group with Metadata Service during creation of log");
                    // Make sure to send the original exception back to the client.
                    return CompletableFuture.completedFuture(Message.valueOf(
                        MetaServiceProtoUtil.toCreateLogExceptionReplyProto(e).build()
                            .toByteString()));
                }
                return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
                        .toCreateLogReplyProto(new LogInfo((name), raftGroup)).build().toByteString()));
            }
        }
    }