public int recover()

in oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java [266:567]
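
At a glance, the method recovers the _lastRev bookkeeping of a crashed cluster node in three phases: it sweeps the suspect documents (only if the node already has a sweep revision) and journals the resulting invalidations, determines the most recent change recorded on every suspect and rolls it up to all ancestor paths (flushing oversized journal entries as intermediate pseudo branch commits), and finally persists the recovered _lastRev values together with a closing journal entry. With dryRun set, the sweeper updates and the recovered _lastRev values are only logged, not written.

Below is a minimal sketch of how a caller might drive this overload, for orientation only; the wiring of the agent and the seeker, and the seeker's getCandidates(startTime) lookup, are assumptions and not part of this excerpt.

    // hedged sketch, not part of LastRevRecoveryAgent.java
    static int recoverCrashedNode(LastRevRecoveryAgent agent,
                                  MissingLastRevSeeker seeker,
                                  long startTime, int crashedClusterId) {
        // documents the crashed node may have touched since its last rollup
        Iterable<NodeDocument> suspects = seeker.getCandidates(startTime);
        // dryRun = true: only log what would be written
        agent.recover(suspects, crashedClusterId, true);
        // perform the actual recovery and return the number of updated documents
        return agent.recover(seeker.getCandidates(startTime), crashedClusterId, false);
    }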


    public int recover(final Iterable<NodeDocument> suspects,
                       final int clusterId, final boolean dryRun)
            throws DocumentStoreException {
        // set a deadline if this is a self recovery. Self recovery does not
        // update the lease in a background thread and must terminate before
        // the lease acquired by the recovery lock expires.
        long deadline = Long.MAX_VALUE;
        if (clusterId == revisionContext.getClusterId()) {
            ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
            if (nodeInfo != null && nodeInfo.isActive()) {
                long defaultDeadline = nodeInfo.getLeaseEndTime() - ClusterNodeInfo.DEFAULT_LEASE_FAILURE_MARGIN_MILLIS;
                deadline = SYNC_RECOVERY_TIMEOUT_MILLIS < 0 ? defaultDeadline : Math.min(defaultDeadline, revisionContext.getClock().millis() + SYNC_RECOVERY_TIMEOUT_MILLIS);
                if (deadline != defaultDeadline) {
                    log.info("Adjusted deadline for synchronous recovery from {} to {}.",
                            LocalDateTime.ofEpochSecond(defaultDeadline / 1000, 0, ZoneOffset.UTC),
                            LocalDateTime.ofEpochSecond(deadline / 1000, 0, ZoneOffset.UTC));
                }
            }
        }

        NodeDocument rootDoc = Utils.getRootDocument(store);

        // first run a sweep
        final AtomicReference<Revision> sweepRev = new AtomicReference<>();
        if (rootDoc.getSweepRevisions().getRevision(clusterId) != null) {
            // only run a sweep for a cluster node that already has a
            // sweep revision. Initial sweep is not the responsibility
            // of the recovery agent.
            final RevisionContext context = new RecoveryContext(rootDoc,
                    revisionContext.getClock(), clusterId,
                    revisionContext::getCommitValue);
            final NodeDocumentSweeper sweeper = new NodeDocumentSweeper(context, true);
            // make sure recovery does not run on stale cache
            // invalidate all suspects (OAK-9908)
            log.info("Starting cache invalidation before sweep...");
            CacheInvalidationStats stats = store.invalidateCache(
                    IterableUtils.transform(suspects, Document::getId));
            log.info("Invalidation stats: {}", stats);
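            // sweep the suspect documents; each batch of updates produced by the
            // sweeper is handed to the listener below, which journals the affected
            // paths and then applies the updates (or, on a dry run, only logs them)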
            sweeper.sweep(suspects, new NodeDocumentSweepListener() {
                @Override
                public void sweepUpdate(Map<Path, UpdateOp> updates)
                        throws DocumentStoreException {
                    if (dryRun) {
                        log.info("Dry run of sweeper identified [{}] documents for " +
                                        "cluster node [{}]: {}", updates.size(), clusterId,
                                updates.values());
                        return;
                    }
                    // create an invalidate entry
                    JournalEntry inv = JOURNAL.newDocument(store);
                    inv.modified(updates.keySet());
                    Revision r = context.newRevision().asBranchRevision();
                    UpdateOp invOp = inv.asUpdateOp(r);
                    // and reference it from a regular entry
                    JournalEntry entry = JOURNAL.newDocument(store);
                    entry.invalidate(Collections.singleton(r));
                    Revision jRev = context.newRevision();
                    UpdateOp jOp = entry.asUpdateOp(jRev);
                    if (!store.create(JOURNAL, new ArrayList<>(Arrays.asList(invOp, jOp)))) {
                        String msg = "Unable to create journal entries for " +
                                "document invalidation.";
                        throw new DocumentStoreException(msg);
                    }
                    sweepRev.set(Utils.max(sweepRev.get(), jRev));
                    // now that the journal entry is in place, perform the actual
                    // updates on the documents
                    store.createOrUpdate(NODES, new ArrayList<>(updates.values()));
                    log.info("Sweeper updated {}", updates.keySet());
                }
            });

            if (sweepRev.get() != null) {
                // One or more journal entries were created by the sweeper.
                // Make sure the sweep revision is different / newer than the
                // last journal entry written so far. UnsavedModifications
                // further down needs a new revision for its journal entry.
                sweepRev.set(Utils.max(sweepRev.get(), context.newRevision()));
            }
        }

        // now deal with missing _lastRev updates
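        // 'unsavedParents' accumulates, for every ancestor of a suspect, the newest
        // revision rolled up from its descendants; 'unsaved' later receives only the
        // entries that actually need to be written back to the store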
        UnsavedModifications unsaved = new UnsavedModifications();
        UnsavedModifications unsavedParents = new UnsavedModifications();

        //Map of the known lastRev or last modification of each checked path
        Map<Path, Revision> knownLastRevOrModification = MapFactory.getInstance().create();
        JournalEntry changes = JOURNAL.newDocument(store);

        Clock clock = revisionContext.getClock();

        long totalCount = 0;
        long lastCount = 0;
        long startOfScan = clock.getTime();
        long lastLog = startOfScan;

        final List<Revision> pseudoBcRevs = new ArrayList<>();
        int nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
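        // journal entries for very large recoveries are written out in chunks:
        // whenever the accumulated entry grows too big it is flushed as an
        // intermediate 'pseudo branch commit' entry and its revision recorded in
        // pseudoBcRevs, so that the final journal entry can reference all of them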
        for (NodeDocument doc : suspects) {
            totalCount++;
            lastCount++;

            long now = clock.getTime();
            long lastElapsed = now - lastLog;
            if (lastElapsed >= LOGINTERVALMS) {
                TimeDurationFormatter df = TimeDurationFormatter.forLogging();

                long totalElapsed = now - startOfScan;
                long totalRateMin = (totalCount * TimeUnit.MINUTES.toMillis(1)) / totalElapsed;
                long lastRateMin = (lastCount * TimeUnit.MINUTES.toMillis(1)) / lastElapsed;

                String message = String.format(
                        "Recovery for cluster node [%d]: %d nodes scanned in %s (~%d/m) - last interval %d nodes in %s (~%d/m)",
                        clusterId, totalCount, df.format(totalElapsed, TimeUnit.MILLISECONDS), totalRateMin, lastCount,
                        df.format(lastElapsed, TimeUnit.MILLISECONDS), lastRateMin);

                log.info(message);
                lastLog = now;
                lastCount = 0;
            }

            Revision currentLastRev = doc.getLastRev().get(clusterId);

            // 1. determine last committed modification on document
            Revision lastModifiedRev = determineLastModification(doc, clusterId);

            Revision lastRevForParents = Utils.max(lastModifiedRev, currentLastRev);
            // remember the higher of the two revisions. this is the
            // most recent revision currently obtained from either a
            // _lastRev entry or an explicit modification on the document
            if (lastRevForParents != null) {
                knownLastRevOrModification.put(doc.getPath(), lastRevForParents);
            }

            //If both currentLastRev and lastModifiedRev are null it means
            //that no change was made by the suspect cluster node on this
            //document, so nothing needs to be updated; it was probably only
            //changed by other cluster nodes. If this node is the parent of a
            //child node modified by the suspect cluster node, the rollup of
            //that child will add this node's path to unsavedParents.

            // 2. update lastRev for parent paths, aka rollup
            if (lastRevForParents != null) {
                Path path = doc.getPath();
                changes.modified(path); // track all changes
                while (true) {
                    path = path.getParent();
                    if (path == null) {
                        break;
                    }
                    unsavedParents.put(path, lastRevForParents);
                }
            }
            // avoid recalculating the size of the updateOp for every single path,
            // but also avoid checking so rarely that the 16MB document limit is hit
            if (changes.getNumChangedNodes() >= nextFlushCheckCount) {
                final Revision pseudoBcRev = Revision.newRevision(clusterId).asBranchRevision();
                final UpdateOp pseudoBcUpdateOp = changes.asUpdateOp(pseudoBcRev);
                final int approxPseudoBcUpdateOpSize = pseudoBcUpdateOp.toString().length();
                if (approxPseudoBcUpdateOpSize >= PSEUDO_BRANCH_COMMIT_UPDATE_OP_THRESHOLD_BYTES) {
                    // flush the (pseudo) journal entry
                    // regarding 'pseudo': this journal entry, while being a branch commit,
                    // does not correspond to an actual branch commit that happened before the
                    // crash. In theory we might be able to reconstruct the original branch
                    // commits, but that is a tedious job, and it was not done prior to
                    // OAK-9535 either. The optimization built in here is to create a journal
                    // entry of type 'branch commit', but with a revision that differs from
                    // what originally happened. Since the JournalEntry merely contains a list
                    // of branch commit journal ids, that works fine.
                    if (store.create(JOURNAL, singletonList(pseudoBcUpdateOp))) {
                        log.info("recover : created intermediate pseudo-bc journal entry with rev {} and approx size {} bytes.",
                                pseudoBcRev, approxPseudoBcUpdateOpSize);
                        pseudoBcRevs.add(pseudoBcRev);
                        changes = JOURNAL.newDocument(store);
                        nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
                    } else {
                        log.warn("recover : could not create intermediate pseudo-bc journal entry with rev {}",
                                pseudoBcRev);
                        // retry a little later: check again after half the normal interval
                        nextFlushCheckCount = changes.getNumChangedNodes() + (PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT / 2);
                    }
                } else {
                    nextFlushCheckCount = changes.getNumChangedNodes() + PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
                }
            }
        }
        // propagate the pseudoBcRevs to the changes
        changes.branchCommit(pseudoBcRevs);
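        // resolve the effective lastRev of every rolled-up ancestor and queue an
        // update only where the recovered revision is newer than what is already
        // known for that path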

        for (Path parentPath : unsavedParents.getPaths()) {
            Revision calcLastRev = unsavedParents.get(parentPath);
            Revision knownLastRev = knownLastRevOrModification.get(parentPath);
            if (knownLastRev == null) {
                List<Path> missingDocuments = new ArrayList<>();
                // we don't know when the document was last modified with
                // the given clusterId; need to read from the store
                NodeDocument doc = findNearestAncestorOrSelf(parentPath, missingDocuments);
                if (doc != null) {
                    Revision lastRev = doc.getLastRev().get(clusterId);
                    Revision lastMod = determineLastModification(doc, clusterId);
                    knownLastRev = Utils.max(lastRev, lastMod);

                    if (!missingDocuments.isEmpty()
                            && doc.getLocalMap(DocumentBundlor.META_PROP_PATTERN).isEmpty()) {
                        // there are missing documents and the returned document
                        // does not have bundled nodes
                        for (Path p : missingDocuments) {
                            log.warn("Unable to find document: {}", Utils.getIdFromPath(p));
                        }
                    }
                }
            }

            //Copy the calcLastRev of the parent only if it has changed.
            //In many cases the parent already has a consistent lastRev;
            //this check ensures that unnecessary updates are not made.
            if (knownLastRev == null
                    || calcLastRev.compareRevisionTime(knownLastRev) > 0) {
                unsaved.put(parentPath, calcLastRev);
            }
        }
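        // make sure the root's recovered _lastRev is at least as new as the
        // journal entries written by the sweeper above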

        if (sweepRev.get() != null) {
            unsaved.put(Path.ROOT, sweepRev.get());
        }

        // take the root's lastRev
        final Revision lastRootRev = unsaved.get(Path.ROOT);
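        // it later doubles as the revision (and id) of the final journal entry
        // written in acquiring() below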

        //Note the size before persisting, as the persist operation
        //will empty the internal state
        int size = unsaved.getPaths().size();
        String updates = unsaved.toString();

        if (dryRun) {
            log.info("Dry run of lastRev recovery identified [{}] documents for " +
                    "cluster node [{}]: {}", size, clusterId, updates);
        } else {
            // check deadline before the update
            if (clock.getTime() > deadline) {
                String msg = String.format("Cluster node %d was unable to " +
                        "perform lastRev recovery for clusterId %d within " +
                        "deadline: %s", clusterId, clusterId,
                        Utils.timestampToString(deadline));
                throw new DocumentStoreException(msg);
            }

            //UnsavedModifications is designed to be used with concurrent
            //access. In the recovery case there is no concurrent access
            //involved, so just pass a new lock instance.

            // the lock used to do the persisting is a plain reentrant lock,
            // so it doesn't matter where exactly the check is done as to
            // whether the recovered lastRev has already been written to
            // the journal.
            final JournalEntry finalChanges = changes;
            unsaved.persist(store, new Supplier<Revision>() {
                @Override
                public Revision get() {
                    return sweepRev.get();
                }
            }, new UnsavedModifications.Snapshot() {

                @Override
                public void acquiring(Revision mostRecent) {
                    if (lastRootRev == null) {
                        // this should never happen: when unsaved has no changes,
                        // the underlying map is empty and 'persist()' quits early
                        // without ever calling acquiring() here.
                        //
                        // but even if it did occur: without a lastRootRev there is
                        // nothing we can (or need to) persist here
                    }

                    final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point
                    final JournalEntry existingEntry = store.find(Collection.JOURNAL, id);
                    if (existingEntry != null) {
                        // the journal entry was already written - as can happen if
                        // someone else (or the original instance itself) wrote the
                        // journal entry and then died. In that case, don't write it
                        // again; nothing to be done here.
                        log.warn("Journal entry {} already exists", id);
                        return;
                    }

                    // otherwise store a new journal entry now
                    if (store.create(JOURNAL, singletonList(finalChanges.asUpdateOp(lastRootRev)))) {
                        log.info("Recovery created journal entry {}", id);
                    } else {
                        log.warn("Unable to create journal entry {} (already exists).", id);
                    }
                }
            }, new ReentrantLock());

            log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
                    "cluster node [{}]: {}", size, clusterId, updates);
        }

        return size;
    }