private void doStableKeySwitch()

in modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java [276:517]


    private void doStableKeySwitch(
            Set<Assignment> stableFromRaft,
            TablePartitionId tablePartitionId,
            MetaStorageManager metaStorageMgr,
            long configurationTerm,
            long configurationIndex,
            BiFunction<TablePartitionId, Long, CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
    ) {
        try {
            ByteArray pendingPartAssignmentsKey = pendingPartAssignmentsQueueKey(tablePartitionId);
            ByteArray stablePartAssignmentsKey = stablePartAssignmentsKey(tablePartitionId);
            ByteArray plannedPartAssignmentsKey = plannedPartAssignmentsKey(tablePartitionId);
            ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
            ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
            ByteArray assignmentsChainKey = assignmentsChainKey(tablePartitionId);

            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
                    Set.of(
                            plannedPartAssignmentsKey,
                            pendingPartAssignmentsKey,
                            stablePartAssignmentsKey,
                            switchReduceKey,
                            switchAppendKey,
                            assignmentsChainKey
                    )
            ).get();

            Entry stableEntry = values.get(stablePartAssignmentsKey);
            Entry pendingEntry = values.get(pendingPartAssignmentsKey);
            Entry plannedEntry = values.get(plannedPartAssignmentsKey);
            Entry switchReduceEntry = values.get(switchReduceKey);
            Entry switchAppendEntry = values.get(switchAppendKey);
            Entry assignmentsChainEntry = values.get(assignmentsChainKey);

            Set<Assignment> retrievedStable = readAssignments(stableEntry).nodes();
            Set<Assignment> retrievedSwitchReduce = readAssignments(switchReduceEntry).nodes();
            Set<Assignment> retrievedSwitchAppend = readAssignments(switchAppendEntry).nodes();

            Assignments pendingAssignments = pendingEntry.value() == null
                    ? Assignments.EMPTY
                    : AssignmentsQueue.fromBytes(pendingEntry.value()).poll();
            Set<Assignment> retrievedPending = pendingAssignments.nodes();

            if (!retrievedPending.equals(stableFromRaft)) {
                return;
            }

            // We wait for catalog metadata to be applied up to the provided timestamp, so it should be safe to use the timestamp.
            Set<Assignment> calculatedAssignments = calculateAssignmentsFn.apply(tablePartitionId, pendingAssignments.timestamp())
                    .get();

            // Were reduced
            Set<Assignment> reducedNodes = difference(retrievedSwitchReduce, stableFromRaft);

            // Were added
            Set<Assignment> addedNodes = difference(stableFromRaft, retrievedStable);

            // For further reduction
            Set<Assignment> calculatedSwitchReduce = difference(retrievedSwitchReduce, reducedNodes);

            // For further addition
            Set<Assignment> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
            calculatedSwitchAppend = difference(calculatedSwitchAppend, addedNodes);
            calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);

            Set<Assignment> calculatedPendingReduction = difference(stableFromRaft, retrievedSwitchReduce);

            Set<Assignment> calculatedPendingAddition = union(stableFromRaft, reducedNodes);
            calculatedPendingAddition = intersect(calculatedAssignments, calculatedPendingAddition);

            // eq(revision(assignments.stable), retrievedAssignmentsStable.revision)
            SimpleCondition con1 = stableEntry.empty()
                    ? notExists(stablePartAssignmentsKey) :
                    revision(stablePartAssignmentsKey).eq(stableEntry.revision());

            // eq(revision(assignments.pending), retrievedAssignmentsPending.revision)
            SimpleCondition con2 = revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());

            // eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
            SimpleCondition con3 = switchReduceEntry.empty()
                    ? notExists(switchReduceKey) : revision(switchReduceKey).eq(switchReduceEntry.revision());

            // eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
            SimpleCondition con4 = switchAppendEntry.empty()
                    ? notExists(switchAppendKey) : revision(switchAppendKey).eq(switchAppendEntry.revision());

            // All conditions combined with AND operator.
            Condition retryPreconditions = and(con1, and(con2, and(con3, con4)));

            long catalogTimestamp = pendingAssignments.timestamp();

            Assignments newStableAssignments = Assignments.of(stableFromRaft, catalogTimestamp);

            Operation assignmentChainChangeOp = handleAssignmentsChainChange(
                    assignmentsChainKey,
                    assignmentsChainEntry,
                    pendingAssignments,
                    newStableAssignments,
                    configurationTerm,
                    configurationIndex
            );

            Update successCase;
            Update failCase;

            byte[] stableFromRaftByteArray = newStableAssignments.toBytes();
            byte[] additionByteArray = AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition, catalogTimestamp));
            byte[] reductionByteArray = AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction, catalogTimestamp));
            byte[] switchReduceByteArray = Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
            byte[] switchAppendByteArray = Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);

            if (!calculatedSwitchAppend.isEmpty()) {
                successCase = ops(
                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
                        put(pendingPartAssignmentsKey, additionByteArray),
                        put(switchReduceKey, switchReduceByteArray),
                        put(switchAppendKey, switchAppendByteArray),
                        assignmentChainChangeOp
                ).yield(SWITCH_APPEND_SUCCESS);
                failCase = ops().yield(SWITCH_APPEND_FAIL);
            } else if (!calculatedSwitchReduce.isEmpty()) {
                successCase = ops(
                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
                        put(pendingPartAssignmentsKey, reductionByteArray),
                        put(switchReduceKey, switchReduceByteArray),
                        put(switchAppendKey, switchAppendByteArray),
                        assignmentChainChangeOp
                ).yield(SWITCH_REDUCE_SUCCESS);
                failCase = ops().yield(SWITCH_REDUCE_FAIL);
            } else {
                Condition con5;
                if (plannedEntry.value() != null) {
                    // eq(revision(partition.assignments.planned), plannedEntry.revision)
                    con5 = revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());

                    successCase = ops(
                            put(stablePartAssignmentsKey, stableFromRaftByteArray),
                            put(pendingPartAssignmentsKey, AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
                            remove(plannedPartAssignmentsKey),
                            assignmentChainChangeOp
                    ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);

                    failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
                } else {
                    // notExists(partition.assignments.planned)
                    con5 = notExists(plannedPartAssignmentsKey);

                    successCase = ops(
                            put(stablePartAssignmentsKey, stableFromRaftByteArray),
                            remove(pendingPartAssignmentsKey),
                            assignmentChainChangeOp
                    ).yield(FINISH_REBALANCE_SUCCESS);

                    failCase = ops().yield(FINISH_REBALANCE_FAIL);
                }

                retryPreconditions = and(retryPreconditions, con5);
            }

            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
            int res = metaStorageMgr.invoke(iif(retryPreconditions, successCase, failCase)).get().getAsInt();

            if (res < 0) {
                switch (res) {
                    case SWITCH_APPEND_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance pending addition information. "
                                        + "Going to retry [tablePartitionID={}, appliedPeers={}]",
                                tablePartitionId, stableFromRaft
                        );
                        break;
                    case SWITCH_REDUCE_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance pending reduce information. "
                                        + "Going to retry [tablePartitionID={}, appliedPeers={}]",
                                tablePartitionId, stableFromRaft
                        );
                        break;
                    case SCHEDULE_PENDING_REBALANCE_FAIL:
                    case FINISH_REBALANCE_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance information. "
                                        + "Going to retry [tablePartitionId={}, appliedPeers={}]",
                                tablePartitionId, stableFromRaft
                        );
                        break;
                    default:
                        assert false : res;
                        break;
                }

                doStableKeySwitch(
                        stableFromRaft,
                        tablePartitionId,
                        metaStorageMgr,
                        configurationTerm,
                        configurationIndex,
                        calculateAssignmentsFn
                );

                return;
            }

            switch (res) {
                case SWITCH_APPEND_SUCCESS:
                    LOG.info("Rebalance finished. Going to schedule next rebalance with addition"
                                    + " [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            tablePartitionId, stableFromRaft, calculatedPendingAddition
                    );
                    break;
                case SWITCH_REDUCE_SUCCESS:
                    LOG.info("Rebalance finished. Going to schedule next rebalance with reduction"
                                    + " [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            tablePartitionId, stableFromRaft, calculatedPendingReduction
                    );
                    break;
                case SCHEDULE_PENDING_REBALANCE_SUCCESS:
                    LOG.info(
                            "Rebalance finished. Going to schedule next rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            tablePartitionId, stableFromRaft, Assignments.fromBytes(plannedEntry.value()).nodes()
                    );
                    break;
                case FINISH_REBALANCE_SUCCESS:
                    LOG.info("Rebalance finished [tablePartitionId={}, appliedPeers={}]", tablePartitionId, stableFromRaft);
                    break;

                default:
                    assert false : res;
                    break;
            }

        } catch (InterruptedException | ExecutionException e) {
            // TODO: IGNITE-14693
            if (!hasCause(e, NodeStoppingException.class)) {
                if (hasCause(e, TimeoutException.class)) {
                    // TODO: https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout properly.
                    LOG.error("Unable to commit partition configuration to metastore: {}", e, tablePartitionId);
                } else {
                    String errorMessage = String.format("Unable to commit partition configuration to metastore: %s", tablePartitionId);
                    failureProcessor.process(new FailureContext(e, errorMessage));
                }
            }
        }
    }