private void doOnNewPeersConfigurationApplied()

in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java [290:479]


    private void doOnNewPeersConfigurationApplied(PeersAndLearners configuration) {
        try {
            ByteArray pendingPartAssignmentsKey = pendingPartAssignmentsKey(tablePartitionId);
            ByteArray stablePartAssignmentsKey = stablePartAssignmentsKey(tablePartitionId);
            ByteArray plannedPartAssignmentsKey = plannedPartAssignmentsKey(tablePartitionId);
            ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
            ByteArray switchAppendKey = switchAppendKey(tablePartitionId);

            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
                    Set.of(
                            plannedPartAssignmentsKey,
                            pendingPartAssignmentsKey,
                            stablePartAssignmentsKey,
                            switchReduceKey,
                            switchAppendKey
                    )
            ).get();

            Entry stableEntry = values.get(stablePartAssignmentsKey);
            Entry pendingEntry = values.get(pendingPartAssignmentsKey);
            Entry plannedEntry = values.get(plannedPartAssignmentsKey);
            Entry switchReduceEntry = values.get(switchReduceKey);
            Entry switchAppendEntry = values.get(switchAppendKey);

            Set<Assignment> retrievedStable = readAssignments(stableEntry);
            Set<Assignment> retrievedSwitchReduce = readAssignments(switchReduceEntry);
            Set<Assignment> retrievedSwitchAppend = readAssignments(switchAppendEntry);

            Set<Assignment> calculatedAssignments = calculateAssignmentsFn.apply(tablePartitionId);

            Set<Assignment> stable = createAssignments(configuration);

            // Were reduced
            Set<Assignment> reducedNodes = difference(retrievedSwitchReduce, stable);

            // Were added
            Set<Assignment> addedNodes = difference(stable, retrievedStable);

            // For further reduction
            Set<Assignment> calculatedSwitchReduce = difference(retrievedSwitchReduce, reducedNodes);

            // For further addition
            Set<Assignment> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
            calculatedSwitchAppend = difference(calculatedSwitchAppend, addedNodes);
            calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);

            Set<Assignment> calculatedPendingReduction = difference(stable, retrievedSwitchReduce);

            Set<Assignment> calculatedPendingAddition = union(stable, reducedNodes);
            calculatedPendingAddition = intersect(calculatedAssignments, calculatedPendingAddition);

            // eq(revision(assignments.stable), retrievedAssignmentsStable.revision)
            SimpleCondition con1 = stableEntry.empty()
                    ? notExists(stablePartAssignmentsKey) :
                    revision(stablePartAssignmentsKey).eq(stableEntry.revision());

            // eq(revision(assignments.pending), retrievedAssignmentsPending.revision)
            SimpleCondition con2 = revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());

            // eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
            SimpleCondition con3 = switchReduceEntry.empty()
                    ? notExists(switchReduceKey) : revision(switchReduceKey).eq(switchReduceEntry.revision());

            // eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
            SimpleCondition con4 = switchAppendEntry.empty()
                    ? notExists(switchAppendKey) : revision(switchAppendKey).eq(switchAppendEntry.revision());

            // All conditions combined with AND operator.
            Condition retryPreconditions = and(con1, and(con2, and(con3, con4)));

            Update successCase;
            Update failCase;

            byte[] stableByteArray = ByteUtils.toBytes(stable);
            byte[] additionByteArray = ByteUtils.toBytes(calculatedPendingAddition);
            byte[] reductionByteArray = ByteUtils.toBytes(calculatedPendingReduction);
            byte[] switchReduceByteArray = ByteUtils.toBytes(calculatedSwitchReduce);
            byte[] switchAppendByteArray = ByteUtils.toBytes(calculatedSwitchAppend);

            if (!calculatedSwitchAppend.isEmpty()) {
                successCase = ops(
                        put(stablePartAssignmentsKey, stableByteArray),
                        put(pendingPartAssignmentsKey, additionByteArray),
                        put(switchReduceKey, switchReduceByteArray),
                        put(switchAppendKey, switchAppendByteArray)
                ).yield(SWITCH_APPEND_SUCCESS);
                failCase = ops().yield(SWITCH_APPEND_FAIL);
            } else if (!calculatedSwitchReduce.isEmpty()) {
                successCase = ops(
                        put(stablePartAssignmentsKey, stableByteArray),
                        put(pendingPartAssignmentsKey, reductionByteArray),
                        put(switchReduceKey, switchReduceByteArray),
                        put(switchAppendKey, switchAppendByteArray)
                ).yield(SWITCH_REDUCE_SUCCESS);
                failCase = ops().yield(SWITCH_REDUCE_FAIL);
            } else {
                Condition con5;
                if (plannedEntry.value() != null) {
                    // eq(revision(partition.assignments.planned), plannedEntry.revision)
                    con5 = revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());

                    successCase = ops(
                            put(stablePartAssignmentsKey, ByteUtils.toBytes(stable)),
                            put(pendingPartAssignmentsKey, plannedEntry.value()),
                            remove(plannedPartAssignmentsKey)
                    ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);

                    failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
                } else {
                    // notExists(partition.assignments.planned)
                    con5 = notExists(plannedPartAssignmentsKey);

                    successCase = ops(
                            put(stablePartAssignmentsKey, ByteUtils.toBytes(stable)),
                            remove(pendingPartAssignmentsKey)
                    ).yield(FINISH_REBALANCE_SUCCESS);

                    failCase = ops().yield(FINISH_REBALANCE_FAIL);
                }

                retryPreconditions = and(retryPreconditions, con5);
            }

            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
            int res = metaStorageMgr.invoke(iif(retryPreconditions, successCase, failCase)).get().getAsInt();

            if (res < 0) {
                switch (res) {
                    case SWITCH_APPEND_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance pending addition information. "
                                        + "Going to retry [tablePartitionID={}, appliedPeers={}]",
                                tablePartitionId, stable
                        );
                        break;
                    case SWITCH_REDUCE_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance pending reduce information. "
                                        + "Going to retry [tablePartitionID={}, appliedPeers={}]",
                                tablePartitionId, stable
                        );
                        break;
                    case SCHEDULE_PENDING_REBALANCE_FAIL:
                    case FINISH_REBALANCE_FAIL:
                        LOG.info("Rebalance keys changed while trying to update rebalance information. "
                                        + "Going to retry [tablePartitionId={}, appliedPeers={}]",
                                tablePartitionId, stable
                        );
                        break;
                    default:
                        assert false : res;
                        break;
                }

                doOnNewPeersConfigurationApplied(configuration);
                return;
            }

            switch (res) {
                case SWITCH_APPEND_SUCCESS:
                    LOG.info("Rebalance finished. Going to schedule next rebalance with addition"
                                    + " [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            tablePartitionId, stable, calculatedPendingAddition
                    );
                    break;
                case SWITCH_REDUCE_SUCCESS:
                    LOG.info("Rebalance finished. Going to schedule next rebalance with reduction"
                                    + " [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            tablePartitionId, stable, calculatedPendingReduction
                    );
                    break;
                case SCHEDULE_PENDING_REBALANCE_SUCCESS:
                    LOG.info(
                            "Rebalance finished. Going to schedule next rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
                            tablePartitionId, stable, ByteUtils.fromBytes(plannedEntry.value())
                    );
                    break;
                case FINISH_REBALANCE_SUCCESS:
                    LOG.info("Rebalance finished [tablePartitionId={}, appliedPeers={}]", tablePartitionId, stable);
                    break;
                default:
                    assert false : res;
                    break;
            }

            rebalanceAttempts.set(0);
        } catch (InterruptedException | ExecutionException e) {
            // TODO: IGNITE-14693
            LOG.warn("Unable to commit partition configuration to metastore: " + tablePartitionId, e);
        }
    }