public static CompletableFuture updatePendingAssignmentsKeys()

in modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java [170:348]


    /**
     * Recalculates the target assignments of one zone partition and issues a single conditional meta storage invoke that, depending on
     * the current state of the partition's rebalance keys, writes the pending assignments queue, writes or removes the planned
     * assignments, or changes nothing. The outcome reported by the invoke is logged.
     *
     * @param zoneDescriptor Zone descriptor; only its id and name are read, for log messages.
     * @param zonePartitionId Partition identifier from which the change trigger, pending, planned and stable keys are derived.
     * @param dataNodes Data nodes passed to the assignment calculation.
     * @param partitions Partition count passed to the assignment calculation.
     * @param replicas Replica count passed to the assignment calculation.
     * @param revision Revision of the triggering event; only used in the "outdated trigger" log message.
     * @param timestamp Timestamp of the triggering event; compared against and written to the change trigger key.
     * @param metaStorageMgr Meta storage manager used to execute the invoke.
     * @param partNum Partition number passed to the assignment calculation and used in log messages.
     * @param zoneCfgPartAssignments Assignments used as the stable side when building the pending queue and when deciding whether the
     *         target assignments are new.
     * @param assignmentsTimestamp Timestamp attached to the {@link Assignments} objects built here.
     * @param aliveNodes Consistent ids of nodes considered alive; only consulted in {@link ConsistencyMode#HIGH_AVAILABILITY} mode.
     * @param consistencyMode Consistency mode that selects how the target assignments are derived from the calculated ones.
     * @return Future that completes after the invoke has finished and its result has been logged.
     */
    public static CompletableFuture<Void> updatePendingAssignmentsKeys(
            CatalogZoneDescriptor zoneDescriptor,
            ZonePartitionId zonePartitionId,
            Collection<String> dataNodes,
            int partitions,
            int replicas,
            long revision,
            HybridTimestamp timestamp,
            MetaStorageManager metaStorageMgr,
            int partNum,
            Set<Assignment> zoneCfgPartAssignments,
            long assignmentsTimestamp,
            Set<String> aliveNodes,
            ConsistencyMode consistencyMode
    ) {
        ByteArray triggerKey = pendingChangeTriggerKey(zonePartitionId);
        ByteArray pendingKey = pendingPartAssignmentsQueueKey(zonePartitionId);
        ByteArray plannedKey = plannedPartAssignmentsKey(zonePartitionId);
        ByteArray stableKey = stablePartAssignmentsKey(zonePartitionId);

        Set<Assignment> calculated = calculateAssignmentForPartition(dataNodes, partNum, partitions, replicas);

        Set<Assignment> target;

        if (consistencyMode != ConsistencyMode.HIGH_AVAILABILITY) {
            target = calculated;
        } else {
            // In HA mode the stable assignments may have been narrowed after a majority loss while the data nodes stayed the same.
            // Example:
            //   stable = [A, B, C], dataNodes = [A, B, C]
            //   B and C leave   -> stable = [A], dataNodes = [A, B, C]
            //   B comes back    -> we want stable = [A, B], yet dataNodes are still [A, B, C], so scale up will not adjust stable.
            // Therefore offline nodes are filtered out of the calculated assignments, but a node that is offline and still present
            // in the previous stable is kept: dropping it here would bypass the scale down mechanism.

            // Step 1: keep only the calculated assignments whose node is alive.
            Set<Assignment> aliveOrStable = calculated.stream()
                    .filter(assignment -> aliveNodes.contains(assignment.consistentId()))
                    .collect(toSet());

            // Step 2: put back every assignment of the current stable that is also part of the calculated set.
            zoneCfgPartAssignments.stream()
                    .filter(calculated::contains)
                    .forEach(aliveOrStable::add);

            target = aliveOrStable;
        }

        boolean isNewAssignments = !zoneCfgPartAssignments.equals(target);

        Assignments plannedAssignments = Assignments.of(target, assignmentsTimestamp);
        AssignmentsQueue pendingQueue = pendingAssignmentsCalculator()
                .stable(Assignments.of(zoneCfgPartAssignments, assignmentsTimestamp))
                .target(plannedAssignments)
                .toQueue();

        byte[] plannedBytes = plannedAssignments.toBytes();
        byte[] pendingBytes = pendingQueue.toBytes();

        // Decision tree executed atomically by the meta storage:
        //
        //    if empty(change.trigger) || change.trigger < event.timestamp:
        //        if empty(pending)
        //              && ((isNewAssignments && empty(stable)) || (!empty(stable) && stable != target)):
        //            pending = pendingQueue; change.trigger = event.timestamp              -> PENDING_KEY_UPDATED
        //        else if !empty(pending) && pending != pendingQueue:
        //            planned = target; change.trigger = event.timestamp                    -> PLANNED_KEY_UPDATED
        //        else if pending == pendingQueue:
        //            remove(planned); change.trigger = event.timestamp                     -> PLANNED_KEY_REMOVED_EQUALS_PENDING
        //        else if empty(pending):
        //            remove(planned); change.trigger = event.timestamp                     -> PLANNED_KEY_REMOVED_EMPTY_PENDING
        //        else:
        //            nothing                                                               -> ASSIGNMENT_NOT_UPDATED
        //    else:
        //        nothing                                                                   -> OUTDATED_UPDATE_RECEIVED

        Condition stableDiffersFromTarget = exists(stableKey).and(value(stableKey).ne(plannedBytes));

        // A missing stable key only allows the pending update when the target differs from the assignments passed in.
        Condition pendingMayBeWritten = isNewAssignments
                ? notExists(stableKey).or(stableDiffersFromTarget)
                : stableDiffersFromTarget;

        byte[] timestampBytes = longToBytesKeepingOrder(timestamp.longValue());

        // The statement is assembled from the innermost branch outwards.
        Iif whenPendingIsEmpty = iif(
                notExists(pendingKey),
                ops(
                        remove(plannedKey),
                        put(triggerKey, timestampBytes)
                ).yield(PLANNED_KEY_REMOVED_EMPTY_PENDING.ordinal()),
                ops().yield(ASSIGNMENT_NOT_UPDATED.ordinal())
        );

        Iif whenPendingIsSame = iif(
                value(pendingKey).eq(pendingBytes),
                ops(
                        remove(plannedKey),
                        put(triggerKey, timestampBytes)
                ).yield(PLANNED_KEY_REMOVED_EQUALS_PENDING.ordinal()),
                whenPendingIsEmpty
        );

        Iif whenPendingDiffers = iif(
                and(value(pendingKey).ne(pendingBytes), exists(pendingKey)),
                ops(
                        put(plannedKey, plannedBytes),
                        put(triggerKey, timestampBytes)
                ).yield(PLANNED_KEY_UPDATED.ordinal()),
                whenPendingIsSame
        );

        Iif whenTriggerIsFresh = iif(
                and(notExists(pendingKey), pendingMayBeWritten),
                ops(
                        put(pendingKey, pendingBytes),
                        put(triggerKey, timestampBytes)
                ).yield(PENDING_KEY_UPDATED.ordinal()),
                whenPendingDiffers
        );

        Iif invokeStatement = iif(
                or(notExists(triggerKey), value(triggerKey).lt(timestampBytes)),
                whenTriggerIsFresh,
                ops().yield(OUTDATED_UPDATE_RECEIVED.ordinal())
        );

        return metaStorageMgr.invoke(invokeStatement).thenAccept(result -> {
            UpdateStatus status = UpdateStatus.valueOf(result.getAsInt());

            switch (status) {
                case PENDING_KEY_UPDATED:
                    LOG.info(
                            "Update metastore pending partitions key [key={}, partition={}, zone={}/{}, newVal={}, timestamp={}]",
                            pendingKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                            pendingQueue, timestamp
                    );

                    break;
                case PLANNED_KEY_UPDATED:
                    LOG.info(
                            "Update metastore planned partitions key [key={}, partition={}, zone={}/{}, newVal={}]",
                            plannedKey, partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                            target
                    );

                    break;
                case PLANNED_KEY_REMOVED_EQUALS_PENDING:
                    LOG.info(
                            "Remove planned key because current pending key has the same value [key={}, partition={}, zone={}/{}, val={}]",
                            plannedKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                            target
                    );

                    break;
                case PLANNED_KEY_REMOVED_EMPTY_PENDING:
                    LOG.info(
                            "Remove planned key because pending is empty and calculated assignments are equal to current assignments "
                                    + "[key={}, partition={}, zone={}/{}, val={}]",
                            plannedKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                            target
                    );

                    break;
                case ASSIGNMENT_NOT_UPDATED:
                    LOG.debug(
                            "Assignments are not updated [key={}, partition={}, zone={}/{}, val={}]",
                            plannedKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(),
                            target
                    );

                    break;
                case OUTDATED_UPDATE_RECEIVED:
                    LOG.debug(
                            "Received outdated rebalance trigger event [revision={}, partition={}, zone={}/{}]",
                            revision, partNum, zoneDescriptor.id(), zoneDescriptor.name()
                    );

                    break;
                default:
                    throw new IllegalStateException("Unknown return code for rebalance metastore multi-invoke");
            }
        });
    }