in oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java [891:1089]
/**
 * Renews the lease of this cluster node, unless the last renewal happened
 * less than {@code leaseUpdateInterval} ago.
 * <p>
 * The new lease end time ({@code now + leaseTime}) is written with a
 * {@code findAndUpdate} on the cluster node document that only raises
 * {@code LEASE_END_KEY} (max). Unless the lease check mode is
 * {@code DISABLED}, the update is conditional: the document must still be
 * {@code ACTIVE}, must not carry an acquired recovery lock and must carry
 * this instance's runtimeId. In {@code STRICT} mode the lease is checked for
 * expiry before the update is attempted.
 *
 * @return {@code true} if the lease was renewed; {@code false} if no renewal
 *         was necessary yet (the {@code false} returns after
 *         {@code handleLeaseFailure} are not expected to be reached)
 * @throws DocumentStoreException if the lease check failed earlier, the lease
 *         expired (STRICT mode), the conditional update did not match, or the
 *         update itself failed in the store
 */
public boolean renewLease() throws DocumentStoreException {
    long now = getCurrentTime();
    if (LOG.isTraceEnabled()) {
        LOG.trace("renewLease - leaseEndTime: " + leaseEndTime + ", leaseTime: " + leaseTime + ", leaseUpdateInterval: " + leaseUpdateInterval);
    }
    // a successful renewal sets leaseEndTime to (time of renewal + leaseTime),
    // so leaseEndTime - leaseTime is (roughly) the time of the last renewal
    if (now < leaseEndTime - leaseTime + leaseUpdateInterval) {
        // no need to renew the lease - it is still within 'leaseUpdateInterval'
        return false;
    }
    // lease requires renewal
    long updatedLeaseEndTime;
    synchronized(this) {
        // this is synchronized since access to leaseCheckFailed and leaseEndTime
        // are both normally synchronized to propagate values between renewLease()
        // and performLeaseCheck().
        // (there are unsynchronized accesses to both of these as well - however
        // they are both double-checked - and with both reading a stale value is thus OK)
        if (leaseCheckFailed) {
            // prevent lease renewal after it failed
            throw leaseExpired(LEASE_CHECK_FAILED_MSG + " (since " + asISO8601(leaseEndTime) + ")", true);
        }
        // synchronized could have delayed the 'now', so
        // set it again..
        now = getCurrentTime();
        updatedLeaseEndTime = now + leaseTime;
    }
    if (leaseCheckMode == LeaseCheckMode.STRICT) {
        // check whether the lease is still valid and can be renewed
        if (isLeaseExpired(now)) {
            synchronized (this) {
                if (leaseCheckFailed) {
                    // some other thread already noticed and calls failure handler
                    throw leaseExpired(LEASE_CHECK_FAILED_MSG + " (since " + asISO8601(leaseEndTime) + ")", true);
                }
                // current thread calls failure handler
                // outside synchronized block
                leaseCheckFailed = true;
            }
            String template = "%s (mode: %s, leaseEndTime: %d (%s), leaseTime: %d, leaseFailureMargin: %d, "
                    + "lease check end time (leaseEndTime - leaseFailureMargin): %d (%s), now: %d (%s), remaining: %d)"
                    + " Need to stop oak-store-document/DocumentNodeStoreService.";
            String errorMsg = String.format(template, LEASE_CHECK_FAILED_MSG, leaseCheckMode.name(), leaseEndTime,
                    asISO8601(leaseEndTime), leaseTime, leaseFailureMargin, leaseEndTime - leaseFailureMargin,
                    asISO8601(leaseEndTime - leaseFailureMargin), now, asISO8601(now), (leaseEndTime - leaseFailureMargin) - now);
            LOG.error(errorMsg);
            handleLeaseFailure(errorMsg);
            // should never be reached: handleLeaseFailure throws a DocumentStoreException
            return false;
        }
    }
    UpdateOp update = new UpdateOp("" + id, false);
    // Update the field only if the newer value is higher (or doesn't exist)
    update.max(LEASE_END_KEY, updatedLeaseEndTime);
    if (leaseCheckMode != LeaseCheckMode.DISABLED) {
        // if leaseCheckDisabled, then we just update the lease without
        // checking
        // OAK-3398:
        // if we renewed the lease ever with this instance/ClusterNodeInfo
        // (which is the normal case.. except for startup),
        // then we can now make an assertion that the lease is unchanged
        // and the incremental update must only succeed if no-one else
        // did a recover/inactivation in the meantime
        // make three assertions: it must still be active
        update.equals(STATE, null, ACTIVE.name());
        // plus it must not have a recovery lock on it
        update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
        // and the runtimeId that we create at startup each time
        // should be the same
        update.equals(RUNTIME_ID_KEY, null, runtimeId);
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Renewing lease for cluster id " + id + " with UpdateOp " + update);
    }
    Stopwatch sw = Stopwatch.createStarted();
    DocumentStoreException dse;
    Object result = null;
    try {
        ClusterNodeInfoDocument doc = store.findAndUpdate(Collection.CLUSTER_NODES, update);
        result = doc;
        if (doc == null) { // should not occur when leaseCheckDisabled
            // OAK-3398 : someone else either started recovering or is already through with that.
            // in both cases the local instance lost the lease-update-game - and hence
            // should behave and must consider itself as 'lease failed'
            // NOTE: the exceptions thrown in this branch (leaseExpired(..) and
            // handleLeaseFailure(..), presumably DocumentStoreExceptions) are
            // caught by the catch clause below; leaseCheckFailed is set by then,
            // so the post-failure refresh returns at once and the exception is
            // rethrown at the end of this method.
            synchronized(this) {
                if (leaseCheckFailed) {
                    // somehow the instance figured out otherwise that the
                    // lease check failed - so we don't have to too - so we just log/throw
                    throw leaseExpired(LEASE_CHECK_FAILED_MSG, true);
                }
                leaseCheckFailed = true; // make sure only one thread 'wins', ie goes any further
            }
            String errorMsg = LEASE_CHECK_FAILED_MSG
                    + " (Could not update lease anymore, someone else in the cluster "
                    + "must have noticed this instance' slowness already. "
                    + "Going to invoke leaseFailureHandler!)";
            // try to add more diagnostics
            try {
                ClusterNodeInfoDocument current = store.find(Collection.CLUSTER_NODES, "" + id);
                if (current != null) {
                    Object leaseEnd = current.get(LEASE_END_KEY);
                    Object recoveryLock = current.get(REV_RECOVERY_LOCK);
                    Object recoveryBy = current.get(REV_RECOVERY_BY);
                    Object cnState = current.get(STATE);
                    errorMsg += " (leaseEnd: " + leaseEnd + " (expected: " + leaseEndTime + ")" +
                            ", (state: " + cnState + " (expected: " + ACTIVE.name() + ")" +
                            ", recoveryLock: " + recoveryLock +
                            ", recoveryBy: " + recoveryBy + ")";
                }
            } catch (DocumentStoreException ex) {
                LOG.error("trying to read ClusterNodeInfo for cluster id " + id, ex);
            }
            LOG.error(errorMsg);
            handleLeaseFailure(errorMsg);
            // should never be reached: handleLeaseFailure throws a DocumentStoreException
            return false;
        }
        leaseEndTime = updatedLeaseEndTime;
        // pick up a read/write mode change propagated via the cluster node document
        String mode = (String) doc.get(READ_WRITE_MODE_KEY);
        if (mode != null && !mode.equals(readWriteMode)) {
            readWriteMode = mode;
            store.setReadWriteMode(mode);
        }
        return true;
    } catch (DocumentStoreException e) {
        dse = e;
        result = e.toString();
    } finally {
        sw.stop();
        String msg = "Lease renewal for cluster id {} took {}, resulted in: {}";
        if (sw.elapsed(TimeUnit.SECONDS) > 10) {
            LOG.warn(msg, id, sw, result);
        } else if (sw.elapsed(TimeUnit.SECONDS) > 1) {
            LOG.info(msg, id, sw, result);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(msg, id, sw, result);
        }
    }
    // if we get here, the update failed with an exception: try to bring
    // leaseEndTime in sync with the store before propagating the failure
    refreshLeaseEndTimeAfterFailedUpdate(updatedLeaseEndTime);
    throw dse;
}

/**
 * Best-effort attempt to update {@code leaseEndTime} from the store after a
 * lease update failed with an exception.
 * <p>
 * Until {@code updatedLeaseEndTime} is reached, the cluster node document is
 * read (retrying read failures at most once a second). If it is still active
 * and carries this instance's runtimeId, its lease end time is adopted. In all
 * other cases {@code leaseEndTime} is left untouched, so the next lease update
 * attempt fails. Returns early if the lease check already failed, or if the
 * thread is interrupted while waiting between reads (the interrupt status is
 * restored in that case).
 *
 * @param updatedLeaseEndTime the lease end time the failed update tried to set;
 *        also the deadline for these refresh attempts
 */
private void refreshLeaseEndTimeAfterFailedUpdate(long updatedLeaseEndTime) {
    while (getCurrentTime() < updatedLeaseEndTime) {
        synchronized (this) {
            if (leaseCheckFailed) {
                // no need to read from store, lease check already failed
                return;
            }
        }
        long t1 = clock.getTime();
        ClusterNodeInfoDocument doc;
        try {
            doc = store.find(Collection.CLUSTER_NODES, String.valueOf(id));
        } catch (DocumentStoreException e) {
            LOG.warn("Reading ClusterNodeInfoDocument for id " + id + " failed", e);
            // do not retry more than once a second
            try {
                clock.waitUntil(t1 + 1000);
            } catch (InterruptedException iex) {
                // preserve the interrupt for the caller and stop retrying:
                // with the flag restored, further waits would presumably be
                // interrupted immediately, turning this into a busy loop
                // against the store
                Thread.currentThread().interrupt();
                return;
            }
            continue;
        }
        if (doc == null) {
            LOG.warn("ClusterNodeInfoDocument for id {} does not exist anymore", id);
        } else if (!doc.isActive()) {
            // let the next lease update attempt fail
            LOG.warn("ClusterNodeInfoDocument for id {} is not active " +
                    "anymore. {}", id, doc);
        } else if (runtimeId.equals(doc.getRuntimeId())) {
            // same runtimeId: the document still belongs to this instance,
            // so adopt the lease end time currently stored
            leaseEndTime = doc.getLeaseEndTime();
        }
        // otherwise the runtimeId differs (the cluster id is presumably in use
        // by another instance now): leave leaseEndTime untouched and let the
        // next lease update attempt fail
        return;
    }
}