private void openPages()

in logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java [194:310]


    /**
     * Opens the queue's persisted on-disk state from {@code dirPath}, rebuilding the in-memory
     * tail pages and head page from their checkpoints. If no head checkpoint exists yet, a brand
     * new empty head page (page 0) is created and checkpointed. Otherwise tail pages are
     * reconstructed from the first unacked page up to (but excluding) the head page — fully-acked
     * tail pages are purged rather than opened — the head page data file is optimistically
     * recovered, and a non-empty head page is transformed into a tail page with a fresh empty
     * head page created after it. {@code seqNum} is restored to the highest max sequence number
     * seen across rebuilt pages, and disk availability is verified against {@code maxBytes}
     * minus the bytes already consumed by existing pages.
     *
     * @throws IOException if reading, recovering or writing a checkpoint or page file fails
     */
    private void openPages() throws IOException {
        final int headPageNum;

        // Upgrade to serialization format V2
        QueueUpgrade.upgradeQueueDirectoryToV2(dirPath);

        Checkpoint headCheckpoint;
        try {
            headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
        } catch (NoSuchFileException e) {
            // if there is no head checkpoint, create a new headpage and checkpoint it and exit method

            logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName());

            // brand-new queue: the full maxBytes must be available since no pages exist yet
            this.ensureDiskAvailable(this.maxBytes, 0);

            this.seqNum = 0;
            headPageNum = 0;

            newCheckpointedHeadpage(headPageNum);
            this.closed.set(false);

            return;
        }

        // at this point we have a head checkpoint to figure queue recovery

        // as we load pages, compute the actual disk needed by subtracting existing page sizes from the required maxBytes
        long pqSizeBytes = 0;

        // reconstruct all tail pages state up to but excluding the head page
        for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
            final String cpFileName = checkpointIO.tailFileName(pageNum);
            // a missing tail checkpoint file means that page was already purged — skip it
            if (!dirPath.resolve(cpFileName).toFile().exists()) {
                continue;
            }
            final Checkpoint cp = this.checkpointIO.read(cpFileName);

            logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp);

            PageIO pageIO = new MmapPageIOV2(pageNum, this.pageCapacity, this.dirPath);
            // important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case
            // we can purge it and we don't care about its integrity for example if it is of zero-byte file size.
            if (cp.isFullyAcked()) {
                purgeTailPage(cp, pageIO);
            } else {
                pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
                addTailPage(PageFactory.newTailPage(cp, this, pageIO));
                // only surviving (non-purged) pages count toward the queue's current disk footprint
                pqSizeBytes += pageIO.getCapacity();
            }

            // track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
            if (cp.maxSeqNum() > this.seqNum) {
                this.seqNum = cp.maxSeqNum();
            }
        }

        // delete zero byte page and recreate checkpoint if corrupted page is detected;
        // when the helper handled the situation it returns true and opening is complete
        if ( cleanedUpFullyAckedCorruptedPage(headCheckpoint, pqSizeBytes)) { return; }

        // transform the head page into a tail page only if the headpage is non-empty
        // in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages

        logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint);

        PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
        pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data

        // the head page contributes only its written bytes (head offset), not its full capacity
        pqSizeBytes += (long) pageIO.getHead();
        ensureDiskAvailable(this.maxBytes, pqSizeBytes);

        if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
            // the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes

            logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
            logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());

            // clamp firstUnackedSeqNum into the recovered page's range so the rebuilt
            // checkpoint cannot reference sequence numbers that no longer exist on disk
            long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
            if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
                logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
                firstUnackedSeqNum = pageIO.getMinSeqNum();
            }
            headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
        }
        this.headPage = PageFactory.newHeadPage(headCheckpoint, this, pageIO);

        if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
            // head page is empty, let's keep it as-is
            // but checkpoint it to update the firstUnackedPageNum if it changed
            this.headPage.checkpoint();
        } else {
            // head page is non-empty, transform it into a tail page
            this.headPage.behead();

            // the beheaded page may already be fully acked, in which case it is purged
            // immediately instead of being kept as a tail page
            if (headCheckpoint.isFullyAcked()) {
                purgeTailPage(headCheckpoint, pageIO);
            } else {
                addTailPage(this.headPage);
            }

            // track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
            if (headCheckpoint.maxSeqNum() > this.seqNum) {
                this.seqNum = headCheckpoint.maxSeqNum();
            }

            // create a new empty head page
            headPageNum = headCheckpoint.getPageNum() + 1;
            newCheckpointedHeadpage(headPageNum);
        }

        // only activate the first tail page
        if (tailPages.size() > 0) {
            this.tailPages.get(0).getPageIO().activate();
        }

        // TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
    }