in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java [290:479]
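/**
 * Updates the rebalance state in the meta storage after a new peers configuration has been
 * applied to the partition's RAFT group: writes the applied peers as the new stable
 * assignments and atomically decides whether to schedule a follow-up rebalance
 * (switch-append, switch-reduce or a previously planned one) or to finish rebalancing.
 *
 * @param configuration Peers and learners of the applied configuration.
 */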
private void doOnNewPeersConfigurationApplied(PeersAndLearners configuration) {
try {
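// Per-partition meta storage keys that track the state of the rebalance protocol.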
ByteArray pendingPartAssignmentsKey = pendingPartAssignmentsKey(tablePartitionId);
ByteArray stablePartAssignmentsKey = stablePartAssignmentsKey(tablePartitionId);
ByteArray plannedPartAssignmentsKey = plannedPartAssignmentsKey(tablePartitionId);
ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
// TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
Map<ByteArray, Entry> values = metaStorageMgr.getAll(
Set.of(
plannedPartAssignmentsKey,
pendingPartAssignmentsKey,
stablePartAssignmentsKey,
switchReduceKey,
switchAppendKey
)
).get();
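// All entries are read in a single call so that their revisions can be used below as
// optimistic-concurrency preconditions for the conditional invoke.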
Entry stableEntry = values.get(stablePartAssignmentsKey);
Entry pendingEntry = values.get(pendingPartAssignmentsKey);
Entry plannedEntry = values.get(plannedPartAssignmentsKey);
Entry switchReduceEntry = values.get(switchReduceKey);
Entry switchAppendEntry = values.get(switchAppendKey);
Set<Assignment> retrievedStable = readAssignments(stableEntry);
Set<Assignment> retrievedSwitchReduce = readAssignments(switchReduceEntry);
Set<Assignment> retrievedSwitchAppend = readAssignments(switchAppendEntry);
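// Target assignments recalculated for the partition, and the stable set derived from the
// configuration that has just been applied to the RAFT group.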
Set<Assignment> calculatedAssignments = calculateAssignmentsFn.apply(tablePartitionId);
Set<Assignment> stable = createAssignments(configuration);
// Nodes whose reduction has already been applied (queued for reduction and absent from the new stable set).
Set<Assignment> reducedNodes = difference(retrievedSwitchReduce, stable);
// Nodes that were added relative to the previously stored stable set.
Set<Assignment> addedNodes = difference(stable, retrievedStable);
// Nodes that still remain to be reduced.
Set<Assignment> calculatedSwitchReduce = difference(retrievedSwitchReduce, reducedNodes);
// Nodes still to be appended: previously queued appends plus the just-reduced nodes,
// minus the already added ones, restricted to the calculated assignments.
Set<Assignment> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
calculatedSwitchAppend = difference(calculatedSwitchAppend, addedNodes);
calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);
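// Candidate pending assignments for the next rebalance step: the stable set without the
// nodes queued for reduction, and the stable set plus the reduced nodes that have to be
// appended back (restricted to the calculated assignments).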
Set<Assignment> calculatedPendingReduction = difference(stable, retrievedSwitchReduce);
Set<Assignment> calculatedPendingAddition = union(stable, reducedNodes);
calculatedPendingAddition = intersect(calculatedAssignments, calculatedPendingAddition);
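// Illustrative walk-through with hypothetical nodes A, B and C:
// retrievedStable = {A, B, C}, stable = {A, B}, retrievedSwitchReduce = {C},
// retrievedSwitchAppend = {} and calculatedAssignments = {A, B, C}. Then
// reducedNodes = {C} (its reduction has just been applied), addedNodes = {},
// calculatedSwitchReduce = {} and calculatedSwitchAppend = {C}: C is still part
// of the target assignments, so it has to be appended back by the next step.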
// Revision of the stable key is unchanged since the read above (or the key still does not exist).
SimpleCondition con1 = stableEntry.empty()
? notExists(stablePartAssignmentsKey) : revision(stablePartAssignmentsKey).eq(stableEntry.revision());
// Revision of the pending key is unchanged since the read above.
SimpleCondition con2 = revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
// Revision of the switch-reduce key is unchanged since the read above (or the key still does not exist).
SimpleCondition con3 = switchReduceEntry.empty()
? notExists(switchReduceKey) : revision(switchReduceKey).eq(switchReduceEntry.revision());
// Revision of the switch-append key is unchanged since the read above (or the key still does not exist).
SimpleCondition con4 = switchAppendEntry.empty()
? notExists(switchAppendKey) : revision(switchAppendKey).eq(switchAppendEntry.revision());
// All conditions combined with the AND operator.
Condition retryPreconditions = and(con1, and(con2, and(con3, con4)));
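// If any of these keys is modified concurrently, the invoke below falls through to the
// fail branch and the whole computation is retried on the fresh state.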
Update successCase;
Update failCase;
byte[] stableByteArray = ByteUtils.toBytes(stable);
byte[] additionByteArray = ByteUtils.toBytes(calculatedPendingAddition);
byte[] reductionByteArray = ByteUtils.toBytes(calculatedPendingReduction);
byte[] switchReduceByteArray = ByteUtils.toBytes(calculatedSwitchReduce);
byte[] switchAppendByteArray = ByteUtils.toBytes(calculatedSwitchAppend);
if (!calculatedSwitchAppend.isEmpty()) {
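// Some nodes still have to be appended back: schedule a pending rebalance that adds
// them and persist the updated switch sets.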
successCase = ops(
put(stablePartAssignmentsKey, stableByteArray),
put(pendingPartAssignmentsKey, additionByteArray),
put(switchReduceKey, switchReduceByteArray),
put(switchAppendKey, switchAppendByteArray)
).yield(SWITCH_APPEND_SUCCESS);
failCase = ops().yield(SWITCH_APPEND_FAIL);
} else if (!calculatedSwitchReduce.isEmpty()) {
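// Some nodes still have to be reduced: schedule a pending rebalance that removes them.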
successCase = ops(
put(stablePartAssignmentsKey, stableByteArray),
put(pendingPartAssignmentsKey, reductionByteArray),
put(switchReduceKey, switchReduceByteArray),
put(switchAppendKey, switchAppendByteArray)
).yield(SWITCH_REDUCE_SUCCESS);
failCase = ops().yield(SWITCH_REDUCE_FAIL);
} else {
Condition con5;
if (plannedEntry.value() != null) {
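// A next rebalance is already planned: promote it from the planned key to the pending key.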
// Revision of the planned key is unchanged since the read above.
con5 = revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
successCase = ops(
put(stablePartAssignmentsKey, stableByteArray),
put(pendingPartAssignmentsKey, plannedEntry.value()),
remove(plannedPartAssignmentsKey)
).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
} else {
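// Nothing more is planned: clear the pending key and finish rebalancing.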
// The planned key still does not exist.
con5 = notExists(plannedPartAssignmentsKey);
successCase = ops(
put(stablePartAssignmentsKey, stableByteArray),
remove(pendingPartAssignmentsKey)
).yield(FINISH_REBALANCE_SUCCESS);
failCase = ops().yield(FINISH_REBALANCE_FAIL);
}
retryPreconditions = and(retryPreconditions, con5);
}
// TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
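// A single atomic conditional multi-update; the yielded code identifies which branch was applied.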
int res = metaStorageMgr.invoke(iif(retryPreconditions, successCase, failCase)).get().getAsInt();
if (res < 0) {
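// A negative code means a precondition failed: the keys were changed concurrently,
// so log the cause and recompute everything from the fresh meta storage state.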
switch (res) {
case SWITCH_APPEND_FAIL:
LOG.info("Rebalance keys changed while trying to update rebalance pending addition information. "
+ "Going to retry [tablePartitionID={}, appliedPeers={}]",
tablePartitionId, stable
);
break;
case SWITCH_REDUCE_FAIL:
LOG.info("Rebalance keys changed while trying to update rebalance pending reduce information. "
+ "Going to retry [tablePartitionID={}, appliedPeers={}]",
tablePartitionId, stable
);
break;
case SCHEDULE_PENDING_REBALANCE_FAIL:
case FINISH_REBALANCE_FAIL:
LOG.info("Rebalance keys changed while trying to update rebalance information. "
+ "Going to retry [tablePartitionId={}, appliedPeers={}]",
tablePartitionId, stable
);
break;
default:
assert false : res;
break;
}
doOnNewPeersConfigurationApplied(configuration);
return;
}
switch (res) {
case SWITCH_APPEND_SUCCESS:
LOG.info("Rebalance finished. Going to schedule next rebalance with addition"
+ " [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
tablePartitionId, stable, calculatedPendingAddition
);
break;
case SWITCH_REDUCE_SUCCESS:
LOG.info("Rebalance finished. Going to schedule next rebalance with reduction"
+ " [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
tablePartitionId, stable, calculatedPendingReduction
);
break;
case SCHEDULE_PENDING_REBALANCE_SUCCESS:
LOG.info(
"Rebalance finished. Going to schedule next rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
tablePartitionId, stable, ByteUtils.fromBytes(plannedEntry.value())
);
break;
case FINISH_REBALANCE_SUCCESS:
LOG.info("Rebalance finished [tablePartitionId={}, appliedPeers={}]", tablePartitionId, stable);
break;
default:
assert false : res;
break;
}
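// The transition has been committed, so the retry counter can be reset.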
rebalanceAttempts.set(0);
} catch (InterruptedException | ExecutionException e) {
// TODO: IGNITE-14693
LOG.warn("Unable to commit partition configuration to metastore: " + tablePartitionId, e);
}
}