in oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java [266:567]
public int recover(final Iterable<NodeDocument> suspects,
final int clusterId, final boolean dryRun)
throws DocumentStoreException {
// set a deadline if this is a self recovery. Self recovery does not
// update the lease in a background thread and must terminate before
// the lease acquired by the recovery lock expires.
long deadline = Long.MAX_VALUE;
if (clusterId == revisionContext.getClusterId()) {
ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
if (nodeInfo != null && nodeInfo.isActive()) {
long defaultDeadline = nodeInfo.getLeaseEndTime() - ClusterNodeInfo.DEFAULT_LEASE_FAILURE_MARGIN_MILLIS;
deadline = SYNC_RECOVERY_TIMEOUT_MILLIS < 0 ? defaultDeadline : Math.min(defaultDeadline, revisionContext.getClock().millis() + SYNC_RECOVERY_TIMEOUT_MILLIS);
if (deadline != defaultDeadline) {
log.info("Adjusted deadline for synchronous recovery from {} to {}.",
LocalDateTime.ofEpochSecond(defaultDeadline / 1000, 0, ZoneOffset.UTC),
LocalDateTime.ofEpochSecond(deadline / 1000, 0, ZoneOffset.UTC));
}
}
}
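// the root document carries the sweep revisions of all cluster nodes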
NodeDocument rootDoc = Utils.getRootDocument(store);
// first run a sweep
final AtomicReference<Revision> sweepRev = new AtomicReference<>();
if (rootDoc.getSweepRevisions().getRevision(clusterId) != null) {
// only run a sweep for a cluster node that already has a
// sweep revision. Initial sweep is not the responsibility
// of the recovery agent.
final RevisionContext context = new RecoveryContext(rootDoc,
revisionContext.getClock(), clusterId,
revisionContext::getCommitValue);
final NodeDocumentSweeper sweeper = new NodeDocumentSweeper(context, true);
// make sure recovery does not run on a stale cache:
// invalidate all suspects (OAK-9908)
log.info("Starting cache invalidation before sweep...");
CacheInvalidationStats stats = store.invalidateCache(
IterableUtils.transform(suspects, Document::getId));
log.info("Invalidation stats: {}", stats);
sweeper.sweep(suspects, new NodeDocumentSweepListener() {
@Override
public void sweepUpdate(Map<Path, UpdateOp> updates)
throws DocumentStoreException {
if (dryRun) {
log.info("Dry run of sweeper identified [{}] documents for " +
"cluster node [{}]: {}", updates.size(), clusterId,
updates.values());
return;
}
// create an invalidate entry
JournalEntry inv = JOURNAL.newDocument(store);
inv.modified(updates.keySet());
Revision r = context.newRevision().asBranchRevision();
UpdateOp invOp = inv.asUpdateOp(r);
// and reference it from a regular entry
JournalEntry entry = JOURNAL.newDocument(store);
entry.invalidate(Collections.singleton(r));
Revision jRev = context.newRevision();
UpdateOp jOp = entry.asUpdateOp(jRev);
if (!store.create(JOURNAL, new ArrayList<>(Arrays.asList(invOp, jOp)))) {
String msg = "Unable to create journal entries for " +
"document invalidation.";
throw new DocumentStoreException(msg);
}
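// remember the most recent journal revision written by the sweeper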
sweepRev.set(Utils.max(sweepRev.get(), jRev));
// now that journal entry is in place, perform the actual
// updates on the documents
store.createOrUpdate(NODES, new ArrayList<>(updates.values()));
log.info("Sweeper updated {}", updates.keySet());
}
});
if (sweepRev.get() != null) {
// One or more journal entries were created by the sweeper.
// Make sure the sweep revision is different / newer than the
// last journal entry written so far. UnsavedModification
// further down needs a new revision for its journal entry.
sweepRev.set(Utils.max(sweepRev.get(), context.newRevision()));
}
}
// now deal with missing _lastRev updates
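// 'unsavedParents' collects candidate revisions rolled up from modified
// documents to their ancestors; 'unsaved' ends up holding only those
// paths whose _lastRev actually needs to be written back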
UnsavedModifications unsaved = new UnsavedModifications();
UnsavedModifications unsavedParents = new UnsavedModifications();
//Map of the known lastRev or last modification of checked paths
Map<Path, Revision> knownLastRevOrModification = MapFactory.getInstance().create();
JournalEntry changes = JOURNAL.newDocument(store);
Clock clock = revisionContext.getClock();
long totalCount = 0;
long lastCount = 0;
long startOfScan = clock.getTime();
long lastLog = startOfScan;
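// revisions of intermediate (pseudo branch commit) journal entries
// written while scanning; the final journal entry references them so
// that it stays below the document size limit (OAK-9535)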
final List<Revision> pseudoBcRevs = new ArrayList<>();
int nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
for (NodeDocument doc : suspects) {
totalCount++;
lastCount++;
long now = clock.getTime();
long lastElapsed = now - lastLog;
if (lastElapsed >= LOGINTERVALMS) {
TimeDurationFormatter df = TimeDurationFormatter.forLogging();
long totalElapsed = now - startOfScan;
long totalRateMin = (totalCount * TimeUnit.MINUTES.toMillis(1)) / totalElapsed;
long lastRateMin = (lastCount * TimeUnit.MINUTES.toMillis(1)) / lastElapsed;
String message = String.format(
"Recovery for cluster node [%d]: %d nodes scanned in %s (~%d/m) - last interval %d nodes in %s (~%d/m)",
clusterId, totalCount, df.format(totalElapsed, TimeUnit.MILLISECONDS), totalRateMin, lastCount,
df.format(lastElapsed, TimeUnit.MILLISECONDS), lastRateMin);
log.info(message);
lastLog = now;
lastCount = 0;
}
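// the _lastRev this document currently holds for the suspect cluster node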
Revision currentLastRev = doc.getLastRev().get(clusterId);
// 1. determine last committed modification on document
Revision lastModifiedRev = determineLastModification(doc, clusterId);
Revision lastRevForParents = Utils.max(lastModifiedRev, currentLastRev);
// remember the higher of the two revisions. this is the
// most recent revision currently obtained from either a
// _lastRev entry or an explicit modification on the document
if (lastRevForParents != null) {
knownLastRevOrModification.put(doc.getPath(), lastRevForParents);
}
//If both currentLastRev and lastModifiedRev are null, the suspect
//cluster node did not change this document and nothing needs to be
//updated. It was probably only changed by other cluster nodes. If this
//node is the parent of a child node that was modified by the suspect
//cluster node, the rollup of that child adds this node's path to the
//unsaved parent modifications.
//2. Update lastRev for parent paths aka rollup
if (lastRevForParents != null) {
Path path = doc.getPath();
changes.modified(path); // track all changes
while (true) {
path = path.getParent();
if (path == null) {
break;
}
unsavedParents.put(path, lastRevForParents);
}
}
// avoid recalculating the size of the updateOp for every single path,
// but also avoid checking only once the 16MB limit has already been hit
if (changes.getNumChangedNodes() >= nextFlushCheckCount) {
final Revision pseudoBcRev = Revision.newRevision(clusterId).asBranchRevision();
final UpdateOp pseudoBcUpdateOp = changes.asUpdateOp(pseudoBcRev);
final int approxPseudoBcUpdateOpSize = pseudoBcUpdateOp.toString().length();
if (approxPseudoBcUpdateOpSize >= PSEUDO_BRANCH_COMMIT_UPDATE_OP_THRESHOLD_BYTES) {
// flush the (pseudo) journal entry
// regarding 'pseudo' : this journal entry, while being a branch commit,
// does not correspond to an actual branch commit that happened before the crash.
// in theory we might be able to reconstruct the original branch commits,
// but that is a tedious job and was not done prior to OAK-9535 either.
// hence the optimization built in here: we create a journal entry of
// type 'branch commit', but with a revision that differs from what
// originally happened. Thanks to the fact that the JournalEntry just
// contains a list of branch commit journal ids, that works fine.
if (store.create(JOURNAL, singletonList(pseudoBcUpdateOp))) {
log.info("recover : created intermediate pseudo-bc journal entry with rev {} and approx size {} bytes.",
pseudoBcRev, approxPseudoBcUpdateOpSize);
pseudoBcRevs.add(pseudoBcRev);
changes = JOURNAL.newDocument(store);
nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
} else {
log.warn("recover : could not create intermediate pseudo-bc journal entry with rev {}",
pseudoBcRev);
// retry a little later: schedule the next flush check only half an
// interval away instead of a full one
nextFlushCheckCount = changes.getNumChangedNodes() + (PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT / 2);
}
} else {
nextFlushCheckCount = changes.getNumChangedNodes() + PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
}
}
}
// propagate the pseudoBcRevs to the changes
changes.branchCommit(pseudoBcRevs);
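// resolve the rolled-up parent revisions: a parent's _lastRev only needs
// an update if the candidate revision is newer than what is already
// known (or found in the store) for that parent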
for (Path parentPath : unsavedParents.getPaths()) {
Revision calcLastRev = unsavedParents.get(parentPath);
Revision knownLastRev = knownLastRevOrModification.get(parentPath);
if (knownLastRev == null) {
List<Path> missingDocuments = new ArrayList<>();
// we don't know when the document was last modified with
// the given clusterId. need to read from store
NodeDocument doc = findNearestAncestorOrSelf(parentPath, missingDocuments);
if (doc != null) {
Revision lastRev = doc.getLastRev().get(clusterId);
Revision lastMod = determineLastModification(doc, clusterId);
knownLastRev = Utils.max(lastRev, lastMod);
if (!missingDocuments.isEmpty()
&& doc.getLocalMap(DocumentBundlor.META_PROP_PATTERN).isEmpty()) {
// there are missing documents and the returned document
// does not have bundled nodes
for (Path p : missingDocuments) {
log.warn("Unable to find document: {}", Utils.getIdFromPath(p));
}
}
}
}
//Copy the calcLastRev of the parent only if it has changed.
//In many cases the parent already has a consistent lastRev;
//this check ensures that unnecessary updates are not made.
if (knownLastRev == null
|| calcLastRev.compareRevisionTime(knownLastRev) > 0) {
unsaved.put(parentPath, calcLastRev);
}
}
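// if the sweeper wrote journal entries, make sure the sweep revision
// is reflected as an unsaved _lastRev on the root document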
if (sweepRev.get() != null) {
unsaved.put(Path.ROOT, sweepRev.get());
}
// take the root's lastRev
final Revision lastRootRev = unsaved.get(Path.ROOT);
//Note the size before persisting, as the persist operation
//empties the internal state
int size = unsaved.getPaths().size();
String updates = unsaved.toString();
if (dryRun) {
log.info("Dry run of lastRev recovery identified [{}] documents for " +
"cluster node [{}]: {}", size, clusterId, updates);
} else {
// check deadline before the update
if (clock.getTime() > deadline) {
String msg = String.format("Cluster node %d was unable to " +
"perform lastRev recovery for clusterId %d within " +
"deadline: %s", clusterId, clusterId,
Utils.timestampToString(deadline));
throw new DocumentStoreException(msg);
}
//UnsavedModifications is designed to be used with concurrent
//access. In the recovery case there is no concurrent access
//involved, so just pass a new lock instance.
// the lock used to do the persisting is a plain reentrant lock,
// thus it doesn't matter where exactly the check is done
// as to whether the recovered lastRev has already been
// written to the journal.
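// capture the journal entry in an effectively final variable so it can
// be referenced from the snapshot callback below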
final JournalEntry finalChanges = changes;
unsaved.persist(store, new Supplier<Revision>() {
@Override
public Revision get() {
return sweepRev.get();
}
}, new UnsavedModifications.Snapshot() {
@Override
public void acquiring(Revision mostRecent) {
if (lastRootRev == null) {
// this should never happen - when unsaved has no changes
// the underlying map is empty, and in that case 'persist()'
// quits early and never calls acquiring() here.
//
// but even if it did occur - without a lastRootRev
// we cannot, and probably don't have to, persist anything
}
final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point
final JournalEntry existingEntry = store.find(Collection.JOURNAL, id);
if (existingEntry != null) {
// then the journal entry was already written - as can happen if
// someone else (or the original instance itself) wrote the
// journal entry, then died.
// in this case, don't write it again.
// hence: nothing to be done here. return.
log.warn("Journal entry {} already exists", id);
return;
}
// otherwise store a new journal entry now
if (store.create(JOURNAL, singletonList(finalChanges.asUpdateOp(lastRootRev)))) {
log.info("Recovery created journal entry {}", id);
} else {
log.warn("Unable to create journal entry {} (already exists).", id);
}
}
}, new ReentrantLock());
log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
"cluster node [{}]: {}", size, clusterId, updates);
}
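// the number of documents that needed (or, in a dry run, would need) a _lastRev update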
return size;
}