in ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java [298:382]
/**
 * Handles a create-log request: takes three workers from {@code avail}, asks each of them
 * to host a newly generated Raft group, and then sends a register request (log name plus
 * Raft group) to {@code currentGroup}.
 *
 * <p>The handled failure modes (log name already present in {@code map}, fewer than three
 * available workers, or an {@link IOException} while provisioning or registering) are not
 * thrown; they are serialized into the reply carried by the returned future.
 *
 * @param logServiceRequestProto request whose {@code createLog} payload names the log
 * @return an already-completed future holding either the create-log reply or an
 *         exception reply describing why the log could not be created
 */
private CompletableFuture<Message> processCreateLogRequest(
    MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) {
  // The lock obtained here is released when the try block exits, i.e. it spans the remote
  // calls made below as well.
  try (AutoCloseableLock writeLock = writeLock()) {
    CreateLogRequestProto createLog = logServiceRequestProto.getCreateLog();
    LogName name = LogServiceProtoUtil.toLogName(createLog.getLogName());
    if (map.containsKey(name)) {
      return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
          .toCreateLogExceptionReplyProto(new LogAlreadyExistException(name.getName()))
          .build()
          .toByteString()));
    }
    // Check that we have at least 3 nodes
    if (avail.size() < 3) {
      return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
          .toCreateLogExceptionReplyProto(new NoEnoughWorkersException(avail.size()))
          .build()
          .toByteString()));
    }

    List<PeerGroups> peerGroup =
        IntStream.range(0, 3).mapToObj(i -> avail.poll()).collect(Collectors.toList());
    List<RaftPeer> peersFromGroup =
        peerGroup.stream().map(pg -> pg.getPeer()).collect(Collectors.toList());
    RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peersFromGroup);
    // Record the new group on each chosen worker and hand the worker back to the queue.
    // NOTE(review): this bookkeeping is not undone on the failure paths below, so a group
    // that was never fully created stays listed on the workers -- confirm whether a
    // rollback is needed.
    peerGroup.forEach(pg -> {
      pg.getGroups().add(raftGroup);
      avail.add(pg);
    });

    // Provision the group on the members of the new group only (the peers selected above),
    // stopping at the first failure.
    int provisionedPeers = 0;
    Exception originalException = null;
    for (RaftPeer peer : peersFromGroup) {
      try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
          .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
        client.getGroupManagementApi(peer.getId()).add(raftGroup);
      } catch (IOException e) {
        LOG.error("Failed to add Raft group ({}) for new Log({})",
            raftGroup.getGroupId(), name, e);
        originalException = e;
        break;
      }
      provisionedPeers++;
    }

    // If the group could not be added on every selected peer, remove it again from the
    // peers on which it had already been added. The list is ordered, so those are exactly
    // the first provisionedPeers entries.
    if (provisionedPeers != peersFromGroup.size()) {
      for (RaftPeer peer : peersFromGroup.subList(0, provisionedPeers)) {
        try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
            .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
          client.getGroupManagementApi(peer.getId()).remove(raftGroup.getGroupId(), true, false);
        } catch (IOException e) {
          // Clean-up is best effort: log and keep going with the remaining peers.
          LOG.error("Failed to clean up Raft group ({}) for peer ({}), "
              + "ignoring exception", raftGroup.getGroupId(), peer, e);
        }
      }
      // Make sure to send the original exception back to the client.
      return CompletableFuture.completedFuture(Message.valueOf(
          MetaServiceProtoUtil.toCreateLogExceptionReplyProto(originalException)
              .build().toByteString()));
    }

    // All selected peers host the group; now send the register request to currentGroup.
    try (RaftClient client = RaftClient.newBuilder().setRaftGroup(currentGroup)
        .setClientId(ClientId.randomId()).setProperties(properties).build()) {
      client.io().send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder()
          .setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
              .setLogname(LogServiceProtoUtil.toLogNameProto(name))
              .setRaftGroup(MetaServiceProtoUtil.toRaftGroupProto(raftGroup)))
          .build().toByteString());
    } catch (IOException e) {
      // Pass the exception to the logger so the cause and stack trace are not lost.
      LOG.error(
          "Exception while registering raft group with Metadata Service during creation of log",
          e);
      // Report the registration failure back to the client.
      return CompletableFuture.completedFuture(Message.valueOf(
          MetaServiceProtoUtil.toCreateLogExceptionReplyProto(e).build()
              .toByteString()));
    }
    return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
        .toCreateLogReplyProto(new LogInfo((name), raftGroup)).build().toByteString()));
  }
}