in modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java [170:348]
/**
 * Triggers a rebalance for a single zone partition by atomically updating the rebalance keys
 * (pending / planned / stable / change-trigger) in the meta storage.
 *
 * <p>The target assignment set is calculated from the supplied data nodes. For
 * {@link ConsistencyMode#HIGH_AVAILABILITY} zones the calculated set is additionally adjusted:
 * offline nodes are filtered out, except those still present in the current stable assignments —
 * such nodes must only be removed through the regular scale-down mechanism.
 *
 * <p>The keys are updated with a single conditional multi-invoke whose decision tree is spelled
 * out in the pseudocode comment inside the method body; the invoke result is logged.
 *
 * @param zoneDescriptor Zone descriptor (used for logging only).
 * @param zonePartitionId Identifier of the zone partition whose keys are updated.
 * @param dataNodes Current data nodes of the zone.
 * @param partitions Total number of partitions in the zone.
 * @param replicas Number of replicas per partition.
 * @param revision Meta storage revision of the triggering event (used for logging only).
 * @param timestamp Event timestamp; an update whose timestamp is not newer than the stored
 *         change trigger is skipped as outdated.
 * @param metaStorageMgr Meta storage manager that performs the conditional invoke.
 * @param partNum Partition number within the zone.
 * @param zoneCfgPartAssignments Current stable assignments of the partition.
 * @param assignmentsTimestamp Timestamp embedded into the serialized assignments.
 * @param aliveNodes Consistent IDs of the nodes that are currently online.
 * @param consistencyMode Consistency mode of the zone.
 * @return Future that completes when the meta storage invoke finishes and its outcome is logged.
 */
public static CompletableFuture<Void> updatePendingAssignmentsKeys(
        CatalogZoneDescriptor zoneDescriptor,
        ZonePartitionId zonePartitionId,
        Collection<String> dataNodes,
        int partitions,
        int replicas,
        long revision,
        HybridTimestamp timestamp,
        MetaStorageManager metaStorageMgr,
        int partNum,
        Set<Assignment> zoneCfgPartAssignments,
        long assignmentsTimestamp,
        Set<String> aliveNodes,
        ConsistencyMode consistencyMode
) {
    ByteArray partChangeTriggerKey = pendingChangeTriggerKey(zonePartitionId);

    ByteArray partAssignmentsPendingKey = pendingPartAssignmentsQueueKey(zonePartitionId);

    ByteArray partAssignmentsPlannedKey = plannedPartAssignmentsKey(zonePartitionId);

    ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(zonePartitionId);

    Set<Assignment> calculatedAssignments = calculateAssignmentForPartition(dataNodes, partNum, partitions, replicas);

    Set<Assignment> targetAssignmentSet;

    if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
        // All complicated logic here is needed because we want to return back to stable nodes
        // that are returned back after majority is lost and stable was narrowed.
        // Let's consider example:
        // stable = [A, B, C], dataNodes = [A, B, C]
        // B, C left, stable = [A], dataNodes = [A, B, C]
        // B returned, we want stable = [A, B], but in terms of data nodes they are not changed and equal [A, B, C]
        // So, because scale up mechanism in this case won't adjust stable, we need to add B to stable manually.

        // General idea is to filter offline nodes from data nodes, but we need to be careful and do not remove nodes
        // bypassing scale down mechanism. If node is offline and presented in previous stable, we won't remove that node.

        // First of all, we remove offline nodes from calculated assignments.
        Set<Assignment> resultingAssignments = calculatedAssignments
                .stream()
                .filter(a -> aliveNodes.contains(a.consistentId()))
                .collect(toSet());

        // Here we re-introduce nodes that currently exist in the stable configuration
        // but were previously removed without using the normal scale-down process.
        for (Assignment assignment : zoneCfgPartAssignments) {
            if (calculatedAssignments.contains(assignment)) {
                resultingAssignments.add(assignment);
            }
        }

        targetAssignmentSet = resultingAssignments;
    } else {
        targetAssignmentSet = calculatedAssignments;
    }

    boolean isNewAssignments = !zoneCfgPartAssignments.equals(targetAssignmentSet);

    Assignments targetAssignments = Assignments.of(targetAssignmentSet, assignmentsTimestamp);

    AssignmentsQueue partAssignmentsPendingQueue = pendingAssignmentsCalculator()
            .stable(Assignments.of(zoneCfgPartAssignments, assignmentsTimestamp))
            .target(targetAssignments)
            .toQueue();

    byte[] partAssignmentsPlannedBytes = targetAssignments.toBytes();
    byte[] partAssignmentsPendingBytes = partAssignmentsPendingQueue.toBytes();

    // Decision tree implemented by the conditional multi-invoke below:
    //
    // if empty(partition.change.trigger) || partition.change.trigger < event.timestamp:
    //     if empty(partition.assignments.pending)
    //          && ((isNewAssignments && empty(partition.assignments.stable))
    //              || (partition.assignments.stable != calcPartAssignments() && !empty(partition.assignments.stable))):
    //         partition.assignments.pending = partAssignmentsPendingQueue
    //         partition.change.trigger = event.timestamp
    //     else:
    //         if partition.assignments.pending != partAssignmentsPendingQueue && !empty(partition.assignments.pending)
    //             partition.assignments.planned = calcPartAssignments()
    //             partition.change.trigger = event.timestamp
    //         else if partition.assignments.pending == partAssignmentsPendingQueue
    //             remove(partition.assignments.planned)
    //             partition.change.trigger = event.timestamp
    //             message after the metastorage invoke:
    //             "Remove planned key because current pending key has the same value."
    //         else if empty(partition.assignments.pending)
    //             remove(partition.assignments.planned)
    //             partition.change.trigger = event.timestamp
    //             message after the metastorage invoke:
    //             "Remove planned key because pending is empty and calculated assignments are equal to current assignments."
    // else:
    //     skip

    Condition newAssignmentsCondition = exists(partAssignmentsStableKey)
            .and(value(partAssignmentsStableKey).ne(partAssignmentsPlannedBytes));

    if (isNewAssignments) {
        // A partition with no stable assignments yet also qualifies for a pending update.
        newAssignmentsCondition = notExists(partAssignmentsStableKey).or(newAssignmentsCondition);
    }

    byte[] timestampBytes = longToBytesKeepingOrder(timestamp.longValue());

    Iif iif = iif(
            or(notExists(partChangeTriggerKey), value(partChangeTriggerKey).lt(timestampBytes)),
            iif(and(notExists(partAssignmentsPendingKey), newAssignmentsCondition),
                    ops(
                            put(partAssignmentsPendingKey, partAssignmentsPendingBytes),
                            put(partChangeTriggerKey, timestampBytes)
                    ).yield(PENDING_KEY_UPDATED.ordinal()),
                    iif(and(value(partAssignmentsPendingKey).ne(partAssignmentsPendingBytes), exists(partAssignmentsPendingKey)),
                            ops(
                                    put(partAssignmentsPlannedKey, partAssignmentsPlannedBytes),
                                    put(partChangeTriggerKey, timestampBytes)
                            ).yield(PLANNED_KEY_UPDATED.ordinal()),
                            iif(value(partAssignmentsPendingKey).eq(partAssignmentsPendingBytes),
                                    ops(
                                            remove(partAssignmentsPlannedKey),
                                            put(partChangeTriggerKey, timestampBytes)
                                    ).yield(PLANNED_KEY_REMOVED_EQUALS_PENDING.ordinal()),
                                    iif(notExists(partAssignmentsPendingKey),
                                            ops(
                                                    remove(partAssignmentsPlannedKey),
                                                    put(partChangeTriggerKey, timestampBytes)
                                            ).yield(PLANNED_KEY_REMOVED_EMPTY_PENDING.ordinal()),
                                            ops().yield(ASSIGNMENT_NOT_UPDATED.ordinal()))
                            ))),
            ops().yield(OUTDATED_UPDATE_RECEIVED.ordinal()));

    return metaStorageMgr.invoke(iif).thenAccept(sr -> {
        // Capture the raw code so an unknown value can be reported in the exception below.
        int statusCode = sr.getAsInt();

        switch (UpdateStatus.valueOf(statusCode)) {
            case PENDING_KEY_UPDATED:
                LOG.info(
                        "Update metastore pending partitions key [key={}, partition={}, zone={}/{}, newVal={}, timestamp={}]",
                        partAssignmentsPendingKey, partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                        partAssignmentsPendingQueue, timestamp);

                break;
            case PLANNED_KEY_UPDATED:
                LOG.info(
                        "Update metastore planned partitions key [key={}, partition={}, zone={}/{}, newVal={}]",
                        partAssignmentsPlannedKey, partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                        targetAssignmentSet
                );

                break;
            case PLANNED_KEY_REMOVED_EQUALS_PENDING:
                LOG.info(
                        "Remove planned key because current pending key has the same value [key={}, partition={}, zone={}/{}, val={}]",
                        partAssignmentsPlannedKey, partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                        targetAssignmentSet
                );

                break;
            case PLANNED_KEY_REMOVED_EMPTY_PENDING:
                LOG.info(
                        "Remove planned key because pending is empty and calculated assignments are equal to current assignments "
                                + "[key={}, partition={}, zone={}/{}, val={}]",
                        partAssignmentsPlannedKey, partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                        targetAssignmentSet
                );

                break;
            case ASSIGNMENT_NOT_UPDATED:
                LOG.debug(
                        "Assignments are not updated [key={}, partition={}, zone={}/{}, val={}]",
                        partAssignmentsPlannedKey, partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                        targetAssignmentSet
                );

                break;
            case OUTDATED_UPDATE_RECEIVED:
                LOG.debug(
                        "Received outdated rebalance trigger event [revision={}, partition={}, zone={}/{}]",
                        revision, partNum, zoneDescriptor.id(), zoneDescriptor.name());

                break;
            default:
                // Include the offending code: without it this failure is undiagnosable from logs.
                throw new IllegalStateException(
                        "Unknown return code for rebalance metastore multi-invoke: " + statusCode);
        }
    });
}