in modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java [413:607]
/**
 * Runs one lease update round for every replication group that has stable and/or pending assignments.
 *
 * <p>For each such group, the lease currently known to the lease tracker (or an empty placeholder lease when none is
 * known) is examined, and the round does one of the following:
 * <ul>
 *     <li>publishes a lease whose negotiation has been accepted;</li>
 *     <li>chooses a new candidate and creates a new lease for negotiation;</li>
 *     <li>replaces an outdated lease with a new one;</li>
 *     <li>prolongs a lease that is close to expiration.</li>
 * </ul>
 * Leases that could be prolonged but are not yet close to expiration are prolonged only when a meta storage write is
 * performed in this round anyway.
 *
 * <p>The resulting lease batch is written with a conditional meta storage invoke on {@code PLACEMENTDRIVER_LEASES_KEY}.
 * The invoke succeeds only if the key is absent or still holds the bytes this round was computed from. Negotiations
 * for newly created leases are started only after that invoke succeeds. This method also resets
 * {@code leaseUpdateStatistics} at the start of every round.
 */
private void updateLeaseBatchInternal() {
// A single timestamp is used for every time-based decision in this round.
HybridTimestamp currentTime = clockService.current();
leaseUpdateStatistics = new LeaseStats();
long leaseExpirationInterval = replicationConfiguration.leaseExpirationIntervalMillis().value();
// Leases expiring before this physical time (i.e. within half of the expiration interval) are treated as
// "expired or close to this" below.
long outdatedLeaseThreshold = currentTime.getPhysical() + leaseExpirationInterval / 2;
// Expiration time assigned to leases that are prolonged in this round.
HybridTimestamp newExpirationTimestamp = new HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0);
// Snapshot of leases as seen by the lease tracker. Its bytes are also the expected value for the conditional invoke
// at the end of the method.
Leases leasesCurrent = leaseTracker.leasesCurrent();
// Agreements whose negotiation starts only after the meta storage write succeeds.
Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new HashMap<>();
// Leases to be written to meta storage in this round.
Map<ReplicationGroupId, Lease> renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId().size());
Map<ReplicationGroupId, TokenizedAssignments> tokenizedStableAssignmentsMap = assignmentsTracker.stableAssignments();
Map<ReplicationGroupId, TokenizedAssignments> tokenizedPendingAssignmentsMap = assignmentsTracker.pendingAssignments();
Set<ReplicationGroupId> groupsAmongCurrentStableAndPendingAssignments = union(
tokenizedPendingAssignmentsMap.keySet(),
tokenizedStableAssignmentsMap.keySet()
);
// For every group that has stable and/or pending assignments, collect both assignment sets as a (stable, pending) pair.
Map<ReplicationGroupId, Pair<Set<Assignment>, Set<Assignment>>> aggregatedStableAndPendingAssignmentsByGroups = new HashMap<>();
for (ReplicationGroupId grpId : groupsAmongCurrentStableAndPendingAssignments) {
Set<Assignment> stables = getAssignmentsFromTokenizedAssignmentsMap(grpId, tokenizedStableAssignmentsMap);
Set<Assignment> pendings = getAssignmentsFromTokenizedAssignmentsMap(grpId, tokenizedPendingAssignmentsMap);
aggregatedStableAndPendingAssignmentsByGroups.put(grpId, new Pair<>(stables, pendings));
}
// Numbers for statistics logging.
int currentStableAssignmentsSize = tokenizedStableAssignmentsMap.size();
int currentPendingAssignmentsSize = tokenizedPendingAssignmentsMap.size();
int activeLeasesCount = 0;
// Groups whose lease can be prolonged but is not yet close to expiration. These leases are prolonged only if a
// meta storage write happens in this round (see the loop over renewedLeases below).
Set<ReplicationGroupId> prolongableLeaseGroupIds = new HashSet<>();
for (Map.Entry<ReplicationGroupId, Pair<Set<Assignment>, Set<Assignment>>> entry
: aggregatedStableAndPendingAssignmentsByGroups.entrySet()
) {
ReplicationGroupId grpId = entry.getKey();
Set<Assignment> stableAssignments = entry.getValue().getFirst();
Set<Assignment> pendingAssignments = entry.getValue().getSecond();
// Groups without a known lease are handled through a placeholder lease.
Lease lease = requireNonNullElse(leasesCurrent.leaseByGroupId().get(grpId), emptyLease(grpId));
if (lease.isAccepted() && !isLeaseOutdated(lease)) {
activeLeasesCount++;
}
if (!lease.isAccepted()) {
// Side effect: a ready agreement is removed from the negotiator.
LeaseAgreement agreement = leaseNegotiator.getAndRemoveIfReady(grpId);
// NOTE(review): presumably invalidates the agreement if the proposed leaseholder is no longer in the topology
// or in the stable/pending assignments. Confirm against LeaseAgreement#checkValid.
agreement.checkValid(grpId, topologyTracker.currentTopologySnapshot(), union(stableAssignments, pendingAssignments));
if (lease.isProlongable() && agreement.isAccepted()) {
Lease negotiatedLease = agreement.getLease();
// Lease information is taken from lease tracker, where it appears on meta storage watch updates, so it can
// contain stale leases, if watch processing was delayed for some reason. It is ok: negotiated lease is
// guaranteed to be already written to meta storage before negotiation begins, and in this case its start time
// would be greater than lease's.
assert negotiatedLease.getStartTime().longValue() >= lease.getStartTime().longValue()
: format("Can't publish the lease that was not negotiated [groupId={}, startTime={}, "
+ "agreementLeaseStartTime={}].", grpId, lease.getStartTime(), agreement.getLease().getStartTime());
publishLease(grpId, negotiatedLease, renewedLeases, leaseExpirationInterval);
continue;
} else if (!lease.isProlongable() || agreement.isDeclined()) {
// Here we initiate negotiations for UNDEFINED_AGREEMENT and retry them on a newly started active actor as well.
// Also, if the lease was declined, we create a new one.
chooseCandidateAndCreateNewLease(
grpId,
lease,
agreement,
stableAssignments,
pendingAssignments,
renewedLeases,
toBeNegotiated
);
continue;
}
// Otherwise, the lease is prolongable and the agreement is neither accepted nor declined. Fall through to the
// expiration handling below.
}
// Calculate candidate for any accepted lease to define whether it can be prolonged (prolongation is possible only
// if the candidate is the same as the current leaseholder).
// The current leaseholder is preferred for prolongable leases; otherwise the lease's proposed candidate is used.
String proposedLeaseholder = lease.isProlongable()
? lease.getLeaseholder()
: lease.proposedCandidate();
ClusterNode candidate = nextLeaseHolder(stableAssignments, pendingAssignments, grpId, proposedLeaseholder);
boolean canBeProlonged = lease.isAccepted()
&& lease.isProlongable()
&& candidate != null && candidate.id().equals(lease.getLeaseholderId());
// The lease is expired or close to this.
if (lease.getExpirationTime().getPhysical() < outdatedLeaseThreshold) {
// If no candidate was found in either the stable or the pending assignments, record it in the statistics and
// skip this group.
if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
continue;
}
// We can't prolong the expired lease because we already have an interval of time when the lease was not active,
// so we must start a negotiation round from the beginning; the same we do for the groups that don't have
// leaseholders at all.
if (isLeaseOutdated(lease)) {
// New lease is granted.
Lease newLease = writeNewLease(grpId, candidate, renewedLeases);
// Negotiation is forced when the previous lease was not prolongable and carried a proposed candidate.
boolean force = !lease.isProlongable() && lease.proposedCandidate() != null;
toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force));
} else if (canBeProlonged) {
// Old lease is renewed.
renewedLeases.put(grpId, prolongLease(lease, newExpirationTimestamp));
}
} else if (canBeProlonged) {
// Not close to expiration yet: prolong only if a write happens anyway in this round.
prolongableLeaseGroupIds.add(grpId);
}
}
ByteArray key = PLACEMENTDRIVER_LEASES_KEY;
if (shouldLogLeaseStatistics()) {
LOG.info(
"Leases updated (printed once per {} iteration(s)): [inCurrentIteration={}, active={}, "
+ "currentStableAssignmentsSize={}, currentPendingAssignmentsSize={}].",
LEASE_UPDATE_STATISTICS_PRINT_ONCE_PER_ITERATIONS,
leaseUpdateStatistics,
activeLeasesCount,
currentStableAssignmentsSize,
currentPendingAssignmentsSize
);
}
// This condition allows skipping the meta storage invoke when there are no leases to update (renewedLeases.isEmpty()).
// However, there is one case when we need to save the leases collection anyway: when the assignments are empty and
// leasesCurrent (which reflects the meta storage state) is not empty. Negating this case gives the condition for
// skipping the update:
// !(emptyAssignments && !leasesCurrent.isEmpty()) == (!emptyAssignments || leasesCurrent.isEmpty())
boolean emptyAssignments = aggregatedStableAndPendingAssignmentsByGroups.isEmpty();
if (renewedLeases.isEmpty() && (!emptyAssignments || leasesCurrent.leaseByGroupId().isEmpty())) {
LOG.debug("No leases to update found.");
return;
}
// Carry over the leases that were not touched in this round, so the written batch contains every lease.
leasesCurrent.leaseByGroupId().forEach(renewedLeases::putIfAbsent);
// Drop expired leases of groups that no longer have assignments, and prolong the deferred prolongable leases.
for (Iterator<Entry<ReplicationGroupId, Lease>> iter = renewedLeases.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<ReplicationGroupId, Lease> entry = iter.next();
ReplicationGroupId groupId = entry.getKey();
Lease lease = entry.getValue();
if (clockService.before(lease.getExpirationTime(), currentTime)
&& !groupsAmongCurrentStableAndPendingAssignments.contains(groupId)) {
iter.remove();
} else if (prolongableLeaseGroupIds.contains(groupId)) {
entry.setValue(prolongLease(lease, newExpirationTimestamp));
}
}
byte[] renewedValue = new LeaseBatch(renewedLeases.values()).bytes();
// Optimistic update: write only if the key is absent or still holds the bytes this round was computed from.
msManager.invoke(
or(notExists(key), value(key).eq(leasesCurrent.leasesBytes())),
put(key, renewedValue),
noop()
).whenComplete((success, e) -> {
if (e != null) {
// Failures caused by node stop are not reported to the failure processor.
if (!hasCause(e, NodeStoppingException.class)) {
failureProcessor.process(new FailureContext(e, "Lease update invocation failed"));
}
return;
}
if (!success) {
LOG.warn("Lease update invocation failed because of outdated lease data on this node.");
return;
}
// New leases are now written to meta storage, so their negotiation may begin.
for (Map.Entry<ReplicationGroupId, LeaseAgreement> entry : toBeNegotiated.entrySet()) {
leaseNegotiator.negotiate(entry.getValue());
}
});
}