in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java [2194:2324]
/**
 * Reacts to an update of the pending assignments of a single partition.
 *
 * <p>The method performs the following steps, in this order:
 * <ol>
 *     <li>deserializes the pending and the stable assignments and reports the stable peers to the placement driver;</li>
 *     <li>if the local node is listed in the pending assignments with an assignment that is absent from the stable ones,
 *     prepares the partition storages and starts the partition Raft node and the replica on {@code ioExecutor};</li>
 *     <li>re-reads the pending assignments entry from the Meta storage and, unless a newer revision is found there,
 *     asks the partition Raft group for its leader; if the leader is the local peer, a change of peers to the pending
 *     configuration is requested.</li>
 * </ol>
 *
 * @param causalityToken Token used to read the versioned value with the local partitions of the tables.
 * @param replicaGrpId Identifier of the partition replication group the event belongs to.
 * @param tbl Table that owns the partition.
 * @param pendingAssignmentsEntry Meta storage entry holding the serialized pending assignments.
 * @param stableAssignmentsEntry Meta storage entry holding the serialized stable assignments.
 * @return Future that completes when the whole chain described above has completed.
 */
private CompletableFuture<Void> handleChangePendingAssignmentEvent(
        long causalityToken,
        TablePartitionId replicaGrpId,
        TableImpl tbl,
        Entry pendingAssignmentsEntry,
        Entry stableAssignmentsEntry
) {
    int tableId = tbl.tableId();
    int partId = replicaGrpId.partitionId();

    // Pending rebalance assignments, delivered to us by the Meta storage watch.
    Set<Assignment> pendingAssignments = ByteUtils.fromBytes(pendingAssignmentsEntry.value());
    PeersAndLearners pendingConfiguration = configurationFromAssignments(pendingAssignments);

    Set<Assignment> stableAssignments = ByteUtils.fromBytes(stableAssignmentsEntry.value());
    PeersAndLearners stableConfiguration = configurationFromAssignments(stableAssignments);

    // The placement driver is told about the stable peers before any local service is touched.
    placementDriver.updateAssignment(
            replicaGrpId,
            stableConfiguration.peers().stream().map(Peer::consistentId).collect(toList())
    );

    ClusterNode thisNode = localNode();

    // Local Raft node and replica are needed only when this node owns a pending assignment
    // that is not a part of the stable assignments yet.
    boolean startLocally = false;

    for (Assignment candidate : pendingAssignments) {
        if (thisNode.name().equals(candidate.consistentId()) && !stableAssignments.contains(candidate)) {
            startLocally = true;

            break;
        }
    }

    PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
            new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
    PendingComparableValuesTracker<Long, Void> storageIndex = new PendingComparableValuesTracker<>(0L);

    InternalTable internalTable = tbl.internalTable();

    String pendingKey = new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8);

    LOG.info("Received update on pending assignments. Check if new raft group should be started"
                    + " [key={}, partition={}, table={}, localMemberAddress={}]",
            pendingKey, partId, tbl.name(), thisNode.address());

    CompletableFuture<Void> servicesStarted;

    if (!startLocally) {
        servicesStarted = completedFuture(null);
    } else {
        // NOTE(review): both async stages below are scheduled without an explicit executor.
        var storagesPrepared = localPartsByTableIdVv.get(causalityToken).thenComposeAsync(partsByTable -> {
            PartitionSet tableParts = partsByTable.get(tableId).copy();

            // NOTE(review): the map produced here is not consumed by the following stage, which ignores its argument.
            return getOrCreatePartitionStorages(tbl, tableParts).thenApply(u -> {
                var updatedParts = new HashMap<>(partsByTable);

                updatedParts.put(tableId, tableParts);

                return updatedParts;
            });
        });

        servicesStarted = storagesPrepared.thenComposeAsync(ignoredMap -> {
            PartitionStorages storages = getPartitionStorages(tbl, partId);

            MvPartitionStorage mvStorage = storages.getMvPartitionStorage();
            TxStateStorage txStateStorage = storages.getTxStateStorage();

            PartitionDataStorage dataStorage = partitionDataStorage(mvStorage, internalTable, partId);

            PartitionUpdateHandlers updateHandlers = createPartitionUpdateHandlers(
                    partId,
                    dataStorage,
                    tbl,
                    safeTime
            );

            // The actual start is performed on the IO executor under the busy lock.
            return runAsync(() -> inBusyLock(busyLock, () -> {
                try {
                    startPartitionRaftGroupNode(
                            replicaGrpId,
                            pendingConfiguration,
                            stableConfiguration,
                            safeTime,
                            storageIndex,
                            internalTable,
                            txStateStorage,
                            dataStorage,
                            updateHandlers
                    );

                    // Looked up only after the Raft node start attempt, exactly where the original evaluated it.
                    TopologyAwareRaftGroupService raftClient =
                            (TopologyAwareRaftGroupService) internalTable.partitionRaftGroupService(partId);

                    startReplicaWithNewListener(
                            replicaGrpId,
                            tbl,
                            safeTime,
                            storageIndex,
                            mvStorage,
                            txStateStorage,
                            updateHandlers,
                            raftClient,
                            completedFuture(schemaManager.schemaRegistry(tableId))
                    );
                } catch (NodeStoppingException ignored) {
                    // Intentionally ignored.
                }
            }), ioExecutor);
        });
    }

    var latestPendingEntryFuture = servicesStarted
            .thenCompose(v -> metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)));

    return latestPendingEntryFuture.thenCompose(latestPendingEntry -> {
        // A newer pending entry in the Meta storage means this event is stale: peers must not be changed.
        // The Raft node has been started above anyway, to keep starting and stopping of Raft nodes consistent.
        boolean staleEvent = pendingAssignmentsEntry.revision() < latestPendingEntry.revision();

        if (staleEvent) {
            return completedFuture(null);
        }

        RaftGroupService partitionRaftClient = internalTable.partitionRaftGroupService(partId);

        return partitionRaftClient.refreshAndGetLeaderWithTerm().thenCompose(leaderWithTerm -> {
            // Only the leader initiates the change of the Raft configuration.
            if (!isLocalPeer(leaderWithTerm.leader())) {
                return completedFuture(null);
            }

            LOG.info("Current node={} is the leader of partition raft group={}. "
                            + "Initiate rebalance process for partition={}, table={}",
                    thisNode.address(), replicaGrpId, partId, tbl.name());

            return partitionRaftClient.changePeersAsync(pendingConfiguration, leaderWithTerm.term());
        });
    });
}