private SegmentNodeStore register()

in oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java [186:502]


    private SegmentNodeStore register() throws IOException {
        if (cfg.getBlobStore() == null && (cfg.hasCustomBlobStore() || cfg.isSecondarySegmentStore())) {
            cfg.getLogger().info("BlobStore enabled. SegmentNodeStore will be initialized once the blob store becomes available");
            return null;
        }

        if (cfg.getSegmentNodeStorePersistence() == null && cfg.hasCustomSegmentStore()) {
            cfg.getLogger().info("customSegmentStore enabled. SegmentNodeStore will be initialized once the custom segment store becomes available");
            return null;
        }

        // Listen for GCMonitor services
        GCMonitor gcMonitor = GCMonitor.EMPTY;

        if (cfg.isPrimarySegmentStore()) {
            GCMonitorTracker tracker = new GCMonitorTracker();
            tracker.start(cfg.getWhiteboard());
            registerCloseable(tracker);
            gcMonitor = tracker;
        }

        // Create the gc options
        if (cfg.getRetainedGenerations() != cfg.getDefaultRetainedGenerations()) {
            cfg.getLogger().warn(
                "The number of retained generations defaults to {} and can't be " +
                    "changed. This configuration option is considered deprecated " +
                    "and will be removed in the future.",
                RETAINED_GENERATIONS_DEFAULT
            );
        }
        SegmentGCOptions gcOptions = new SegmentGCOptions(cfg.getPauseCompaction(), cfg.getRetryCount(), cfg.getForceCompactionTimeout())
            .setGcSizeDeltaEstimation(cfg.getSizeDeltaEstimation())
            .setMemoryThreshold(cfg.getMemoryThreshold())
            .setEstimationDisabled(cfg.getDisableEstimation())
            .setGCLogInterval(cfg.getGCProcessLog());
        if (cfg.isStandbyInstance()) {
            gcOptions.setRetainedGenerations(1);
        }

        // Build the FileStore
        FileStoreBuilder builder = fileStoreBuilder(cfg.getSegmentDirectory())
            .withSegmentCacheSize(cfg.getSegmentCacheSize())
            .withStringCacheSize(cfg.getStringCacheSize())
            .withTemplateCacheSize(cfg.getTemplateCacheSize())
            .withStringDeduplicationCacheSize(cfg.getStringDeduplicationCacheSize())
            .withTemplateDeduplicationCacheSize(cfg.getTemplateDeduplicationCacheSize())
            .withNodeDeduplicationCacheSize(cfg.getNodeDeduplicationCacheSize())
            .withMaxFileSize(cfg.getMaxFileSize())
            .withMemoryMapping(cfg.getMemoryMapping())
            .withGCMonitor(gcMonitor)
            .withIOMonitor(new MetricsIOMonitor(cfg.getStatisticsProvider()))
            .withRemoteStoreMonitor(new MetricsRemoteStoreMonitor(cfg.getStatisticsProvider()))
            .withStatisticsProvider(cfg.getStatisticsProvider())
            .withGCOptions(gcOptions);

        if (cfg.hasCustomBlobStore() && cfg.getBlobStore() != null) {
            cfg.getLogger().info("Initializing SegmentNodeStore with BlobStore [{}]", cfg.getBlobStore());
            builder.withBlobStore(cfg.getBlobStore());
        }

        if (cfg.hasCustomSegmentStore() && cfg.getSegmentNodeStorePersistence() != null) {
            SegmentNodeStorePersistence customPersistence = cfg.getSegmentNodeStorePersistence();

            if (cfg.hasCachePersistence()) {
                cfg.getLogger().info("Using persistent cache for the custom persistence [{}]", customPersistence);
                customPersistence = new CachingPersistence(cfg.getPersistentCache(), customPersistence);
            }

            if (cfg.hasSplitPersistence()) {
                cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}] and local writes", customPersistence);
                cfg.getSplitPersistenceDirectory().mkdirs();
                SegmentNodeStorePersistence rwPersistence = new TarPersistence(cfg.getSplitPersistenceDirectory());
                SegmentNodeStorePersistence persistence = new SplitPersistence(customPersistence, rwPersistence);
                builder.withCustomPersistence(persistence);
            } else {
                cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", customPersistence);
                builder.withCustomPersistence(customPersistence);
            }
        }

        if (cfg.isStandbyInstance()) {
            builder.withSnfeListener(IGNORE_SNFE);
            builder.withEagerSegmentCaching(true);
        }

        FileStore store;
        try {
            store = builder.build();
        } catch (InvalidFileStoreVersionException e) {
            cfg.getLogger().error("The storage format is not compatible with this version of Oak Segment Tar", e);
            return null;
        }
        registerCloseable(store);

        // Listen for Executor services on the whiteboard

        WhiteboardExecutor executor = new WhiteboardExecutor();
        executor.start(cfg.getWhiteboard());
        registerCloseable(executor);

        // Expose stats about the segment cache

        CacheStatsMBean segmentCacheStats = store.getSegmentCacheStats();
        registerCloseable(registerMBean(
            CacheStatsMBean.class,
            segmentCacheStats,
            CacheStats.TYPE,
            segmentCacheStats.getName()
        ));

        // Expose stats about the string and template caches

        CacheStatsMBean stringCacheStats = store.getStringCacheStats();
        registerCloseable(registerMBean(
            CacheStatsMBean.class,
            stringCacheStats,
            CacheStats.TYPE,
            stringCacheStats.getName()
        ));

        CacheStatsMBean templateCacheStats = store.getTemplateCacheStats();
        registerCloseable(registerMBean(
            CacheStatsMBean.class,
            templateCacheStats,
            CacheStats.TYPE,
            templateCacheStats.getName()
        ));

        WriterCacheManager cacheManager = builder.getCacheManager();
        CacheStatsMBean stringDeduplicationCacheStats = cacheManager.getStringCacheStats();
        if (stringDeduplicationCacheStats != null) {
            registerCloseable(registerMBean(
                CacheStatsMBean.class,
                stringDeduplicationCacheStats,
                CacheStats.TYPE,
                stringDeduplicationCacheStats.getName()
            ));
        }

        CacheStatsMBean templateDeduplicationCacheStats = cacheManager.getTemplateCacheStats();
        if (templateDeduplicationCacheStats != null) {
            registerCloseable(registerMBean(
                CacheStatsMBean.class,
                templateDeduplicationCacheStats,
                CacheStats.TYPE,
                templateDeduplicationCacheStats.getName()
            ));
        }

        CacheStatsMBean nodeDeduplicationCacheStats = cacheManager.getNodeCacheStats();
        if (nodeDeduplicationCacheStats != null) {
            registerCloseable(registerMBean(
                CacheStatsMBean.class,
                nodeDeduplicationCacheStats,
                CacheStats.TYPE,
                nodeDeduplicationCacheStats.getName()
            ));
        }

        // Expose an MBean to managing and monitoring garbage collection

        FileStoreGCMonitor monitor = new FileStoreGCMonitor(Clock.SIMPLE);
        registerCloseable(register(
            GCMonitor.class,
            monitor
        ));
        if (!cfg.isStandbyInstance()) {
            registerCloseable(registerMBean(
                SegmentRevisionGC.class,
                new SegmentRevisionGCMBean(store, gcOptions, monitor),
                SegmentRevisionGC.TYPE,
                "Segment node store revision garbage collection"
            ));
        }

        registerCloseable(registerMBean(
            RevisionGCMBean.class,
            new RevisionGC(store.getGCRunner(), store::cancelGC, monitor::getStatus, executor),
            RevisionGCMBean.TYPE,
            "Revision garbage collection"
        ));

        // Expose statistics about the FileStore

        registerCloseable(registerMBean(
            FileStoreStatsMBean.class,
            store.getStats(),
            FileStoreStatsMBean.TYPE,
            "FileStore statistics"
        ));

        // register segment node store

        SegmentNodeStore.SegmentNodeStoreBuilder segmentNodeStoreBuilder = SegmentNodeStoreBuilders.builder(store).withStatisticsProvider(cfg.getStatisticsProvider());
        segmentNodeStoreBuilder.dispatchChanges(cfg.dispatchChanges());

        Logger log = LoggerFactory.getLogger(LoggingHook.class.getName() + ".writer");
        if (log.isTraceEnabled()) {
            segmentNodeStoreBuilder.withLoggingHook(log::trace);
        }

        SegmentNodeStore segmentNodeStore = segmentNodeStoreBuilder.build();

        if (cfg.isPrimarySegmentStore()) {
            ObserverTracker observerTracker = new ObserverTracker(segmentNodeStore);
            observerTracker.start(cfg.getBundleContext());
            registerCloseable(observerTracker);
        }

        if (cfg.isPrimarySegmentStore()) {
            registerCloseable(registerMBean(
                CheckpointMBean.class,
                new SegmentCheckpointMBean(segmentNodeStore),
                CheckpointMBean.TYPE,
                "Segment node store checkpoint management"
            ));
        }

        if (cfg.registerDescriptors()) {
            // ensure a clusterId is initialized
            // and expose it as 'oak.clusterid' repository descriptor
            GenericDescriptors clusterIdDesc = new GenericDescriptors();
            clusterIdDesc.put(
                ClusterRepositoryInfo.OAK_CLUSTERID_REPOSITORY_DESCRIPTOR_KEY,
                new SimpleValueFactory().createValue(getOrCreateId(segmentNodeStore)),
                true,
                false
            );
            registerCloseable(register(Descriptors.class, clusterIdDesc));
            // Register "discovery lite" descriptors
            registerCloseable(register(Descriptors.class, new SegmentDiscoveryLiteDescriptors(segmentNodeStore)));
        }

        // If a shared data store register the repo id in the data store
        if (!cfg.isSecondarySegmentStore() && cfg.hasCustomBlobStore() && isShared(cfg.getBlobStore())) {
            SharedDataStore sharedDataStore = (SharedDataStore) cfg.getBlobStore();
            try {
                sharedDataStore.setRepositoryId(getOrCreateId(segmentNodeStore));
            } catch (Exception e) {
                throw new IOException("Could not register a unique repositoryId", e);
            }
            if (cfg.getBlobStore() instanceof BlobTrackingStore) {
                BlobTrackingStore trackingStore = (BlobTrackingStore) cfg.getBlobStore();
                if (trackingStore.getTracker() != null) {
                    trackingStore.getTracker().close();
                }
                trackingStore.addTracker(BlobIdTracker.build(cfg.getRepositoryHome(), getOrCreateId(segmentNodeStore), cfg.getBlobSnapshotInterval(), sharedDataStore));
            }
        }

        if (!cfg.isSecondarySegmentStore() && cfg.hasCustomBlobStore() && (cfg.getBlobStore() instanceof GarbageCollectableBlobStore)) {
            BlobGarbageCollector gc = new MarkSweepGarbageCollector(
                new SegmentBlobReferenceRetriever(store),
                (GarbageCollectableBlobStore) cfg.getBlobStore(),
                executor,
                TimeUnit.SECONDS.toMillis(cfg.getBlobGcMaxAge()),
                getOrCreateId(segmentNodeStore),
                cfg.getWhiteboard(),
                cfg.getStatisticsProvider()
            );
            registerCloseable(registerMBean(
                BlobGCMBean.class,
                new BlobGC(gc, executor),
                BlobGCMBean.TYPE,
                "Segment node store blob garbage collection"
            ));
        }

        // Expose an MBean for backup/restore operations

        registerCloseable(registerMBean(
            FileStoreBackupRestoreMBean.class,
            new FileStoreBackupRestoreImpl(
                segmentNodeStore,
                store.getRevisions(),
                store.getReader(),
                cfg.getBackupDirectory(),
                executor
            ),
            FileStoreBackupRestoreMBean.TYPE,
            "Segment node store backup/restore"
        ));

        // Expose statistics about the SegmentNodeStore

        registerCloseable(registerMBean(
            SegmentNodeStoreStatsMBean.class,
            segmentNodeStore.getStats(),
            SegmentNodeStoreStatsMBean.TYPE,
            "SegmentNodeStore statistics"
        ));

        if (cfg.isPrimarySegmentStore()) {
            cfg.getLogger().info("Primary SegmentNodeStore initialized");
        } else {
            cfg.getLogger().info("Secondary SegmentNodeStore initialized, role={}", cfg.getRole());
        }

        // Register a factory service to expose the FileStore
        registerCloseable(register(
            SegmentStoreProvider.class,
            new DefaultSegmentStoreProvider(store)
        ));

        if (cfg.isStandbyInstance()) {
            return segmentNodeStore;
        }

        if (cfg.isPrimarySegmentStore()) {
            Map<String, Object> props = new HashMap<>();
            props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
            props.put("oak.nodestore.description", new String[] {"nodeStoreType=segment"});
            registerCloseable(register(NodeStore.class, segmentNodeStore, props));
        }

        return segmentNodeStore;
    }