in modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java [337:554]
/**
 * Writes {@code stableFromRaft} to the stable assignments key and, in the same conditional meta storage invoke,
 * either schedules the next rebalance (switch-append, switch-reduce or the planned assignments) or removes the
 * pending key to finish the rebalance.
 *
 * <p>The invoke is guarded by the revisions of the keys that were read beforehand. If any of them changed in the
 * meantime, the keys are re-read and the attempt is repeated. The method returns without any update when the head
 * of the pending assignments queue read from the meta storage does not equal {@code stableFromRaft}.
 *
 * <p>{@link InterruptedException} and {@link ExecutionException} are not propagated: they are logged or passed to
 * the failure processor (see the catch block), and the interrupt flag of the current thread is restored.
 *
 * @param stableFromRaft Assignments to be written as stable; they are compared with the head of the pending queue.
 * @param zonePartitionId Zone partition id used to build the rebalance meta storage keys.
 * @param metaStorageMgr Meta storage manager used to read the keys and to perform the conditional update.
 * @param calculateAssignmentsFn Function that is given the partition id and the timestamp of the pending assignments
 *      and returns a future with the assignments used to filter switch-append and pending-addition sets.
 */
private void doStableKeySwitch(
        Set<Assignment> stableFromRaft,
        ZonePartitionId zonePartitionId,
        MetaStorageManager metaStorageMgr,
        BiFunction<ZonePartitionId, Long, CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
) {
    try {
        // The keys depend only on the partition id, so they are built once and reused by every retry.
        ByteArray pendingPartAssignmentsKey = pendingPartAssignmentsQueueKey(zonePartitionId);
        ByteArray stablePartAssignmentsKey = stablePartAssignmentsKey(zonePartitionId);
        ByteArray plannedPartAssignmentsKey = plannedPartAssignmentsKey(zonePartitionId);
        ByteArray switchReduceKey = switchReduceKey(zonePartitionId);
        ByteArray switchAppendKey = switchAppendKey(zonePartitionId);

        // Retry loop: every iteration re-reads the keys and repeats the conditional invoke. A loop is used instead of
        // a recursive call so that a long series of lost races does not grow the call stack.
        while (true) {
            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
                    Set.of(
                            plannedPartAssignmentsKey,
                            pendingPartAssignmentsKey,
                            stablePartAssignmentsKey,
                            switchReduceKey,
                            switchAppendKey
                    )
            ).get();

            Entry stableEntry = values.get(stablePartAssignmentsKey);
            Entry pendingEntry = values.get(pendingPartAssignmentsKey);
            Entry plannedEntry = values.get(plannedPartAssignmentsKey);
            Entry switchReduceEntry = values.get(switchReduceKey);
            Entry switchAppendEntry = values.get(switchAppendKey);

            Set<Assignment> retrievedStable = readAssignments(stableEntry).nodes();
            Set<Assignment> retrievedSwitchReduce = readAssignments(switchReduceEntry).nodes();
            Set<Assignment> retrievedSwitchAppend = readAssignments(switchAppendEntry).nodes();

            Assignments pendingAssignments = pendingEntry.value() == null
                    ? Assignments.EMPTY
                    : AssignmentsQueue.fromBytes(pendingEntry.value()).poll();

            Set<Assignment> retrievedPending = pendingAssignments.nodes();

            // Nothing to switch if the pending assignments in the meta storage are not the ones applied by raft.
            if (!retrievedPending.equals(stableFromRaft)) {
                return;
            }

            long catalogTimestamp = pendingAssignments.timestamp();

            // We wait for catalog metadata to be applied up to the provided timestamp, so it should be safe to use the timestamp.
            Set<Assignment> calculatedAssignments = calculateAssignmentsFn.apply(zonePartitionId, catalogTimestamp).get();

            // Were reduced
            Set<Assignment> reducedNodes = difference(retrievedSwitchReduce, stableFromRaft);

            // Were added
            Set<Assignment> addedNodes = difference(stableFromRaft, retrievedStable);

            // For further reduction
            Set<Assignment> calculatedSwitchReduce = difference(retrievedSwitchReduce, reducedNodes);

            // For further addition
            Set<Assignment> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
            calculatedSwitchAppend = difference(calculatedSwitchAppend, addedNodes);
            calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);

            Set<Assignment> calculatedPendingReduction = difference(stableFromRaft, retrievedSwitchReduce);

            Set<Assignment> calculatedPendingAddition = union(stableFromRaft, reducedNodes);
            calculatedPendingAddition = intersect(calculatedAssignments, calculatedPendingAddition);

            // eq(revision(assignments.stable), retrievedAssignmentsStable.revision)
            SimpleCondition con1 = stableEntry.empty()
                    ? notExists(stablePartAssignmentsKey)
                    : revision(stablePartAssignmentsKey).eq(stableEntry.revision());

            // eq(revision(assignments.pending), retrievedAssignmentsPending.revision)
            SimpleCondition con2 = revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());

            // eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
            SimpleCondition con3 = switchReduceEntry.empty()
                    ? notExists(switchReduceKey)
                    : revision(switchReduceKey).eq(switchReduceEntry.revision());

            // eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
            SimpleCondition con4 = switchAppendEntry.empty()
                    ? notExists(switchAppendKey)
                    : revision(switchAppendKey).eq(switchAppendEntry.revision());

            // All conditions combined with AND operator.
            Condition retryPreconditions = and(con1, and(con2, and(con3, con4)));

            Update successCase;
            Update failCase;

            byte[] stableFromRaftByteArray = Assignments.toBytes(stableFromRaft, catalogTimestamp);
            byte[] additionByteArray = AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition, catalogTimestamp));
            byte[] reductionByteArray = AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction, catalogTimestamp));
            byte[] switchReduceByteArray = Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
            byte[] switchAppendByteArray = Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);

            if (!calculatedSwitchAppend.isEmpty()) {
                // There are still nodes to append: the next pending assignments are the addition set.
                successCase = ops(
                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
                        put(pendingPartAssignmentsKey, additionByteArray),
                        put(switchReduceKey, switchReduceByteArray),
                        put(switchAppendKey, switchAppendByteArray)
                ).yield(SWITCH_APPEND_SUCCESS);

                failCase = ops().yield(SWITCH_APPEND_FAIL);
            } else if (!calculatedSwitchReduce.isEmpty()) {
                // There are still nodes to reduce: the next pending assignments are the reduction set.
                successCase = ops(
                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
                        put(pendingPartAssignmentsKey, reductionByteArray),
                        put(switchReduceKey, switchReduceByteArray),
                        put(switchAppendKey, switchAppendByteArray)
                ).yield(SWITCH_REDUCE_SUCCESS);

                failCase = ops().yield(SWITCH_REDUCE_FAIL);
            } else {
                Condition con5;

                if (plannedEntry.value() != null) {
                    // eq(revision(partition.assignments.planned), plannedEntry.revision)
                    con5 = revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());

                    // Planned assignments are moved to the pending key and the planned key is removed.
                    successCase = ops(
                            put(stablePartAssignmentsKey, stableFromRaftByteArray),
                            put(pendingPartAssignmentsKey, AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
                            remove(plannedPartAssignmentsKey)
                    ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);

                    failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
                } else {
                    // notExists(partition.assignments.planned)
                    con5 = notExists(plannedPartAssignmentsKey);

                    successCase = ops(
                            put(stablePartAssignmentsKey, stableFromRaftByteArray),
                            remove(pendingPartAssignmentsKey)
                    ).yield(FINISH_REBALANCE_SUCCESS);

                    failCase = ops().yield(FINISH_REBALANCE_FAIL);
                }

                retryPreconditions = and(retryPreconditions, con5);
            }

            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
            int res = metaStorageMgr.invoke(iif(retryPreconditions, successCase, failCase)).get().getAsInt();

            // Negative results are treated as a failed precondition check: log the reason and try again.
            if (res < 0) {
                switch (res) {
                    case SWITCH_APPEND_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance pending addition information. "
                                        + "Going to retry [zonePartitionId={}, appliedPeers={}]",
                                zonePartitionId, stableFromRaft
                        );
                        break;
                    case SWITCH_REDUCE_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance pending reduce information. "
                                        + "Going to retry [zonePartitionId={}, appliedPeers={}]",
                                zonePartitionId, stableFromRaft
                        );
                        break;
                    case SCHEDULE_PENDING_REBALANCE_FAIL:
                    case FINISH_REBALANCE_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance information. "
                                        + "Going to retry [zonePartitionId={}, appliedPeers={}]",
                                zonePartitionId, stableFromRaft
                        );
                        break;
                    default:
                        assert false : res;
                        break;
                }

                continue;
            }

            switch (res) {
                case SWITCH_APPEND_SUCCESS:
                    LOG.info("Rebalance finished. Going to schedule next rebalance with addition"
                                    + " [zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            zonePartitionId, stableFromRaft, calculatedPendingAddition
                    );
                    break;
                case SWITCH_REDUCE_SUCCESS:
                    LOG.info("Rebalance finished. Going to schedule next rebalance with reduction"
                                    + " [zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            zonePartitionId, stableFromRaft, calculatedPendingReduction
                    );
                    break;
                case SCHEDULE_PENDING_REBALANCE_SUCCESS:
                    LOG.info(
                            "Rebalance finished. Going to schedule next rebalance [zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            zonePartitionId, stableFromRaft, Assignments.fromBytes(plannedEntry.value()).nodes()
                    );
                    break;
                case FINISH_REBALANCE_SUCCESS:
                    LOG.info("Rebalance finished [zonePartitionId={}, appliedPeers={}]", zonePartitionId, stableFromRaft);
                    break;
                default:
                    assert false : res;
                    break;
            }

            return;
        }
    } catch (InterruptedException | ExecutionException e) {
        // TODO: IGNITE-14693
        if (!hasCause(e, NodeStoppingException.class)) {
            if (hasCause(e, TimeoutException.class)) {
                // TODO: https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout properly.
                LOG.error("Unable to commit partition configuration to metastore: {}", e, zonePartitionId);
            } else {
                String errorMessage = String.format("Unable to commit partition configuration to metastore: %s", zonePartitionId);
                failureProcessor.process(new FailureContext(e, errorMessage));
            }
        }

        // The exception is consumed here, so the interrupt flag is restored for the callers up the stack.
        // It is done after the error handling so that logging / failure processing does not run with the flag set.
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}