in oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java [565:855]
/**
 * Creates a new {@code DocumentNodeStore} from the given builder and brings
 * it into a running state.
 * <p>
 * The constructor performs the following steps, in this order:
 * <ol>
 *   <li>Reads basic settings from the builder and stacks the optional
 *       {@link DocumentStore} wrappers (timing, logging, read-only).</li>
 *   <li>Obtains the {@link ClusterNodeInfo} for this instance using the
 *       store as wrapped so far ({@code nonLeaseCheckingStore}), then adds
 *       the throttling and lease-check wrappers on top for regular use.</li>
 *   <li>Creates the lease update thread and, unless in read-only mode,
 *       starts it right away.</li>
 *   <li>Creates garbage collectors, recovery agent and caches.</li>
 *   <li>Creates the root document if it does not exist yet, otherwise
 *       initializes the root state from the existing root document.</li>
 *   <li>Creates and starts the background threads. Unless in read-only
 *       mode, it also runs an initial background update and background
 *       sweep synchronously before the remaining threads are started.</li>
 *   <li>Wires up the persistent cache broadcast config, the MBean and,
 *       unless disabled, the bundling config handler.</li>
 * </ol>
 * Note that {@code this} is handed to several collaborators and threads
 * before the constructor returns.
 *
 * @param builder the builder that provides the configuration and
 *                collaborators for this node store.
 * @throws DocumentStoreException if the store is opened in read-only mode
 *          and the nodes collection does not contain a root document.
 * @throws IllegalStateException if creating the root document conflicts,
 *          if the root document still does not exist after it was created,
 *          or if the root revision of an existing repository has no
 *          revision for the clusterId of this instance.
 */
public DocumentNodeStore(DocumentNodeStoreBuilder<?> builder) {
    this.nodeCachePredicate = builder.getNodeCachePathPredicate();
    this.updateLimit = builder.getUpdateLimit();
    // the empty commit value cache is only enabled when the builder asks
    // for it AND the store is opened in read-only mode
    this.commitValueResolver = new CachingCommitValueResolver(
            builder.getCommitValueCacheSize(), this::getSweepRevisions)
            .withEmptyCommitValueCache(
                    builder.getCacheEmptyCommitValue() && builder.getReadOnlyMode(),
                    builder.getClock(), builder.getJournalGCMaxAge());
    this.blobStore = builder.getBlobStore();
    this.nodeStoreStatsCollector = builder.getNodeStoreStatsCollector();
    if (builder.isUseSimpleRevision()) {
        this.simpleRevisionCounter = new AtomicInteger(0);
    }

    // stack the optional DocumentStore wrappers; the server time check
    // is done against the store as provided by the builder
    DocumentStore s = builder.getDocumentStore();
    checkServerTimeDifference(s);
    if (builder.getTiming()) {
        s = new TimingDocumentStoreWrapper(s);
    }
    if (builder.getLogging()) {
        if (builder.getLoggingPrefix() != null) {
            s = new LoggingDocumentStoreWrapper(s, builder.getLoggingPrefix());
        } else {
            s = new LoggingDocumentStoreWrapper(s);
        }
    }
    if (builder.getReadOnlyMode()) {
        s = ReadOnlyDocumentStoreWrapperFactory.getInstance(s);
        readOnlyMode = true;
    } else {
        readOnlyMode = false;
    }
    checkVersion(s, readOnlyMode);
    // captured before the throttling and lease check wrappers are added
    // below, hence this reference never goes through those two wrappers
    this.nonLeaseCheckingStore = s;
    this.executor = builder.getExecutor();
    this.lastRevSeeker = builder.createMissingLastRevSeeker();
    this.clock = builder.getClock();

    // the clusterId from the builder serves as the default value for the
    // 'oak.documentMK.clusterId' system property lookup
    int cid = builder.getClusterId();
    cid = SystemPropertySupplier.create("oak.documentMK.clusterId", cid).loggingTo(LOG).get();
    if (readOnlyMode) {
        clusterNodeInfo = ClusterNodeInfo.getReadOnlyInstance(nonLeaseCheckingStore);
    } else {
        clusterNodeInfo = ClusterNodeInfo.getInstance(nonLeaseCheckingStore,
                new RecoveryHandlerImpl(nonLeaseCheckingStore, clock, lastRevSeeker),
                null, null, cid, builder.isClusterInvisible());
        checkRevisionAge(nonLeaseCheckingStore, clusterNodeInfo, clock);
    }
    // from here on the effective clusterId is the one of the ClusterNodeInfo
    this.clusterId = clusterNodeInfo.getId();

    if (isThrottlingEnabled(builder)) {
        s = new ThrottlingDocumentStoreWrapper(s, builder.getThrottlingStatsCollector());
    }

    clusterNodeInfo.setLeaseCheckMode(builder.getLeaseCheckMode());
    if (builder.getLeaseCheckMode() != LeaseCheckMode.DISABLED) {
        s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
        clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
    }

    // the lease update thread is always created, but only started when
    // this instance is not read-only
    String threadNamePostfix = "(" + clusterId + ")";
    leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, stopLeaseUpdateThread),
            "DocumentNodeStore lease update thread " + threadNamePostfix);
    leaseUpdateThread.setDaemon(true);
    if (!readOnlyMode) {
        // OAK-3398 : make lease updating more robust by ensuring it
        // has higher likelihood of succeeding than other threads
        // on a very busy machine - so as to prevent lease timeout.
        leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
        leaseUpdateThread.start();
    }

    this.prefetchFeature = builder.getPrefetchFeature();
    this.cacheWarming = new CacheWarming(s);
    this.journalPropertyHandlerFactory = builder.getJournalPropertyHandlerFactory();
    // 'store' is the fully wrapped DocumentStore used from here on
    this.store = s;
    this.changes = newJournalEntry();
    this.branches = new UnmergedBranches();
    this.asyncDelay = builder.getAsyncDelay();
    this.versionGarbageCollector = new VersionGarbageCollector(
            this, builder.createVersionGCSupport());
    this.versionGarbageCollector.setStatisticsProvider(builder.getStatisticsProvider());
    this.versionGarbageCollector.setGCMonitor(builder.getGCMonitor());
    this.journalGarbageCollector = new JournalGarbageCollector(
            this, builder.getJournalGCMaxAge());
    this.referencedBlobs =
            builder.createReferencedBlobs(this);
    this.lastRevRecoveryAgent = new LastRevRecoveryAgent(store, this,
            lastRevSeeker, clusterId -> this.signalClusterStateChange());
    this.disableBranches = builder.isDisableBranches();
    // placeholder node state at path "missing" with revision (0, 0, 0)
    // that reports a fixed memory footprint of 8
    this.missing = new DocumentNodeState(this, new Path("missing"),
            new RevisionVector(new Revision(0, 0, 0))) {
        @Override
        public int getMemory() {
            return 8;
        }
    };

    //TODO Make stats collection configurable as it adds slight overhead

    nodeCache = builder.buildNodeCache(this);
    nodeCacheStats = new CacheStats(nodeCache, "Document-NodeState",
            builder.getWeigher(), builder.getNodeCacheSize());

    nodeChildrenCache = builder.buildChildrenCache(this);
    nodeChildrenCacheStats = new CacheStats(nodeChildrenCache, "Document-NodeChildren",
            builder.getWeigher(), builder.getChildrenCacheSize());

    diffCache = builder.getDiffCache(this.clusterId);

    // check if root node exists
    NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath(ROOT));
    if (rootDoc == null) {
        // a read-only instance cannot initialize the repository
        if (readOnlyMode) {
            throw new DocumentStoreException("Unable to initialize a " +
                    "read-only DocumentNodeStore. The DocumentStore nodes " +
                    "collection does not have a root document.");
        }
        // root node is missing: repository is not initialized
        Revision commitRev = newRevision();
        RevisionVector head = new RevisionVector(commitRev);
        Commit commit = new CommitBuilder(this, commitRev, null)
                .addNode(ROOT)
                .build();
        try {
            commit.applyToDocumentStore();
        } catch (ConflictException e) {
            commit.rollback();
            throw new IllegalStateException("Conflict while creating root document", e);
        }
        unsavedLastRevisions.put(ROOT, commitRev);
        sweepRevisions = sweepRevisions.pmax(head);
        setRoot(head);
        // make sure _lastRev is written back to store
        backgroundWrite();
        rootDoc = store.find(NODES, Utils.getIdFromPath(ROOT));
        // at this point the root document must exist
        if (rootDoc == null) {
            throw new IllegalStateException("Root document does not exist");
        }
    } else {
        sweepRevisions = sweepRevisions.pmax(rootDoc.getSweepRevisions());
        initializeRootState(rootDoc);
        // check if _lastRev for our clusterId exists
        if (!rootDoc.getLastRev().containsKey(clusterId)) {
            RevisionVector rootRev = getRoot().getRootRevision();
            Revision initialRev = rootRev.getRevision(clusterId);
            if (initialRev == null) {
                throw new IllegalStateException(
                        "missing revision for clusterId " + clusterId +
                                ": " + rootRev);
            }
            unsavedLastRevisions.put(ROOT, initialRev);
            // set initial sweep revision
            sweepRevisions = sweepRevisions.pmax(new RevisionVector(initialRev));
            // the pending _lastRev is only written back when not read-only
            if (!readOnlyMode) {
                backgroundWrite();
            }
        }
    }

    checkpoints = new Checkpoints(this);
    // initialize branchCommits
    branches.init(store, this, purgeUncommittedRevisions);

    // the dispatcher implementation depends on whether prefetching of
    // external changes is enabled in the builder
    dispatcher = builder.isPrefetchExternalChanges() ?
            new PrefetchDispatcher(getRoot(), executor) :
            new ChangeDispatcher(getRoot());
    commitQueue = new CommitQueue(this);
    commitQueue.setStatisticsCollector(nodeStoreStatsCollector);
    commitQueue.setSuspendTimeoutMillis(builder.getSuspendTimeoutMillis());
    batchCommitQueue = new BatchCommitQueue(store);

    // prepare background threads
    backgroundReadThread = new Thread(
            new BackgroundReadOperation(this, isDisposed),
            "DocumentNodeStore background read thread " + threadNamePostfix);
    backgroundReadThread.setDaemon(true);
    backgroundPurgeThread = new Thread(
            new BackgroundPurgeOperation(this, isDisposed),
            "DocumentNodeStore background purge thread " + threadNamePostfix);
    backgroundPurgeThread.setDaemon(true);
    backgroundUpdateThread = new Thread(
            new BackgroundUpdateOperation(this, isDisposed),
            "DocumentNodeStore background update thread " + threadNamePostfix);
    backgroundUpdateThread.setDaemon(true);
    backgroundSweepThread = new Thread(
            new BackgroundSweepOperation(this, isDisposed),
            "DocumentNodeStore background sweep thread " + threadNamePostfix);
    backgroundSweepThread.setDaemon(true);
    clusterUpdateThread = new Thread(new BackgroundClusterUpdate(this, isDisposed),
            "DocumentNodeStore cluster update thread " + threadNamePostfix);
    clusterUpdateThread.setDaemon(true);

    // now start the background threads; cluster update and background
    // read are started in read-only mode as well
    clusterUpdateThread.start();
    backgroundReadThread.start();
    if (!readOnlyMode) {
        // OAK-8466 - background sweep may take a long time if there is no
        // sweep revision for this clusterId. When this process is suddenly
        // stopped while performing the sweep, a recovery will be needed
        // starting at the timestamp of _lastRev for this clusterId, which
        // is potentially old and the recovery will be expensive. Hence
        // triggering below function to update _lastRev, just before
        // triggering sweep
        runBackgroundUpdateOperations();

        // check if we need a sweep2 *before* doing a backgroundSweep.
        // this enables us to detect a direct Oak <= 1.6 upgrade situation,
        // where a sweep2 is *not* needed.
        // there are 3 different cases with sweep[1]/sweep2:
        // 1) Oak <= 1.6 direct upgrade:
        //    -> no sweep2 is needed as a sweep1 is needed anyway and sweep2
        //       from now on happens as part of it (with the OAK-9176 fix)
        // 2) Oak >= 1.8 which never did an Oak <= 1.6 upgrade:
        //    -> no sweep2 is needed as OAK-9176 doesn't apply (the repository
        //       never ran <= 1.6)
        // 3) Oak >= 1.8 which was previously doing an Oak <= 1.6 upgrade:
        //    -> A (full) sweep2 is needed. This is the main case of OAK-9176.
        // In case 3 there is a valid, recent "_sweepRev" - and
        // we can go ahead and do a "quick" backgroundSweep() here
        // before continuing, to unblock the startup.
        // After that, an async/background task is started for sweep2.
        // which of cases 1-3 we have is determined via 'sweep2LockIfNecessary'
        // and recorded in the settings collection.
        // except for this case detection (which acquires a "sweep2 lock" if needed)
        // we can otherwise continue normally. That means, a sweep1 can
        // be considered as usual.
        // Note that for case 3, doing this normal sweep1 can now also
        // fix some "_bc" - which before OAK-9176 were missing
        // and which sweep2 would separately fix as well - but this is not a problem.
        // Note that by setting the SYS_PROP_DISABLE_SWEEP2 system property
        // the sweep2 is bypassed and the sweep2 status is explicitly stored as "swept".
        final long sweep2Lock;
        if (disableSweep2) {
            try {
                final Sweep2StatusDocument sweep2Status = Sweep2StatusDocument.readFrom(store);
                if (sweep2Status == null || !sweep2Status.isSwept()) {
                    // setting the disableSweep2 flag stores this in the repository
                    Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId);
                }
            } catch(Exception e) {
                // deliberately best effort: a failure to persist the
                // "swept" status only results in a warning, startup goes on
                LOG.warn("<init> sweep2 is disabled as instructed by system property ("
                        + SYS_PROP_DISABLE_SWEEP2 + "=true) - however, got an Exception"
                        + " while storing sweep2 status in the settings collection: " + e, e);
            }
            // a negative value means: no sweep2 background task is started below
            sweep2Lock = -1;
        } else {
            // So: unless sweep2 is disabled: acquire sweep2 if one is (maybe) necessary
            sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
        }

        // perform an initial document sweep if needed
        // this may be long running if there is no sweep revision
        // for this clusterId (upgrade from Oak <= 1.6).
        // it is therefore important the lease thread is running already.
        backgroundSweep();

        backgroundUpdateThread.start();
        backgroundSweepThread.start();
        backgroundPurgeThread.start();

        if (sweep2Lock >= 0) {
            // sweep2 is necessary - so start a sweep2 background task
            backgroundSweep2Thread = new Thread(
                    new BackgroundSweep2Operation(this, isDisposed, sweep2Lock),
                    "DocumentNodeStore background sweep2 thread " + threadNamePostfix);
            backgroundSweep2Thread.setDaemon(true);
            backgroundSweep2Thread.start();
        }
    }

    persistentCache = builder.getPersistentCache();
    // the broadcast config is only set for a writable instance that has
    // a persistent cache configured
    if (!readOnlyMode && persistentCache != null) {
        DynamicBroadcastConfig broadcastConfig = new DocumentBroadcastConfig(this);
        persistentCache.setBroadcastConfig(broadcastConfig);
    }
    journalCache = builder.getJournalCache();

    this.mbean = createMBean(builder);
    LOG.info("ChangeSetBuilder enabled and size set to maxItems: {}, maxDepth: {}", changeSetMaxItems, changeSetMaxDepth);
    LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}, updateLimit: {} ({})",
            clusterId, updateLimit,
            getClusterNodeInfoDisplayString());

    if (!builder.isBundlingDisabled()) {
        bundlingConfigHandler.initialize(this, executor);
    }
}