protected void recoverIndex()

in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java [860:1034]


    /**
     * Repairs the message indexes after a restart so they agree with the journal.
     *
     * Two passes are performed inside the supplied index transaction:
     * <ol>
     *   <li>Roll back index entries whose journal location is at or past the last
     *       successful journal append (index updates that landed before their
     *       journal writes did).</li>
     *   <li>Verify every data file referenced by the indexes (and by the ack map)
     *       still exists and is not corrupt; depending on
     *       {@code ignoreMissingJournalfiles} either drop the affected messages or
     *       fail recovery with an {@link IOException}.</li>
     * </ol>
     *
     * @param tx the active index transaction all index reads/writes run under
     * @throws IOException if missing/corrupt journal files are referenced and
     *         {@code ignoreMissingJournalfiles} is false
     */
    protected void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();
        // It is possible index updates got applied before the journal updates..
        // in that case we need to remove references to messages that are not in the journal
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter = 0;

        // Go through all the destinations to see if they have messages past the lastAppendLocation
        for (String key : storedDestinations.keySet()) {
            StoredDestination sd = storedDestinations.get(key);

            final ArrayList<Long> matches = new ArrayList<>();
            // Find all the Locations that are >= than the last Append Location.
            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
                @Override
                protected void matched(Location key, Long value) {
                    matches.add(value);
                }
            });

            for (Long sequenceId : matches) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    sd.messageIdIndex.remove(tx, keys.messageId);
                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
                    undoCounter++;
                    decrementAndSubSizeToStoreStat(tx, key, sd, keys.location.getSize());
                    // TODO: do we need to modify the ack positions for the pub sub case?
                }
            }
        }

        if (undoCounter > 0) {
            // The rolled back operations are basically in flight journal writes.  To avoid getting
            // these the end user should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, (end - start) / 1000.0f);
            }
        }

        undoCounter = 0;
        start = System.currentTimeMillis();

        // Lets be extra paranoid here and verify that all the datafiles being referenced
        // by the indexes still exists.

        // Collect the set of data file ids the location indexes reference.  The visitor's
        // range check lets us skip whole B-tree pages whose file-id span is already known.
        final SequenceSet ss = new SequenceSet();
        for (StoredDestination sd : storedDestinations.values()) {
            // Use a visitor to cut down the number of pages that we load
            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                int last = -1;

                @Override
                public boolean isInterestedInKeysBetween(Location first, Location second) {
                    if (first == null) {
                        return !ss.contains(0, second.getDataFileId());
                    } else if (second == null) {
                        return true;
                    } else {
                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
                    }
                }

                @Override
                public void visit(List<Location> keys, List<Long> values) {
                    for (Location l : keys) {
                        int fileId = l.getDataFileId();
                        if (last != fileId) {
                            ss.add(fileId);
                            last = fileId;
                        }
                    }
                }

            });
        }

        // Start from everything referenced, then subtract the files that actually exist;
        // whatever remains is missing.
        HashSet<Integer> missingJournalFiles = new HashSet<>();
        while (!ss.isEmpty()) {
            missingJournalFiles.add((int) ss.removeFirst());
        }

        // The ack map references journal files too (both as keys and as referenced sets).
        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
            missingJournalFiles.add(entry.getKey());
            missingJournalFiles.addAll(entry.getValue());
        }

        missingJournalFiles.removeAll(journal.getFileMap().keySet());

        if (!missingJournalFiles.isEmpty()) {
            LOG.warn("Some journal files are missing: {}", missingJournalFiles);
        }

        // Build location-range predicates covering every missing file and (optionally)
        // every corrupted region of the files that do exist.
        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>();
        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>();
        for (Integer missing : missingJournalFiles) {
            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
        }

        if (checkForCorruptJournalFiles) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                // eof to next file id
                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                Sequence seq = dataFile.getCorruptedBlocks().getHead();
                while (seq != null) {
                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
                        new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                    missingPredicates.add(visitor);
                    knownCorruption.add(visitor);
                    seq = seq.getNext();
                }
            }
        }

        if (!missingPredicates.isEmpty()) {
            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                final StoredDestination sd = sdEntry.getValue();
                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>();
                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                    @Override
                    protected void matched(Location key, Long value) {
                        matches.put(value, key);
                    }
                });

                // If some message references are affected by the missing data files...
                if (!matches.isEmpty()) {

                    // We either 'gracefully' recover dropping the missing messages or
                    // we error out.
                    if( ignoreMissingJournalfiles ) {
                        // Update the index to remove the references to the missing data
                        for (Long sequenceId : matches.keySet()) {
                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                            if (keys == null) {
                                // orderIndex and locationIndex disagree for this sequence id;
                                // there is nothing further to clean up for this entry.  Guard
                                // matches the rollback pass above rather than risking an NPE
                                // mid-recovery.
                                continue;
                            }
                            sd.locationIndex.remove(tx, keys.location);
                            sd.messageIdIndex.remove(tx, keys.messageId);
                            LOG.info("[{}] dropped: {} at corrupt location: {}", sdEntry.getKey(), keys.messageId, keys.location);
                            undoCounter++;
                            decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize());
                            // TODO: do we need to modify the ack positions for the pub sub case?
                        }
                    } else {
                        LOG.error("[{}] references corrupt locations: {}", sdEntry.getKey(), matches);
                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
                    }
                }
            }
        }

        if (!ignoreMissingJournalfiles) {
            if (!knownCorruption.isEmpty()) {
                LOG.error("Detected corrupt journal files. {}", knownCorruption);
                throw new IOException("Detected corrupt journal files. " + knownCorruption);
            }

            if (!missingJournalFiles.isEmpty()) {
                LOG.error("Detected missing journal files. {}", missingJournalFiles);
                throw new IOException("Detected missing journal files. " + missingJournalFiles);
            }
        }

        if (undoCounter > 0) {
            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
            // should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Detected missing/corrupt journal files.  Dropped {} messages from the index in {} seconds.", undoCounter, (end - start) / 1000.0f);
            }
        }
    }