public boolean renewLease()

in oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java [891:1089]


    /**
     * Renews the lease of this cluster node if a renewal is due.
     * <p>
     * No renewal is attempted while the current time is still before
     * {@code leaseEndTime - leaseTime + leaseUpdateInterval}. Otherwise the
     * new lease end ({@code now + leaseTime}) is written to the cluster node
     * document through a conditional {@code findAndUpdate}. Unless the lease
     * check mode is {@code DISABLED}, that update carries the conditions that
     * the document is still {@code ACTIVE}, has no acquired recovery lock and
     * still has this instance's {@code runtimeId}.
     * <p>
     * On success the local {@code leaseEndTime} is advanced and, if the
     * document carries a read/write mode different from the current one, the
     * mode is applied to the store.
     * <p>
     * If the store call throws a {@link DocumentStoreException}, the method
     * re-reads the cluster node document (at most once per second, until the
     * intended new lease end time) to pick up the lease end time stored
     * there, and then rethrows the original exception. An interrupt received
     * while waiting between such reads does not stop the retries; the
     * thread's interrupt status is restored before the method exits.
     *
     * @return {@code true} if the lease was renewed; {@code false} if no
     *         renewal was necessary yet.
     * @throws DocumentStoreException if the lease check has failed (either
     *         detected earlier or by this call), or if updating the lease
     *         in the store failed.
     */
    public boolean renewLease() throws DocumentStoreException {
        long now = getCurrentTime();

        if (LOG.isTraceEnabled()) {
            LOG.trace("renewLease - leaseEndTime: " + leaseEndTime + ", leaseTime: " + leaseTime + ", leaseUpdateInterval: " + leaseUpdateInterval);
        }

        if (now < leaseEndTime - leaseTime + leaseUpdateInterval) {
            // no need to renew the lease - it is still within 'leaseUpdateInterval'
            return false;
        }
        // lease requires renewal

        long updatedLeaseEndTime;
        synchronized(this) {
            // this is synchronized since access to leaseCheckFailed and leaseEndTime
            // are both normally synchronized to propagate values between renewLease()
            // and performLeaseCheck().
            // (there are unsynchronized accesses to both of these as well - however
            // they are both double-checked - and with both reading a stale value is thus OK)

            if (leaseCheckFailed) {
                // prevent lease renewal after it failed
                throw leaseExpired(LEASE_CHECK_FAILED_MSG + " (since " + asISO8601(leaseEndTime) + ")", true);
            }
            // synchronized could have delayed the 'now', so
            // set it again..
            now = getCurrentTime();
            updatedLeaseEndTime = now + leaseTime;
        }

        if (leaseCheckMode == LeaseCheckMode.STRICT) {
            // check whether the lease is still valid and can be renewed
            if (isLeaseExpired(now)) {
                synchronized (this) {
                    if (leaseCheckFailed) {
                        // some other thread already noticed and calls failure handler
                        throw leaseExpired(LEASE_CHECK_FAILED_MSG + " (since " + asISO8601(leaseEndTime) + ")", true);
                    }
                    // current thread calls failure handler
                    // outside synchronized block
                    leaseCheckFailed = true;
                }
                String template = "%s (mode: %s, leaseEndTime: %d (%s), leaseTime: %d, leaseFailureMargin: %d, "
                        + "lease check end time (leaseEndTime - leaseFailureMargin): %d (%s), now: %d (%s), remaining: %d)"
                        + " Need to stop oak-store-document/DocumentNodeStoreService.";
                String errorMsg = String.format(template, LEASE_CHECK_FAILED_MSG, leaseCheckMode.name(), leaseEndTime,
                        asISO8601(leaseEndTime), leaseTime, leaseFailureMargin, leaseEndTime - leaseFailureMargin,
                        asISO8601(leaseEndTime - leaseFailureMargin), now, asISO8601(now), (leaseEndTime - leaseFailureMargin) - now);
                LOG.error(errorMsg);
                handleLeaseFailure(errorMsg);
                // should never be reached: handleLeaseFailure throws a DocumentStoreException
                return false;
            }
        }

        UpdateOp update = new UpdateOp("" + id, false);
        // Update the field only if the newer value is higher (or doesn't exist)
        update.max(LEASE_END_KEY, updatedLeaseEndTime);

        if (leaseCheckMode != LeaseCheckMode.DISABLED) {
            // if leaseCheckDisabled, then we just update the lease without
            // checking
            // OAK-3398:
            // if we renewed the lease ever with this instance/ClusterNodeInfo
            // (which is the normal case.. except for startup),
            // then we can now make an assertion that the lease is unchanged
            // and the incremental update must only succeed if no-one else
            // did a recover/inactivation in the meantime
            // make three assertions: it must still be active
            update.equals(STATE, null, ACTIVE.name());
            // plus it must not have a recovery lock on it
            update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
            // and the runtimeId that we create at startup each time
            // should be the same
            update.equals(RUNTIME_ID_KEY, null, runtimeId);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Renewing lease for cluster id " + id + " with UpdateOp " + update);
        }
        Stopwatch sw = Stopwatch.createStarted();
        DocumentStoreException dse;
        // 'result' only feeds the timing log statement in the finally block
        Object result = null;
        try {
            ClusterNodeInfoDocument doc = store.findAndUpdate(Collection.CLUSTER_NODES, update);
            result = doc;

            if (doc == null) { // should not occur when leaseCheckDisabled
                // OAK-3398 : someone else either started recovering or is already through with that.
                // in both cases the local instance lost the lease-update-game - and hence
                // should behave and must consider itself as 'lease failed'

                synchronized(this) {
                    if (leaseCheckFailed) {
                        // somehow the instance figured out otherwise that the
                        // lease check failed - so we don't have to too - so we just log/throw
                        throw leaseExpired(LEASE_CHECK_FAILED_MSG, true);
                    }
                    leaseCheckFailed = true; // make sure only one thread 'wins', ie goes any further
                }

                String errorMsg = LEASE_CHECK_FAILED_MSG
                        + " (Could not update lease anymore, someone else in the cluster "
                        + "must have noticed this instance' slowness already. "
                        + "Going to invoke leaseFailureHandler!)";

                // try to add more diagnostics
                try {
                    ClusterNodeInfoDocument current = store.find(Collection.CLUSTER_NODES, "" + id);
                    if (current != null) {
                        Object leaseEnd = current.get(LEASE_END_KEY);
                        Object recoveryLock = current.get(REV_RECOVERY_LOCK);
                        Object recoveryBy = current.get(REV_RECOVERY_BY);
                        Object cnState = current.get(STATE);
                        errorMsg += " (leaseEnd: " + leaseEnd + " (expected: " + leaseEndTime + ")" +
                                ", (state: " + cnState + " (expected: " + ACTIVE.name() + ")" +
                                ", recoveryLock: " + recoveryLock +
                                ", recoveryBy: " + recoveryBy + ")";
                    }
                } catch (DocumentStoreException ex) {
                    // diagnostics are best effort only; the lease failure is
                    // still reported below with the basic message
                    LOG.error("trying to read ClusterNodeInfo for cluster id " + id, ex);
                }

                LOG.error(errorMsg);

                handleLeaseFailure(errorMsg);
                // should never be reached: handleLeaseFailure throws a DocumentStoreException
                return false;
            }
            leaseEndTime = updatedLeaseEndTime;
            String mode = (String) doc.get(READ_WRITE_MODE_KEY);
            if (mode != null && !mode.equals(readWriteMode)) {
                readWriteMode = mode;
                store.setReadWriteMode(mode);
            }
            return true;
        } catch (DocumentStoreException e) {
            // remember the failure; it is rethrown at the very end, after
            // trying to find out what lease end time the store holds
            dse = e;
            result = e.toString();
        } finally {
            sw.stop();
            String msg = "Lease renewal for cluster id {} took {}, resulted in: {}";
            if (sw.elapsed(TimeUnit.SECONDS) > 10) {
                LOG.warn(msg, id, sw, result);
            } else if (sw.elapsed(TimeUnit.SECONDS) > 1) {
                LOG.info(msg, id, sw, result);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug(msg, id, sw, result);
            }
        }
        // if we get here, the update failed with an exception, try to read the
        // current cluster node info document and update leaseEndTime
        // accordingly until leaseEndTime is reached
        boolean interrupted = false;
        try {
            while (getCurrentTime() < updatedLeaseEndTime) {
                synchronized (this) {
                    if (leaseCheckFailed) {
                        // no need to read from store, lease check already failed
                        break;
                    }
                }
                long t1 = clock.getTime();
                ClusterNodeInfoDocument doc;
                try {
                    doc = store.find(Collection.CLUSTER_NODES, String.valueOf(id));
                } catch (DocumentStoreException e) {
                    LOG.warn("Reading ClusterNodeInfoDocument for id " + id + " failed", e);
                    // do not retry more than once a second
                    try {
                        clock.waitUntil(t1 + 1000);
                    } catch (InterruptedException iex) {
                        // keep retrying as before, but remember the interrupt
                        // so that it is not lost for the caller: the status is
                        // restored once the loop is left
                        interrupted = true;
                    }
                    continue;
                }
                if (doc != null) {
                    if (!doc.isActive()) {
                        LOG.warn("ClusterNodeInfoDocument for id {} is not active " +
                                "anymore. {}", id, doc);
                        // break here and let the next lease update attempt fail
                        break;
                    } else if (runtimeId.equals(doc.getRuntimeId())) {
                        // set lease end time to current value, as they belong
                        // to this same cluster node
                        leaseEndTime = doc.getLeaseEndTime();
                        break;
                    } else {
                        // leaseEndTime is neither the previous nor the new value
                        // another cluster node must have updated the leaseEndTime
                        // break here and let the next lease update attempt fail
                        break;
                    }
                } else {
                    LOG.warn("ClusterNodeInfoDocument for id {} does not exist anymore", id);
                    break;
                }
            }
        } finally {
            if (interrupted) {
                // restore the interrupt status that was consumed while waiting
                Thread.currentThread().interrupt();
            }
        }
        throw dse;
    }