in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java [149:278]
/**
 * Initializes this DbLedgerStorage by creating one SingleDirectoryDbLedgerStorage
 * per configured ledger directory.
 *
 * <p>The total write and read-ahead cache budgets (configured in MB) are split
 * evenly across all ledger directories. Each directory gets its own
 * single-directory {@code LedgerDirsManager}/index-dirs manager pair and its own
 * entry logger (direct-IO or default, depending on configuration).
 *
 * @param conf              server configuration (cache sizes, batch sizes, direct-IO knobs)
 * @param ledgerManager     ledger manager passed through to each per-directory storage
 * @param ledgerDirsManager manager over all ledger directories; its size defines numberOfDirs
 * @param indexDirsManager  manager over all index directories; must have the same number
 *                          of directories as {@code ledgerDirsManager}
 * @param statsLogger       stats logger; parent-level gauges are registered here
 * @param allocator         ByteBuf allocator shared by the per-directory storages
 * @throws IOException if the combined cache sizes exceed the estimated max direct memory,
 *                     or if the ledger and index directory counts differ
 */
public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator)
throws IOException {
// Cache budgets are configured in MB; convert to bytes up front.
long writeCacheMaxSize = getLongVariableOrDefault(conf, WRITE_CACHE_MAX_SIZE_MB,
DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB,
DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false);
this.allocator = allocator;
this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
log.info("Started Db Ledger Storage");
log.info(" - Number of directories: {}", numberOfDirs);
log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
// Both caches are allocated from direct memory; fail fast if the combined
// budget cannot fit (estimateMaxDirectMemory is Netty's best-effort estimate).
if (readCacheMaxSize + writeCacheMaxSize > PlatformDependent.estimateMaxDirectMemory()) {
throw new IOException("Read and write cache sizes exceed the configured max direct memory size");
}
// Ledger and index directories are paired 1:1 by position in the loop below.
if (ledgerDirsManager.getAllLedgerDirs().size() != indexDirsManager.getAllLedgerDirs().size()) {
throw new IOException("ledger and index dirs size not matched");
}
// Split the global cache budgets evenly across directories.
long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
int readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
// NOTE(review): this reads the *bytes* size with getInt into a long, while the
// other byte-size knobs above use long accessors — confirm values > 2GB are not
// expected here, otherwise this silently caps at Integer.MAX_VALUE.
long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);
ledgerStorageList = Lists.newArrayList();
for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i);
File indexDir = indexDirsManager.getAllLedgerDirs().get(i);
// Create a ledger dirs manager for the single directory
File[] lDirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgersDirManager
lDirs[0] = ledgerDir.getParentFile();
LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
NullStatsLogger.INSTANCE);
// Create a index dirs manager for the single directory
File[] iDirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgersDirManager
iDirs[0] = indexDir.getParentFile();
LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
NullStatsLogger.INSTANCE);
EntryLogger entrylogger;
if (directIOEntryLogger) {
// Direct-IO path: buffer budgets are configured globally (in MB) and
// divided evenly per directory, mirroring the cache split above.
long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
// Per-read buffer size is not divided — it applies to each read as-is.
int readBufferSize = MB * (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
// NOTE(review): these executor fields are assigned on every loop iteration
// when directIOEntryLogger is set; with multiple directories earlier
// executors are overwritten (leaked unless shut down elsewhere) — confirm.
entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerWrite"));
entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerFlush"));
// Fall back to the server IO thread count when no dedicated read
// worker threads are configured (0 means "not set").
int numReadThreads = conf.getNumReadWorkerThreads();
if (numReadThreads == 0) {
numReadThreads = conf.getServerNumIOThreads();
}
// The max entry size is bounded by the Netty frame size minus a fixed
// headroom (500 bytes) for protocol overhead.
entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
new NativeIOImpl(),
allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
conf.getEntryLogSizeLimit(),
conf.getNettyMaxFrameSizeBytes() - 500,
perDirectoryTotalWriteBufferSize,
perDirectoryTotalReadBufferSize,
readBufferSize,
numReadThreads,
maxFdCacheTimeSeconds,
slog, statsLogger);
} else {
entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
}
ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
idm, entrylogger,
statsLogger, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize,
readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
// Forward per-directory dir listeners to the parent managers so disk
// events observed by the single-dir managers reach the global ones.
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
// Avoid double-registering listeners when ledger and index dirs are the same path.
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
}
}
// parent DbLedgerStorage stats (not per directory)
readaheadBatchSizeGauge = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return readAheadCacheBatchSize;
}
@Override
public Integer getSample() {
return readAheadCacheBatchSize;
}
};
statsLogger.registerGauge(MAX_READAHEAD_BATCH_SIZE, readaheadBatchSizeGauge);
writeCacheSizeGauge = new Gauge<Long>() {
@Override
public Long getDefaultValue() {
return perDirectoryWriteCacheSize;
}
@Override
public Long getSample() {
return perDirectoryWriteCacheSize;
}
};
statsLogger.registerGauge(MAX_WRITE_CACHE_SIZE, writeCacheSizeGauge);
}