public synchronized void start()

in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java [251:361]


    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore file that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if( isCheckForCorruptionOnStartup() ) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE) {
            switch (preallocationStrategy) {
                case SPARSE_FILE:
                    break;
                case OS_KERNEL_COPY: {
                    osKernelCopyTemplateFile = createJournalTemplateFile();
                }
                break;
                case CHUNKED_ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
                }
                break;
                case ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
                }
                break;
            }
        }
        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if( lastAppendLocation.get()==null ) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        int lastFileLength = dataFiles.getTail().getLength();
        if (totalLength.get() > lastFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - lastFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: "+(end-start)+" ms");
    }