in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java [703:936]
private CompletableFuture<?> createTablePartitionsLocally(
long causalityToken,
CompletableFuture<List<Set<Assignment>>> assignmentsFuture,
int zoneId,
TableImpl table
) {
int tableId = table.tableId();
// Create new Raft nodes according to the new assignments.
Supplier<CompletableFuture<Void>> updateAssignmentsClosure = () -> assignmentsFuture.thenCompose(newAssignments -> {
// Empty assignments might be a valid case if tables are created from within the cluster init HOCON
// configuration, which is not currently supported.
assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has no assignments.", tableId);
int partitions = newAssignments.size();
CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
// TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for assigned partitions.
PartitionSet parts = new BitSetPartitionSet();
for (int i = 0; i < futures.length; i++) {
futures[i] = new CompletableFuture<>();
parts.set(i);
}
String localMemberName = localNode().name();
for (int i = 0; i < partitions; i++) {
int partId = i;
Set<Assignment> newPartAssignment = newAssignments.get(partId);
InternalTable internalTbl = table.internalTable();
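// Non-null iff the local node is part of the new assignment for this partition; null means no Raft node is started locally.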
Assignment localMemberAssignment = newPartAssignment.stream()
.filter(a -> a.consistentId().equals(localMemberName))
.findAny()
.orElse(null);
PeersAndLearners newConfiguration = configurationFromAssignments(newPartAssignment);
TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
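// Let the placement driver know the new peer set for this replication group.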
placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId)
.collect(toList()));
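// Per-partition trackers of the applied safe time and the last applied storage index; the replica
// started below waits on them to observe a consistent partition state.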
var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(
new HybridTimestamp(1, 0)
);
var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);
PartitionStorages partitionStorages = getPartitionStorages(table, partId);
PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(),
internalTbl, partId);
PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers(
partId,
partitionDataStorage,
table,
safeTimeTracker
);
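// The update handlers apply replicated writes to the partition storage; the GC handler is registered
// with the MV garbage collector right below.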
mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler);
CompletableFuture<Boolean> startGroupFut;
// Start new Raft nodes only on table creation; other cases are covered by the rebalance logic.
if (localMemberAssignment != null) {
CompletableFuture<Boolean> shouldStartGroupFut;
// If Raft is running in in-memory mode or the PDS has been cleared, we need to remove the current node
// from the Raft group in order to avoid the double vote problem.
// See https://issues.apache.org/jira/browse/IGNITE-16668 for details.
// TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
if (internalTbl.storage().isVolatile()) {
shouldStartGroupFut = queryDataNodesCount(tableId, partId, newConfiguration.peers())
.thenApply(dataNodesCount -> {
boolean fullPartitionRestart = dataNodesCount == 0;
if (fullPartitionRestart) {
return true;
}
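// Raft quorum: a majority is (n / 2) + 1 voting peers, e.g. 2 of 3 or 3 of 5.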
boolean majorityAvailable = dataNodesCount >= (newConfiguration.peers().size() / 2) + 1;
if (majorityAvailable) {
RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, metaStorageMgr);
return false;
} else {
// No majority and not a full partition restart - the nodes hosting the current
// partition need to be restarted.
String msg = "Unable to start partition " + partId + ". Majority not available.";
throw new IgniteInternalException(msg);
}
});
} else {
shouldStartGroupFut = completedFuture(true);
}
startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
if (!startGroup) {
return false;
}
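// Assemble Raft group options backed by the partition's MV and TX state storages.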
TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();
RaftGroupOptions groupOptions = groupOptionsForPartition(
internalTbl.storage(),
internalTbl.txStateStorage(),
partitionKey(internalTbl, partId),
partitionUpdateHandlers
);
Peer serverPeer = newConfiguration.peer(localMemberName);
var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
try {
// TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273
((Loza) raftMgr).startRaftGroupNode(
raftNodeId,
newConfiguration,
new PartitionListener(
partitionDataStorage,
partitionUpdateHandlers.storageUpdateHandler,
txStatePartitionStorage,
safeTimeTracker,
storageIndexTracker
),
new RebalanceRaftGroupEventsListener(
metaStorageMgr,
replicaGrpId,
busyLock,
createPartitionMover(internalTbl, partId),
this::calculateAssignments,
rebalanceScheduler
),
groupOptions
);
return true;
} catch (NodeStoppingException ex) {
throw new CompletionException(ex);
}
}), ioExecutor);
} else {
startGroupFut = completedFuture(false);
}
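// Regardless of whether a local Raft node was started, create a Raft group client service for the
// partition; the replica listener is attached afterwards only if a local Raft node actually started.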
startGroupFut
.thenComposeAsync(v -> inBusyLock(busyLock, () -> {
try {
// TODO: IGNITE-19614 This procedure takes 10 seconds if there is no majority online.
return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory);
} catch (NodeStoppingException ex) {
return failedFuture(ex);
}
}), ioExecutor)
.thenAcceptAsync(updatedRaftGroupService -> inBusyLock(busyLock, () -> {
((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService);
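// Safe to join: this stage is chained after startGroupFut, so the future has already completed.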
boolean startedRaftNode = startGroupFut.join();
if (localMemberAssignment == null || !startedRaftNode) {
return;
}
MvPartitionStorage partitionStorage = partitionStorages.getMvPartitionStorage();
TxStateStorage txStateStorage = partitionStorages.getTxStateStorage();
try {
startReplicaWithNewListener(
replicaGrpId,
table,
safeTimeTracker,
storageIndexTracker,
partitionStorage,
txStateStorage,
partitionUpdateHandlers,
updatedRaftGroupService,
schemaManager.schemaRegistry(causalityToken, tableId)
);
} catch (NodeStoppingException ex) {
throw new AssertionError("Loza was stopped before Table manager", ex);
}
}), ioExecutor)
.whenComplete((res, ex) -> {
if (ex != null) {
LOG.warn("Unable to update Raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId);
futures[partId].completeExceptionally(ex);
} else {
futures[partId].complete(null);
}
});
}
return allOf(futures);
});
// NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!).
CompletableFuture<?> localPartsUpdateFuture = localPartsByTableIdVv.update(causalityToken,
(previous, throwable) -> inBusyLock(busyLock, () -> assignmentsFuture.thenCompose(newAssignments -> {
PartitionSet parts = new BitSetPartitionSet();
for (int i = 0; i < newAssignments.size(); i++) {
parts.set(i);
}
return getOrCreatePartitionStorages(table, parts).thenApply(u -> {
var newValue = new HashMap<>(previous);
newValue.put(tableId, parts);
return newValue;
});
})));
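// Complete the assignments-updated versioned value only after the local partition storages are ready
// and the table is registered in tablesByIdVv for this causality token.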
return assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
if (e != null) {
return failedFuture(e);
}
return localPartsUpdateFuture.thenCompose(unused ->
tablesByIdVv.get(causalityToken)
.thenComposeAsync(tablesById -> inBusyLock(busyLock, updateAssignmentsClosure), ioExecutor)
);
});
}