in oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java [489:635]
/**
 * Performs a single background run for the async index lane identified by
 * {@code name}.
 *
 * <p>The run proceeds in this order:
 * <ol>
 *   <li>If indexing is paused, optionally clear the lease (when a forced
 *       release was requested) and return.</li>
 *   <li>If lease checking is enabled and the stored lease end time is still
 *       in the future, record a failure on {@code indexStats} and return.</li>
 *   <li>Resolve the state of the previously indexed checkpoint; fall back to
 *       {@code MISSING_NODE} when there is none or it cannot be retrieved,
 *       and return early when no visible changes exist (unless
 *       {@code switchOnSync} is set).</li>
 *   <li>Create a new checkpoint, run {@code updateIndex} between the two
 *       states, and finally release whichever checkpoint is no longer
 *       needed.</li>
 * </ol>
 *
 * <p>Exceptions thrown while updating the index are not propagated; they are
 * reported through {@code indexStats.failed(...)}.
 */
private void runWhenPermitted() {
    if (indexStats.isPaused()) {
        // While paused, the only work done is an explicitly requested lease release.
        if (indexStats.forcedLeaseRelease) {
            try {
                clearLease();
            } catch (CommitFailedException e) {
                log.warn("Unable to release lease, please try again", e);
            }
            // The request flag is reset whether or not the release succeeded.
            indexStats.forcedLeaseRelease = false;
        }
        log.debug("[{}] Ignoring the run as indexing is paused", name);
        return;
    }
    log.debug("[{}] Running background index task", name);

    NodeState root = store.getRoot();
    NodeState asyncNode = root.getChildNode(ASYNC);

    if (isLeaseCheckEnabled(leaseTimeOut)) {
        // A lease end time in the future is treated as a concurrent update.
        long leaseEnd = asyncNode.getLong(leasify(name));
        long nowMillis = System.currentTimeMillis();
        if (nowMillis < leaseEnd) {
            long secondsLeft = (leaseEnd - nowMillis) / 1000;
            String message = String.format(CONCURRENT_EXCEPTION_MSG +
                    "Time left for lease to expire %d s. Indexing can resume by %tT", secondsLeft, leaseEnd);
            indexStats.failed(new Exception(message, newConcurrentUpdateException()));
            return;
        }
    }

    // From here on runtime statistics are being collected.
    preAsyncRunStatsStats(indexStats);

    // Work out which state was indexed last and whether anything changed since.
    String previousCp = asyncNode.getString(name);
    AsyncUpdateCallback callback = newAsyncUpdateCallback(store,
            name, leaseTimeOut, previousCp, indexStats,
            forcedStopFlag);
    NodeState previous;
    if (previousCp == null) {
        log.info("[{}] Initial index update", name);
        previous = MISSING_NODE;
    } else {
        NodeState indexed = store.retrieve(previousCp);
        if (indexed == null) {
            // The root revision read above might be stale, so go through the
            // lease-grab mechanics (a write followed by a read) and then
            // look the checkpoint up again from a fresh root.
            try {
                callback.initLease();
            } catch (CommitFailedException e) {
                indexStats.failed(e);
                return;
            }
            root = store.getRoot();
            previousCp = root.getChildNode(ASYNC).getString(name);
            if (previousCp != null) {
                indexed = store.retrieve(previousCp);
                callback.setCheckpoint(previousCp);
            }
        }

        if (indexed == null) {
            // Still nothing to diff against: start over from an empty state.
            log.warn(
                    "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update",
                    name, previousCp);
            previousCp = null;
            callback.setCheckpoint(previousCp);
            previous = MISSING_NODE;
        } else {
            boolean nothingToIndex = noVisibleChanges(indexed, root) && !switchOnSync;
            if (nothingToIndex) {
                log.debug(
                        "[{}] No changes since last checkpoint; skipping the index update",
                        name);
                postAsyncRunStatsStatus(indexStats);
                return;
            }
            previous = indexed;
        }
    }

    // Something changed (or this is a fresh start): pin the current state
    // with a new checkpoint.
    String createdAt = now();
    Thread worker = Thread.currentThread();
    String originalThreadName = worker.getName();
    boolean renamed = false;
    String currentCp = store.checkpoint(lifetime, ImmutableMap.of(
            "creator", AsyncIndexUpdate.class.getSimpleName(),
            "created", createdAt,
            "thread", originalThreadName,
            "name", name));
    NodeState current = store.retrieve(currentCp);
    if (current == null) {
        log.debug(
                "[{}] Unable to retrieve newly created checkpoint {}, skipping the index update",
                name, currentCp);
        // The run is not complete, so the status is deliberately left untouched.
        return;
    }

    // Starts out pointing at the new checkpoint, so that a failed update
    // releases it; the reference is also handed to updateIndex.
    // NOTE(review): presumably updateIndex may redirect it - confirm there.
    AtomicReference<String> releaseRef = new AtomicReference<>(currentCp);
    boolean cycleCompleted = false;
    try {
        String indexingThreadName = "async-index-update-" + name;
        log.trace("Switching thread name to {}", indexingThreadName);
        renamed = true;
        worker.setName(indexingThreadName);
        cycleCompleted = updateIndex(previous, previousCp, current,
                currentCp, createdAt, callback, releaseRef);

        // Reaching this point means the update did not throw, so a
        // previously recorded failure can be marked as fixed.
        if (indexStats.didLastIndexingCycleFailed()) {
            indexStats.fixed();
        }

        // After a successful update it is the earlier checkpoint that is
        // safe to release; the new one becomes the reference checkpoint.
        releaseRef.set(previousCp);
        indexStats.setReferenceCheckpoint(currentCp);
        indexStats.setProcessedCheckpoint("");
        indexStats.releaseTempCheckpoint(currentCp);
    } catch (Exception e) {
        indexStats.failed(e);
    } finally {
        if (renamed) {
            log.trace("Switching thread name back to {}", originalThreadName);
            worker.setName(originalThreadName);
        }
        // The reference holds null during initial indexing; a checkpoint
        // that was used in a split operation must not be released either.
        String obsoleteCp = releaseRef.get();
        boolean releasable = obsoleteCp != null
                && !obsoleteCp.equals(taskSplitter.getLastReferencedCp());
        if (releasable && !store.release(obsoleteCp)) {
            log.debug("[{}] Unable to release checkpoint {}", name,
                    obsoleteCp);
        }
        maybeCleanUpCheckpoints();
        if (cycleCompleted) {
            postAsyncRunStatsStatus(indexStats);
        }
    }
}