in oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java [186:502]
/**
 * Builds the {@link FileStore} and the {@link SegmentNodeStore} described by
 * {@code cfg} and registers the services and MBeans that accompany them.
 * <p>
 * Every tracker, store and registration created here is handed to
 * {@code registerCloseable}. The order of these calls is kept deliberately
 * stable.
 * <p>
 * On a standby instance only one generation is retained, the
 * {@code IGNORE_SNFE} listener and eager segment caching are enabled, and
 * neither the {@link SegmentRevisionGC} MBean nor the {@link NodeStore}
 * service is registered.
 *
 * @return the initialized node store, or {@code null} when a required blob
 *         store or custom segment store is not available yet, or when the
 *         storage format is rejected with an
 *         {@link InvalidFileStoreVersionException}
 * @throws IOException if the split persistence directory cannot be created,
 *                     if the repository id cannot be registered with a
 *                     shared data store, or if one of the invoked
 *                     components reports an I/O failure
 */
private SegmentNodeStore register() throws IOException {
    // Postpone initialization until the externally provided stores are bound.
    if (cfg.getBlobStore() == null && (cfg.hasCustomBlobStore() || cfg.isSecondarySegmentStore())) {
        cfg.getLogger().info("BlobStore enabled. SegmentNodeStore will be initialized once the blob store becomes available");
        return null;
    }

    if (cfg.getSegmentNodeStorePersistence() == null && cfg.hasCustomSegmentStore()) {
        cfg.getLogger().info("customSegmentStore enabled. SegmentNodeStore will be initialized once the custom segment store becomes available");
        return null;
    }

    // Listen for GCMonitor services (primary store only; otherwise the empty monitor is used)

    GCMonitor gcMonitor = GCMonitor.EMPTY;

    if (cfg.isPrimarySegmentStore()) {
        GCMonitorTracker tracker = new GCMonitorTracker();
        tracker.start(cfg.getWhiteboard());
        registerCloseable(tracker);
        gcMonitor = tracker;
    }

    // Create the gc options

    // NOTE(review): the check uses cfg.getDefaultRetainedGenerations() while the
    // message prints RETAINED_GENERATIONS_DEFAULT - presumably the same value; confirm.
    if (cfg.getRetainedGenerations() != cfg.getDefaultRetainedGenerations()) {
        cfg.getLogger().warn(
            "The number of retained generations defaults to {} and can't be " +
                "changed. This configuration option is considered deprecated " +
                "and will be removed in the future.",
            RETAINED_GENERATIONS_DEFAULT
        );
    }

    SegmentGCOptions gcOptions = new SegmentGCOptions(cfg.getPauseCompaction(), cfg.getRetryCount(), cfg.getForceCompactionTimeout())
        .setGcSizeDeltaEstimation(cfg.getSizeDeltaEstimation())
        .setMemoryThreshold(cfg.getMemoryThreshold())
        .setEstimationDisabled(cfg.getDisableEstimation())
        .setGCLogInterval(cfg.getGCProcessLog());

    if (cfg.isStandbyInstance()) {
        gcOptions.setRetainedGenerations(1);
    }

    // Build the FileStore

    FileStoreBuilder builder = fileStoreBuilder(cfg.getSegmentDirectory())
        .withSegmentCacheSize(cfg.getSegmentCacheSize())
        .withStringCacheSize(cfg.getStringCacheSize())
        .withTemplateCacheSize(cfg.getTemplateCacheSize())
        .withStringDeduplicationCacheSize(cfg.getStringDeduplicationCacheSize())
        .withTemplateDeduplicationCacheSize(cfg.getTemplateDeduplicationCacheSize())
        .withNodeDeduplicationCacheSize(cfg.getNodeDeduplicationCacheSize())
        .withMaxFileSize(cfg.getMaxFileSize())
        .withMemoryMapping(cfg.getMemoryMapping())
        .withGCMonitor(gcMonitor)
        .withIOMonitor(new MetricsIOMonitor(cfg.getStatisticsProvider()))
        .withRemoteStoreMonitor(new MetricsRemoteStoreMonitor(cfg.getStatisticsProvider()))
        .withStatisticsProvider(cfg.getStatisticsProvider())
        .withGCOptions(gcOptions);

    if (cfg.hasCustomBlobStore() && cfg.getBlobStore() != null) {
        cfg.getLogger().info("Initializing SegmentNodeStore with BlobStore [{}]", cfg.getBlobStore());
        builder.withBlobStore(cfg.getBlobStore());
    }

    if (cfg.hasCustomSegmentStore() && cfg.getSegmentNodeStorePersistence() != null) {
        SegmentNodeStorePersistence customPersistence = cfg.getSegmentNodeStorePersistence();

        // Optionally put the persistent cache in front of the custom persistence.
        if (cfg.hasCachePersistence()) {
            cfg.getLogger().info("Using persistent cache for the custom persistence [{}]", customPersistence);
            customPersistence = new CachingPersistence(cfg.getPersistentCache(), customPersistence);
        }

        if (cfg.hasSplitPersistence()) {
            cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}] and local writes", customPersistence);

            // mkdirs() also returns false when the directory already exists,
            // so only a missing directory after the attempt is a failure.
            if (!cfg.getSplitPersistenceDirectory().mkdirs() && !cfg.getSplitPersistenceDirectory().isDirectory()) {
                throw new IOException("Could not create the split persistence directory " + cfg.getSplitPersistenceDirectory());
            }

            SegmentNodeStorePersistence rwPersistence = new TarPersistence(cfg.getSplitPersistenceDirectory());
            SegmentNodeStorePersistence persistence = new SplitPersistence(customPersistence, rwPersistence);
            builder.withCustomPersistence(persistence);
        } else {
            cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", customPersistence);
            builder.withCustomPersistence(customPersistence);
        }
    }

    if (cfg.isStandbyInstance()) {
        builder.withSnfeListener(IGNORE_SNFE);
        builder.withEagerSegmentCaching(true);
    }

    FileStore store;
    try {
        store = builder.build();
    } catch (InvalidFileStoreVersionException e) {
        // An incompatible storage format is logged and reported as "no store".
        cfg.getLogger().error("The storage format is not compatible with this version of Oak Segment Tar", e);
        return null;
    }
    registerCloseable(store);

    // Listen for Executor services on the whiteboard

    WhiteboardExecutor executor = new WhiteboardExecutor();
    executor.start(cfg.getWhiteboard());
    registerCloseable(executor);

    // Expose stats about the segment, string and template caches

    registerCacheStatsMBean(store.getSegmentCacheStats());
    registerCacheStatsMBean(store.getStringCacheStats());
    registerCacheStatsMBean(store.getTemplateCacheStats());

    // Expose stats about the deduplication caches; each of them may be absent

    WriterCacheManager cacheManager = builder.getCacheManager();

    CacheStatsMBean stringDeduplicationCacheStats = cacheManager.getStringCacheStats();
    if (stringDeduplicationCacheStats != null) {
        registerCacheStatsMBean(stringDeduplicationCacheStats);
    }

    CacheStatsMBean templateDeduplicationCacheStats = cacheManager.getTemplateCacheStats();
    if (templateDeduplicationCacheStats != null) {
        registerCacheStatsMBean(templateDeduplicationCacheStats);
    }

    CacheStatsMBean nodeDeduplicationCacheStats = cacheManager.getNodeCacheStats();
    if (nodeDeduplicationCacheStats != null) {
        registerCacheStatsMBean(nodeDeduplicationCacheStats);
    }

    // Expose an MBean for managing and monitoring garbage collection

    FileStoreGCMonitor monitor = new FileStoreGCMonitor(Clock.SIMPLE);
    registerCloseable(register(
        GCMonitor.class,
        monitor
    ));

    if (!cfg.isStandbyInstance()) {
        registerCloseable(registerMBean(
            SegmentRevisionGC.class,
            new SegmentRevisionGCMBean(store, gcOptions, monitor),
            SegmentRevisionGC.TYPE,
            "Segment node store revision garbage collection"
        ));
    }

    registerCloseable(registerMBean(
        RevisionGCMBean.class,
        new RevisionGC(store.getGCRunner(), store::cancelGC, monitor::getStatus, executor),
        RevisionGCMBean.TYPE,
        "Revision garbage collection"
    ));

    // Expose statistics about the FileStore

    registerCloseable(registerMBean(
        FileStoreStatsMBean.class,
        store.getStats(),
        FileStoreStatsMBean.TYPE,
        "FileStore statistics"
    ));

    // Build the segment node store

    SegmentNodeStore.SegmentNodeStoreBuilder segmentNodeStoreBuilder = SegmentNodeStoreBuilders.builder(store).withStatisticsProvider(cfg.getStatisticsProvider());
    segmentNodeStoreBuilder.dispatchChanges(cfg.dispatchChanges());

    // The logging hook is only installed when trace logging is enabled at this point.
    Logger log = LoggerFactory.getLogger(LoggingHook.class.getName() + ".writer");
    if (log.isTraceEnabled()) {
        segmentNodeStoreBuilder.withLoggingHook(log::trace);
    }

    SegmentNodeStore segmentNodeStore = segmentNodeStoreBuilder.build();

    // Observer tracking and checkpoint management are only set up for the primary store

    if (cfg.isPrimarySegmentStore()) {
        ObserverTracker observerTracker = new ObserverTracker(segmentNodeStore);
        observerTracker.start(cfg.getBundleContext());
        registerCloseable(observerTracker);

        registerCloseable(registerMBean(
            CheckpointMBean.class,
            new SegmentCheckpointMBean(segmentNodeStore),
            CheckpointMBean.TYPE,
            "Segment node store checkpoint management"
        ));
    }

    if (cfg.registerDescriptors()) {
        // ensure a clusterId is initialized
        // and expose it as 'oak.clusterid' repository descriptor
        GenericDescriptors clusterIdDesc = new GenericDescriptors();
        clusterIdDesc.put(
            ClusterRepositoryInfo.OAK_CLUSTERID_REPOSITORY_DESCRIPTOR_KEY,
            new SimpleValueFactory().createValue(getOrCreateId(segmentNodeStore)),
            true,
            false
        );
        registerCloseable(register(Descriptors.class, clusterIdDesc));
        // Register "discovery lite" descriptors
        registerCloseable(register(Descriptors.class, new SegmentDiscoveryLiteDescriptors(segmentNodeStore)));
    }

    // If a shared data store register the repo id in the data store

    if (!cfg.isSecondarySegmentStore() && cfg.hasCustomBlobStore() && isShared(cfg.getBlobStore())) {
        SharedDataStore sharedDataStore = (SharedDataStore) cfg.getBlobStore();
        try {
            sharedDataStore.setRepositoryId(getOrCreateId(segmentNodeStore));
        } catch (Exception e) {
            throw new IOException("Could not register a unique repositoryId", e);
        }

        if (cfg.getBlobStore() instanceof BlobTrackingStore) {
            BlobTrackingStore trackingStore = (BlobTrackingStore) cfg.getBlobStore();
            // Close a tracker that is already attached before adding the new one.
            if (trackingStore.getTracker() != null) {
                trackingStore.getTracker().close();
            }
            trackingStore.addTracker(BlobIdTracker.build(cfg.getRepositoryHome(), getOrCreateId(segmentNodeStore), cfg.getBlobSnapshotInterval(), sharedDataStore));
        }
    }

    // Expose an MBean for blob garbage collection when the blob store supports it

    if (!cfg.isSecondarySegmentStore() && cfg.hasCustomBlobStore() && (cfg.getBlobStore() instanceof GarbageCollectableBlobStore)) {
        BlobGarbageCollector gc = new MarkSweepGarbageCollector(
            new SegmentBlobReferenceRetriever(store),
            (GarbageCollectableBlobStore) cfg.getBlobStore(),
            executor,
            TimeUnit.SECONDS.toMillis(cfg.getBlobGcMaxAge()),
            getOrCreateId(segmentNodeStore),
            cfg.getWhiteboard(),
            cfg.getStatisticsProvider()
        );
        registerCloseable(registerMBean(
            BlobGCMBean.class,
            new BlobGC(gc, executor),
            BlobGCMBean.TYPE,
            "Segment node store blob garbage collection"
        ));
    }

    // Expose an MBean for backup/restore operations

    registerCloseable(registerMBean(
        FileStoreBackupRestoreMBean.class,
        new FileStoreBackupRestoreImpl(
            segmentNodeStore,
            store.getRevisions(),
            store.getReader(),
            cfg.getBackupDirectory(),
            executor
        ),
        FileStoreBackupRestoreMBean.TYPE,
        "Segment node store backup/restore"
    ));

    // Expose statistics about the SegmentNodeStore

    registerCloseable(registerMBean(
        SegmentNodeStoreStatsMBean.class,
        segmentNodeStore.getStats(),
        SegmentNodeStoreStatsMBean.TYPE,
        "SegmentNodeStore statistics"
    ));

    if (cfg.isPrimarySegmentStore()) {
        cfg.getLogger().info("Primary SegmentNodeStore initialized");
    } else {
        cfg.getLogger().info("Secondary SegmentNodeStore initialized, role={}", cfg.getRole());
    }

    // Register a factory service to expose the FileStore

    registerCloseable(register(
        SegmentStoreProvider.class,
        new DefaultSegmentStoreProvider(store)
    ));

    // Only a primary store that is not a standby is published as a NodeStore service

    if (!cfg.isStandbyInstance() && cfg.isPrimarySegmentStore()) {
        Map<String, Object> props = new HashMap<>();
        props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
        props.put("oak.nodestore.description", new String[] {"nodeStoreType=segment"});
        registerCloseable(register(NodeStore.class, segmentNodeStore, props));
    }

    return segmentNodeStore;
}

/**
 * Registers a cache statistics MBean of type {@code CacheStats.TYPE} under
 * the name reported by the bean itself, and hands the resulting
 * registration to {@code registerCloseable}.
 *
 * @param cacheStats the statistics bean to expose; must not be {@code null}
 */
private void registerCacheStatsMBean(CacheStatsMBean cacheStats) {
    registerCloseable(registerMBean(
        CacheStatsMBean.class,
        cacheStats,
        CacheStats.TYPE,
        cacheStats.getName()
    ));
}