public void initialize()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java [149:278]


    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
                           LedgerDirsManager indexDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator)
            throws IOException {
        long writeCacheMaxSize = getLongVariableOrDefault(conf, WRITE_CACHE_MAX_SIZE_MB,
                DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
        long readCacheMaxSize = getLongVariableOrDefault(conf, READ_AHEAD_CACHE_MAX_SIZE_MB,
                DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
        boolean directIOEntryLogger = getBooleanVariableOrDefault(conf, DIRECT_IO_ENTRYLOGGER, false);

        this.allocator = allocator;
        this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();

        log.info("Started Db Ledger Storage");
        log.info(" - Number of directories: {}", numberOfDirs);
        log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
        log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);

        if (readCacheMaxSize + writeCacheMaxSize > PlatformDependent.estimateMaxDirectMemory()) {
            throw new IOException("Read and write cache sizes exceed the configured max direct memory size");
        }

        if (ledgerDirsManager.getAllLedgerDirs().size() != indexDirsManager.getAllLedgerDirs().size()) {
            throw new IOException("ledger and index dirs size not matched");
        }

        long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
        long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
        int readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
        long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
                DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);

        ledgerStorageList = Lists.newArrayList();
        for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
            File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i);
            File indexDir = indexDirsManager.getAllLedgerDirs().get(i);
            // Create a ledger dirs manager for the single directory
            File[] lDirs = new File[1];
            // Remove the `/current` suffix which will be appended again by LedgersDirManager
            lDirs[0] = ledgerDir.getParentFile();
            LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
                    NullStatsLogger.INSTANCE);

            // Create a index dirs manager for the single directory
            File[] iDirs = new File[1];
            // Remove the `/current` suffix which will be appended again by LedgersDirManager
            iDirs[0] = indexDir.getParentFile();
            LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
                    NullStatsLogger.INSTANCE);

            EntryLogger entrylogger;
            if (directIOEntryLogger) {
                long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
                    conf,
                    DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
                    DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
                long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
                    conf,
                    DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
                    DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
                int readBufferSize = MB * (int) getLongVariableOrDefault(
                    conf,
                    DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
                    DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
                int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
                    conf,
                    DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
                    DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
                Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
                entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
                    new DefaultThreadFactory("EntryLoggerWrite"));
                entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
                    new DefaultThreadFactory("EntryLoggerFlush"));

                int numReadThreads = conf.getNumReadWorkerThreads();
                if (numReadThreads == 0) {
                    numReadThreads = conf.getServerNumIOThreads();
                }

                entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
                    new NativeIOImpl(),
                    allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
                    conf.getEntryLogSizeLimit(),
                    conf.getNettyMaxFrameSizeBytes() - 500,
                    perDirectoryTotalWriteBufferSize,
                    perDirectoryTotalReadBufferSize,
                    readBufferSize,
                    numReadThreads,
                    maxFdCacheTimeSeconds,
                    slog, statsLogger);
            } else {
                entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
            }
            ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
                idm, entrylogger,
                statsLogger, perDirectoryWriteCacheSize,
                perDirectoryReadCacheSize,
                readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
            ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
            if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
                idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
            }
        }

        // parent DbLedgerStorage stats (not per directory)
        readaheadBatchSizeGauge = new Gauge<Integer>() {
            @Override
            public Integer getDefaultValue() {
                return readAheadCacheBatchSize;
            }

            @Override
            public Integer getSample() {
                return readAheadCacheBatchSize;
            }
        };
        statsLogger.registerGauge(MAX_READAHEAD_BATCH_SIZE, readaheadBatchSizeGauge);

        writeCacheSizeGauge = new Gauge<Long>() {
            @Override
            public Long getDefaultValue() {
                return perDirectoryWriteCacheSize;
            }

            @Override
            public Long getSample() {
                return perDirectoryWriteCacheSize;
            }
        };
        statsLogger.registerGauge(MAX_WRITE_CACHE_SIZE, writeCacheSizeGauge);
    }