private void runWhenPermitted()

in oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java [489:635]


    private void runWhenPermitted() {
        if (indexStats.isPaused()) {
            if (indexStats.forcedLeaseRelease){
                try {
                    clearLease();
                } catch (CommitFailedException e) {
                    log.warn("Unable to release lease, please try again", e);
                }
                indexStats.forcedLeaseRelease = false;
            }
            log.debug("[{}] Ignoring the run as indexing is paused", name);
            return;
        }
        log.debug("[{}] Running background index task", name);

        NodeState root = store.getRoot();
        NodeState async = root.getChildNode(ASYNC);

        if (isLeaseCheckEnabled(leaseTimeOut)) {
            // check for concurrent updates
            long leaseEndTime = async.getLong(leasify(name));
            long currentTime = System.currentTimeMillis();
            if (leaseEndTime > currentTime) {
                long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
                String err = String.format(CONCURRENT_EXCEPTION_MSG +
                        "Time left for lease to expire %d s. Indexing can resume by %tT", leaseExpMsg, leaseEndTime);
                indexStats.failed(new Exception(err, newConcurrentUpdateException()));
                return;
            }
        }

        // start collecting runtime statistics
        preAsyncRunStatsStats(indexStats);

        // find the last indexed state, and check if there are recent changes
        NodeState before;
        String beforeCheckpoint = async.getString(name);
        AsyncUpdateCallback callback = newAsyncUpdateCallback(store,
                name, leaseTimeOut, beforeCheckpoint, indexStats,
                forcedStopFlag);
        if (beforeCheckpoint != null) {
            NodeState state = store.retrieve(beforeCheckpoint);
            if (state == null) {
                // to make sure we're not reading a stale root rev, we're
                // attempting a write+read via the lease-grab mechanics
                try {
                    callback.initLease();
                } catch (CommitFailedException e) {
                    indexStats.failed(e);
                    return;
                }
                root = store.getRoot();
                beforeCheckpoint = root.getChildNode(ASYNC).getString(name);
                if (beforeCheckpoint != null) {
                    state = store.retrieve(beforeCheckpoint);
                    callback.setCheckpoint(beforeCheckpoint);
                }
            }

            if (state == null) {
                log.warn(
                        "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update",
                        name, beforeCheckpoint);
                beforeCheckpoint = null;
                callback.setCheckpoint(beforeCheckpoint);
                before = MISSING_NODE;
            } else if (noVisibleChanges(state, root) && !switchOnSync) {
                log.debug(
                        "[{}] No changes since last checkpoint; skipping the index update",
                        name);
                postAsyncRunStatsStatus(indexStats);
                return;
            } else {
                before = state;
            }
        } else {
            log.info("[{}] Initial index update", name);
            before = MISSING_NODE;
        }

        // there are some recent changes, so let's create a new checkpoint
        String afterTime = now();
        String oldThreadName = Thread.currentThread().getName();
        boolean threadNameChanged = false;
        String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of(
                "creator", AsyncIndexUpdate.class.getSimpleName(),
                "created", afterTime,
                "thread", oldThreadName,
                "name", name));
        NodeState after = store.retrieve(afterCheckpoint);
        if (after == null) {
            log.debug(
                    "[{}] Unable to retrieve newly created checkpoint {}, skipping the index update",
                    name, afterCheckpoint);
            //Do not update the status as technically the run is not complete
            return;
        }

        AtomicReference<String> checkpointToReleaseRef = new AtomicReference<>(afterCheckpoint);
        boolean updatePostRunStatus = false;
        try {
            String newThreadName = "async-index-update-" + name;
            log.trace("Switching thread name to {}", newThreadName);
            threadNameChanged = true;
            Thread.currentThread().setName(newThreadName);
            updatePostRunStatus = updateIndex(before, beforeCheckpoint, after,
                    afterCheckpoint, afterTime, callback, checkpointToReleaseRef);

            // the update succeeded, i.e. it no longer fails
            if (indexStats.didLastIndexingCycleFailed()) {
                indexStats.fixed();
            }

            // the update succeeded, so we are sure we can release the earlier checkpoint -
            // otherwise the new checkpoint associated with the failed update
            // may still get released in the finally block (depending on where the index update failed)
            checkpointToReleaseRef.set(beforeCheckpoint);
            indexStats.setReferenceCheckpoint(afterCheckpoint);
            indexStats.setProcessedCheckpoint("");
            indexStats.releaseTempCheckpoint(afterCheckpoint);

        } catch (Exception e) {
            indexStats.failed(e);

        } finally {
            if (threadNameChanged) {
                log.trace("Switching thread name back to {}", oldThreadName);
                Thread.currentThread().setName(oldThreadName);
            }
            // null during initial indexing
            // and skip release if this cp was used in a split operation
            String checkpointToRelease = checkpointToReleaseRef.get();
            if (checkpointToRelease != null
                    && !checkpointToRelease.equals(taskSplitter
                    .getLastReferencedCp())) {
                if (!store.release(checkpointToRelease)) {
                    log.debug("[{}] Unable to release checkpoint {}", name,
                            checkpointToRelease);
                }
            }
            maybeCleanUpCheckpoints();

            if (updatePostRunStatus) {
                postAsyncRunStatsStatus(indexStats);
            }
        }
    }