private void updateLeaseBatchInternal()

in modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java [413:607]


        /**
         * Runs one iteration of the lease update: inspects the current lease of every replication group that has stable or pending
         * assignments, decides per group whether a lease should be published, created anew, prolonged or left untouched, and then
         * writes the resulting lease batch to the meta storage with a conditional invoke.
         *
         * <p>Summary of the steps, as implemented below:
         * <ol>
         *     <li>Take the current time, reset {@code leaseUpdateStatistics} and compute the "close to expiration" threshold and the
         *     new expiration timestamp from the configured lease expiration interval.</li>
         *     <li>Collect the stable and pending assignments for each group known to the assignments tracker.</li>
         *     <li>For each group: handle a not-yet-accepted lease using the negotiation agreement (publish it, or create a new lease);
         *     otherwise pick a candidate leaseholder and either grant a new lease (the old one is outdated), prolong the lease (it is
         *     close to expiration) or just remember the group as prolongable.</li>
         *     <li>Skip the meta storage write when there is nothing to update; otherwise merge the untouched current leases into the
         *     batch, drop expired leases of groups that no longer have assignments, and write the batch.</li>
         *     <li>When the write succeeds, start the negotiation for every agreement collected in this iteration.</li>
         * </ol>
         *
         * <p>The write only applies if the leases key does not exist or still holds the bytes of the lease snapshot this iteration
         * started from; otherwise a warning is logged and no negotiation is started.
         */
        private void updateLeaseBatchInternal() {
            HybridTimestamp currentTime = clockService.current();

            // Statistics are collected from scratch on every iteration.
            leaseUpdateStatistics = new LeaseStats();

            long leaseExpirationInterval = replicationConfiguration.leaseExpirationIntervalMillis().value();

            // A lease whose expiration (physical time) is earlier than "now + half of the interval" is treated below as expired
            // or close to expiration.
            long outdatedLeaseThreshold = currentTime.getPhysical() + leaseExpirationInterval / 2;

            // Expiration assigned to prolonged leases: current physical time plus the full expiration interval.
            HybridTimestamp newExpirationTimestamp = new HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0);

            // Snapshot of the leases known to the tracker; its bytes are also used as the expected value of the conditional write.
            Leases leasesCurrent = leaseTracker.leasesCurrent();
            // Agreements for the leases created in this iteration; negotiation is started only after a successful write.
            Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new HashMap<>();
            // Leases that are created or changed in this iteration and must be written to the meta storage.
            Map<ReplicationGroupId, Lease> renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId().size());

            Map<ReplicationGroupId, TokenizedAssignments> tokenizedStableAssignmentsMap = assignmentsTracker.stableAssignments();
            Map<ReplicationGroupId, TokenizedAssignments> tokenizedPendingAssignmentsMap = assignmentsTracker.pendingAssignments();

            // Every group that has either pending or stable assignments is processed.
            Set<ReplicationGroupId> groupsAmongCurrentStableAndPendingAssignments = union(
                    tokenizedPendingAssignmentsMap.keySet(),
                    tokenizedStableAssignmentsMap.keySet()
            );

            // Group id -> (stable assignments, pending assignments).
            Map<ReplicationGroupId, Pair<Set<Assignment>, Set<Assignment>>> aggregatedStableAndPendingAssignmentsByGroups = new HashMap<>();

            for (ReplicationGroupId grpId : groupsAmongCurrentStableAndPendingAssignments) {
                Set<Assignment> stables = getAssignmentsFromTokenizedAssignmentsMap(grpId, tokenizedStableAssignmentsMap);
                Set<Assignment> pendings = getAssignmentsFromTokenizedAssignmentsMap(grpId, tokenizedPendingAssignmentsMap);

                aggregatedStableAndPendingAssignmentsByGroups.put(grpId, new Pair<>(stables, pendings));
            }

            // Numbers for statistics logging.
            int currentStableAssignmentsSize = tokenizedStableAssignmentsMap.size();
            int currentPendingAssignmentsSize = tokenizedPendingAssignmentsMap.size();
            int activeLeasesCount = 0;

            // Groups whose lease is not yet close to expiration but could be prolonged; they are prolonged only if the batch is
            // written anyway (see the loop over renewedLeases below).
            Set<ReplicationGroupId> prolongableLeaseGroupIds = new HashSet<>();

            for (Map.Entry<ReplicationGroupId, Pair<Set<Assignment>, Set<Assignment>>> entry
                    : aggregatedStableAndPendingAssignmentsByGroups.entrySet()
            ) {
                ReplicationGroupId grpId = entry.getKey();

                Set<Assignment> stableAssignments = entry.getValue().getFirst();
                Set<Assignment> pendingAssignments = entry.getValue().getSecond();

                // Groups without a lease in the tracker are handled through an empty lease placeholder.
                Lease lease = requireNonNullElse(leasesCurrent.leaseByGroupId().get(grpId), emptyLease(grpId));

                // Counted for the statistics log only.
                if (lease.isAccepted() && !isLeaseOutdated(lease)) {
                    activeLeasesCount++;
                }

                if (!lease.isAccepted()) {
                    // NOTE(review): judging by its name this call also removes a ready agreement from the negotiator, so it must be
                    // called once per group per iteration - confirm against LeaseNegotiator.
                    LeaseAgreement agreement = leaseNegotiator.getAndRemoveIfReady(grpId);

                    agreement.checkValid(grpId, topologyTracker.currentTopologySnapshot(), union(stableAssignments, pendingAssignments));

                    if (lease.isProlongable() && agreement.isAccepted()) {
                        Lease negotiatedLease = agreement.getLease();

                        // Lease information is taken from lease tracker, where it appears on meta storage watch updates, so it can
                        // contain stale leases, if watch processing was delayed for some reason. It is ok: negotiated lease is
                        // guaranteed to be already written to meta storage before negotiation begins, and in this case its start time
                        // would be greater than lease's.
                        assert negotiatedLease.getStartTime().longValue() >= lease.getStartTime().longValue()
                                : format("Can't publish the lease that was not negotiated [groupId={}, startTime={}, "
                                + "agreementLeaseStartTime={}].", grpId, lease.getStartTime(), agreement.getLease().getStartTime());

                        publishLease(grpId, negotiatedLease, renewedLeases, leaseExpirationInterval);

                        continue;
                    } else if (!lease.isProlongable() || agreement.isDeclined()) {
                        // Here we initiate negotiations for UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
                        // Also, if the lease was denied, we create the new one.
                        chooseCandidateAndCreateNewLease(
                                grpId,
                                lease,
                                agreement,
                                stableAssignments,
                                pendingAssignments,
                                renewedLeases,
                                toBeNegotiated
                        );

                        continue;
                    }

                    // Falls through only when the lease is prolongable and the agreement is neither accepted nor declined;
                    // such a lease is handled by the generic logic below.
                }

                // Calculate candidate for any accepted lease to define whether it can be prolonged (prolongation is possible only
                // if the candidate is the same as the current leaseholder).
                String proposedLeaseholder = lease.isProlongable()
                        ? lease.getLeaseholder()
                        : lease.proposedCandidate();

                ClusterNode candidate = nextLeaseHolder(stableAssignments, pendingAssignments, grpId, proposedLeaseholder);

                boolean canBeProlonged = lease.isAccepted()
                        && lease.isProlongable()
                        && candidate != null && candidate.id().equals(lease.getLeaseholderId());

                // The lease is expired or close to this.
                if (lease.getExpirationTime().getPhysical() < outdatedLeaseThreshold) {
                    // No candidate was found for this group, so update the statistics and skip the iteration.
                    if (candidate == null) {
                        leaseUpdateStatistics.onLeaseWithoutCandidate();

                        continue;
                    }

                    // We can't prolong the expired lease because we already have an interval of time when the lease was not active,
                    // so we must start a negotiation round from the beginning; the same we do for the groups that don't have
                    // leaseholders at all.
                    if (isLeaseOutdated(lease)) {
                        // New lease is granted.
                        Lease newLease = writeNewLease(grpId, candidate, renewedLeases);

                        // The agreement is created with the force flag when the previous lease was not prolongable and named
                        // a proposed candidate.
                        boolean force = !lease.isProlongable() && lease.proposedCandidate() != null;

                        toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force));
                    } else if (canBeProlonged) {
                        // Old lease is renewed.
                        renewedLeases.put(grpId, prolongLease(lease, newExpirationTimestamp));
                    }
                } else if (canBeProlonged) {
                    // The lease is still far from expiration: do not force a write just for it, only remember the group.
                    prolongableLeaseGroupIds.add(grpId);
                }
            }

            // Key under which the whole lease batch is stored.
            ByteArray key = PLACEMENTDRIVER_LEASES_KEY;

            if (shouldLogLeaseStatistics()) {
                LOG.info(
                        "Leases updated (printed once per {} iteration(s)): [inCurrentIteration={}, active={}, "
                                + "currentStableAssignmentsSize={}, currentPendingAssignmentsSize={}].",
                        LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS,
                        leaseUpdateStatistics,
                        activeLeasesCount,
                        currentStableAssignmentsSize,
                        currentPendingAssignmentsSize
                );
            }

            // This condition allows to skip the meta storage invoke when there are no leases to update (renewedLeases.isEmpty()).
            // However there is the case when we need to save empty leases collection: when the assignments are empty and
            // leasesCurrent (those that reflect the meta storage state) is not empty. The negation of this condition gives us
            // the condition to skip the update and the result is:
            // !(emptyAssignments && !leasesCurrent.isEmpty()) == (!emptyAssignments || leasesCurrent.isEmpty())
            boolean emptyAssignments = aggregatedStableAndPendingAssignmentsByGroups.isEmpty();
            if (renewedLeases.isEmpty() && (!emptyAssignments || leasesCurrent.leaseByGroupId().isEmpty())) {
                LOG.debug("No leases to update found.");
                return;
            }

            // The batch replaces the whole stored value, so the current leases that were not changed in this iteration are
            // carried over as is.
            leasesCurrent.leaseByGroupId().forEach(renewedLeases::putIfAbsent);

            for (Iterator<Entry<ReplicationGroupId, Lease>> iter = renewedLeases.entrySet().iterator(); iter.hasNext(); ) {
                Map.Entry<ReplicationGroupId, Lease> entry = iter.next();
                ReplicationGroupId groupId = entry.getKey();
                Lease lease = entry.getValue();

                if (clockService.before(lease.getExpirationTime(), currentTime)
                        && !groupsAmongCurrentStableAndPendingAssignments.contains(groupId)) {
                    // The lease has expired and its group has neither stable nor pending assignments anymore: drop it.
                    iter.remove();
                } else if (prolongableLeaseGroupIds.contains(groupId)) {
                    // A write happens anyway, so the leases remembered as prolongable are prolonged along with it.
                    entry.setValue(prolongLease(lease, newExpirationTimestamp));
                }
            }

            byte[] renewedValue = new LeaseBatch(renewedLeases.values()).bytes();

            // Conditional write: applied only if the key is absent or still holds the bytes of the snapshot taken at the beginning
            // of this iteration; otherwise nothing is written.
            msManager.invoke(
                    or(notExists(key), value(key).eq(leasesCurrent.leasesBytes())),
                    put(key, renewedValue),
                    noop()
            ).whenComplete((success, e) -> {
                if (e != null) {
                    // Failures caused by the node being stopped are not reported to the failure processor.
                    if (!hasCause(e, NodeStoppingException.class)) {
                        failureProcessor.process(new FailureContext(e, "Lease update invocation failed"));
                    }

                    return;
                }

                if (!success) {
                    LOG.warn("Lease update invocation failed because of outdated lease data on this node.");

                    return;
                }

                // Negotiation is started only for the leases that have actually been written.
                for (Map.Entry<ReplicationGroupId, LeaseAgreement> entry : toBeNegotiated.entrySet()) {
                    leaseNegotiator.negotiate(entry.getValue());
                }
            });
        }