public DocumentNodeStore()

in oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java [565:855]


    public DocumentNodeStore(DocumentNodeStoreBuilder<?> builder) {
        this.nodeCachePredicate = builder.getNodeCachePathPredicate();
        this.updateLimit = builder.getUpdateLimit();
        this.commitValueResolver = new CachingCommitValueResolver(
                builder.getCommitValueCacheSize(), this::getSweepRevisions)
                .withEmptyCommitValueCache(
                        builder.getCacheEmptyCommitValue() && builder.getReadOnlyMode(),
                        builder.getClock(), builder.getJournalGCMaxAge());
        this.blobStore = builder.getBlobStore();
        this.nodeStoreStatsCollector = builder.getNodeStoreStatsCollector();
        if (builder.isUseSimpleRevision()) {
            this.simpleRevisionCounter = new AtomicInteger(0);
        }
        DocumentStore s = builder.getDocumentStore();
        checkServerTimeDifference(s);
        if (builder.getTiming()) {
            s = new TimingDocumentStoreWrapper(s);
        }
        if (builder.getLogging()) {
            if (builder.getLoggingPrefix() != null) {
                s = new LoggingDocumentStoreWrapper(s, builder.getLoggingPrefix());
            } else {
                s = new LoggingDocumentStoreWrapper(s);
            }
        }
        if (builder.getReadOnlyMode()) {
            s = ReadOnlyDocumentStoreWrapperFactory.getInstance(s);
            readOnlyMode = true;
        } else {
            readOnlyMode = false;
        }
        checkVersion(s, readOnlyMode);
        this.nonLeaseCheckingStore = s;
        this.executor = builder.getExecutor();
        this.lastRevSeeker = builder.createMissingLastRevSeeker();
        this.clock = builder.getClock();

        int cid = builder.getClusterId();
        cid = SystemPropertySupplier.create("oak.documentMK.clusterId", cid).loggingTo(LOG).get();
        if (readOnlyMode) {
            clusterNodeInfo = ClusterNodeInfo.getReadOnlyInstance(nonLeaseCheckingStore);
        } else {
            clusterNodeInfo = ClusterNodeInfo.getInstance(nonLeaseCheckingStore,
                    new RecoveryHandlerImpl(nonLeaseCheckingStore, clock, lastRevSeeker),
                    null, null, cid, builder.isClusterInvisible());
            checkRevisionAge(nonLeaseCheckingStore, clusterNodeInfo, clock);
        }
        this.clusterId = clusterNodeInfo.getId();

        if (isThrottlingEnabled(builder)) {
            s = new ThrottlingDocumentStoreWrapper(s, builder.getThrottlingStatsCollector());
        }

        clusterNodeInfo.setLeaseCheckMode(builder.getLeaseCheckMode());
        if (builder.getLeaseCheckMode() != LeaseCheckMode.DISABLED) {
            s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
            clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
        }
        String threadNamePostfix = "(" + clusterId + ")";
        leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, stopLeaseUpdateThread),
                "DocumentNodeStore lease update thread " + threadNamePostfix);
        leaseUpdateThread.setDaemon(true);
        if (!readOnlyMode) {
            // OAK-3398 : make lease updating more robust by ensuring it
            // has higher likelihood of succeeding than other threads
            // on a very busy machine - so as to prevent lease timeout.
            leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
            leaseUpdateThread.start();
        }

        this.prefetchFeature = builder.getPrefetchFeature();
        this.cacheWarming = new CacheWarming(s);

        this.journalPropertyHandlerFactory = builder.getJournalPropertyHandlerFactory();
        this.store = s;
        this.changes = newJournalEntry();
        this.branches = new UnmergedBranches();
        this.asyncDelay = builder.getAsyncDelay();
        this.versionGarbageCollector = new VersionGarbageCollector(
                this, builder.createVersionGCSupport());
        this.versionGarbageCollector.setStatisticsProvider(builder.getStatisticsProvider());
        this.versionGarbageCollector.setGCMonitor(builder.getGCMonitor());
        this.journalGarbageCollector = new JournalGarbageCollector(
                this, builder.getJournalGCMaxAge());
        this.referencedBlobs =
                builder.createReferencedBlobs(this);
        this.lastRevRecoveryAgent = new LastRevRecoveryAgent(store, this,
                lastRevSeeker, clusterId -> this.signalClusterStateChange());
        this.disableBranches = builder.isDisableBranches();
        this.missing = new DocumentNodeState(this, new Path("missing"),
                new RevisionVector(new Revision(0, 0, 0))) {
            @Override
            public int getMemory() {
                return 8;
            }
        };

        //TODO Make stats collection configurable as it add slight overhead

        nodeCache = builder.buildNodeCache(this);
        nodeCacheStats = new CacheStats(nodeCache, "Document-NodeState",
                builder.getWeigher(), builder.getNodeCacheSize());

        nodeChildrenCache = builder.buildChildrenCache(this);
        nodeChildrenCacheStats = new CacheStats(nodeChildrenCache, "Document-NodeChildren",
                builder.getWeigher(), builder.getChildrenCacheSize());

        diffCache = builder.getDiffCache(this.clusterId);

        // check if root node exists
        NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath(ROOT));
        if (rootDoc == null) {
            if (readOnlyMode) {
                throw new DocumentStoreException("Unable to initialize a " +
                        "read-only DocumentNodeStore. The DocumentStore nodes " +
                        "collection does not have a root document.");
            }
            // root node is missing: repository is not initialized
            Revision commitRev = newRevision();
            RevisionVector head = new RevisionVector(commitRev);
            Commit commit = new CommitBuilder(this, commitRev, null)
                    .addNode(ROOT)
                    .build();
            try {
                commit.applyToDocumentStore();
            } catch (ConflictException e) {
                commit.rollback();
                throw new IllegalStateException("Conflict while creating root document", e);
            }
            unsavedLastRevisions.put(ROOT, commitRev);
            sweepRevisions = sweepRevisions.pmax(head);
            setRoot(head);
            // make sure _lastRev is written back to store
            backgroundWrite();
            rootDoc = store.find(NODES, Utils.getIdFromPath(ROOT));
            // at this point the root document must exist
            if (rootDoc == null) {
                throw new IllegalStateException("Root document does not exist");
            }
        } else {
            sweepRevisions = sweepRevisions.pmax(rootDoc.getSweepRevisions());
            initializeRootState(rootDoc);
            // check if _lastRev for our clusterId exists
            if (!rootDoc.getLastRev().containsKey(clusterId)) {
                RevisionVector rootRev = getRoot().getRootRevision();
                Revision initialRev = rootRev.getRevision(clusterId);
                if (initialRev == null) {
                    throw new IllegalStateException(
                            "missing revision for clusterId " + clusterId +
                                    ": " + rootRev);
                }
                unsavedLastRevisions.put(ROOT, initialRev);
                // set initial sweep revision
                sweepRevisions = sweepRevisions.pmax(new RevisionVector(initialRev));
                if (!readOnlyMode) {
                    backgroundWrite();
                }
            }
        }

        checkpoints = new Checkpoints(this);
        // initialize branchCommits
        branches.init(store, this, purgeUncommittedRevisions);

        dispatcher = builder.isPrefetchExternalChanges() ?
                new PrefetchDispatcher(getRoot(), executor) :
                new ChangeDispatcher(getRoot());
        commitQueue = new CommitQueue(this);
        commitQueue.setStatisticsCollector(nodeStoreStatsCollector);
        commitQueue.setSuspendTimeoutMillis(builder.getSuspendTimeoutMillis());
        batchCommitQueue = new BatchCommitQueue(store);
        // prepare background threads
        backgroundReadThread = new Thread(
                new BackgroundReadOperation(this, isDisposed),
                "DocumentNodeStore background read thread " + threadNamePostfix);
        backgroundReadThread.setDaemon(true);
        backgroundPurgeThread = new Thread(
                new BackgroundPurgeOperation(this, isDisposed),
                "DocumentNodeStore background purge thread " + threadNamePostfix);
        backgroundPurgeThread.setDaemon(true);
        backgroundUpdateThread = new Thread(
                new BackgroundUpdateOperation(this, isDisposed),
                "DocumentNodeStore background update thread " + threadNamePostfix);
        backgroundUpdateThread.setDaemon(true);
        backgroundSweepThread = new Thread(
                new BackgroundSweepOperation(this, isDisposed),
                "DocumentNodeStore background sweep thread " + threadNamePostfix);
        backgroundSweepThread.setDaemon(true);
        clusterUpdateThread = new Thread(new BackgroundClusterUpdate(this, isDisposed),
                "DocumentNodeStore cluster update thread " + threadNamePostfix);
        clusterUpdateThread.setDaemon(true);
        // now start the background threads
        clusterUpdateThread.start();
        backgroundReadThread.start();
        if (!readOnlyMode) {
            // OAK-8466 - background sweep may take a long time if there is no
            // sweep revision for this clusterId. When this process is suddenly
            // stopped while performing the sweep, a recovery will be needed
            // starting at the timestamp of _lastRev for this clusterId, which
            // is potentially old and the recovery will be expensive. Hence
            // triggering below function to update _lastRev, just before
            // triggering sweep
            runBackgroundUpdateOperations();

            // check if we need a sweep2 *before* doing a backgroundSweep.
            // this enables us to detect a direct Oak <= 1.6 upgrade situation,
            // where a sweep2 is *not* needed.

            // there are 3 different cases with sweep[1]/sweep2:
            // 1) Oak <= 1.6 direct upgrade:
            //    -> no sweep2 is needed as a sweep1 is needed anyway and sweep2
            //       from now on happens as part of it (with the OAK-9176 fix)
            // 2) Oak >= 1.8 which never did an Oak <= 1.6 upgrade:
            //    -> no sweep2 is needed as OAK-9176 doesn't apply (the repository
            //       never ran <= 1.6)
            // 3) Oak >= 1.8 which was previously doing an Oak <= 1.6 upgrade:
            //    -> A (full) sweep2 is needed. This is the main case of OAK-9176.

            // In case 3 there is a valid, recent "_sweepRev" - and
            // we can go ahead and do a "quick" backgroundSweep() here
            // before continuing, to unblock the startup.
            // After that, an async/background task is started for sweep2.

            // which of cases 1-3 we have is determined via 'sweep2LockIfNecessary'
            // and recorded in the settings collection.

            // except for this case detection (which acquires a "sweep2 lock" if needed)
            // we can otherwise continue normally. That means, a sweep1 can
            // be considered as usual.
            // Note that for case 3, doing this normal sweep1 can now also
            // fix some "_bc" - which before OAK-9176 were missing
            // and which sweep2 would separately fix as well - but this is not a problem.

            // Note that by setting the SYS_PROP_DISABLE_SWEEP2 system property
            // the sweep2 is bypassed and the sweep2 status is explicitly stored as "swept".
            final long sweep2Lock;
            if (disableSweep2) {
                try {
                    final Sweep2StatusDocument sweep2Status = Sweep2StatusDocument.readFrom(store);
                    if (sweep2Status == null || !sweep2Status.isSwept()) {
                        // setting the disableSweep2 flag stores this in the repository
                        Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId);
                    }
                } catch(Exception e) {
                    LOG.warn("<init> sweep2 is diabled as instructed by system property ("
                            + SYS_PROP_DISABLE_SWEEP2 + "=true) - however, got an Exception"
                                    + " while storing sweep2 status in the settings collection: " + e, e);
                }
                sweep2Lock = -1;
            } else {
                // So: unless sweep2 is disabled: acquire sweep2 if one is (maybe) necessary
                sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
            }

            // perform an initial document sweep if needed
            // this may be long running if there is no sweep revision
            // for this clusterId (upgrade from Oak <= 1.6).
            // it is therefore important the lease thread is running already.
            backgroundSweep();

            backgroundUpdateThread.start();
            backgroundSweepThread.start();
            backgroundPurgeThread.start();

            if (sweep2Lock >= 0) {
                // sweep2 is necessary - so start a sweep2 background task
                backgroundSweep2Thread = new Thread(
                        new BackgroundSweep2Operation(this, isDisposed, sweep2Lock),
                        "DocumentNodeStore background sweep2 thread " + threadNamePostfix);
                backgroundSweep2Thread.setDaemon(true);
                backgroundSweep2Thread.start();
            }
        }

        persistentCache = builder.getPersistentCache();
        if (!readOnlyMode && persistentCache != null) {
            DynamicBroadcastConfig broadcastConfig = new DocumentBroadcastConfig(this);
            persistentCache.setBroadcastConfig(broadcastConfig);
        }
        journalCache = builder.getJournalCache();

        this.mbean = createMBean(builder);
        LOG.info("ChangeSetBuilder enabled and size set to maxItems: {}, maxDepth: {}", changeSetMaxItems, changeSetMaxDepth);
        LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}, updateLimit: {} ({})",
                clusterId, updateLimit,
                getClusterNodeInfoDisplayString());

        if (!builder.isBundlingDisabled()) {
            bundlingConfigHandler.initialize(this, executor);
        }
    }