in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java [860:1034]
/**
 * Repairs the on-disk index so it is consistent with the journal after an
 * unclean shutdown. Recovery runs in two passes:
 * <ol>
 *   <li>Index entries whose journal location is at or past the journal's last
 *       append location are rolled back — those journal writes never completed,
 *       so the index applied updates ahead of the journal.</li>
 *   <li>Every journal file referenced by the indexes (and by the ack-message
 *       file map) is verified to still exist and, when
 *       {@code checkForCorruptJournalFiles} is set, checked for corrupt
 *       regions. References into missing/corrupt data are either dropped
 *       (when {@code ignoreMissingJournalfiles} is set) or cause recovery to
 *       fail with an {@link IOException}.</li>
 * </ol>
 *
 * @param tx the index transaction in which all B-tree reads/removals are performed
 * @throws IOException if missing or corrupt journal files are referenced by the
 *         index and {@code ignoreMissingJournalfiles} is not enabled
 */
protected void recoverIndex(Transaction tx) throws IOException {
    long start = System.currentTimeMillis();
    // It is possible index updates got applied before the journal updates..
    // in that case we need to removed references to messages that are not in the journal
    final Location lastAppendLocation = journal.getLastAppendLocation();
    long undoCounter = 0;

    // Pass 1: go through all the destinations to see if they have messages past the lastAppendLocation
    for (String key : storedDestinations.keySet()) {
        StoredDestination sd = storedDestinations.get(key);

        final ArrayList<Long> matches = new ArrayList<>();
        // Find all the Locations that are >= than the last Append Location.
        sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
            @Override
            protected void matched(Location key, Long value) {
                matches.add(value);
            }
        });

        for (Long sequenceId : matches) {
            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
            if (keys != null) {
                sd.locationIndex.remove(tx, keys.location);
                sd.messageIdIndex.remove(tx, keys.messageId);
                // Undo duplicate-detection state for the rolled-back producer sequence.
                metadata.producerSequenceIdTracker.rollback(keys.messageId);
                undoCounter++;
                decrementAndSubSizeToStoreStat(tx, key, sd, keys.location.getSize());
                // TODO: do we need to modify the ack positions for the pub sub case?
            }
        }
    }

    if (undoCounter > 0) {
        // The rolledback operations are basically in flight journal writes. To avoid getting
        // these the end user should do sync writes to the journal.
        if (LOG.isInfoEnabled()) {
            long end = System.currentTimeMillis();
            LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
        }
    }

    undoCounter = 0;
    start = System.currentTimeMillis();

    // Pass 2: lets be extra paranoid here and verify that all the datafiles being
    // referenced by the indexes still exists.

    // Collect the set of journal data-file ids referenced by any destination's
    // location index, using a visitor to cut down the number of pages loaded.
    final SequenceSet ss = new SequenceSet();
    for (StoredDestination sd : storedDestinations.values()) {
        sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
            int last = -1;

            @Override
            public boolean isInterestedInKeysBetween(Location first, Location second) {
                // Skip B-tree subtrees whose entire file-id range is already recorded.
                if (first == null) {
                    return !ss.contains(0, second.getDataFileId());
                } else if (second == null) {
                    return true;
                } else {
                    return !ss.contains(first.getDataFileId(), second.getDataFileId());
                }
            }

            @Override
            public void visit(List<Location> keys, List<Long> values) {
                for (Location l : keys) {
                    int fileId = l.getDataFileId();
                    if (last != fileId) {
                        ss.add(fileId);
                        last = fileId;
                    }
                }
            }
        });
    }

    // Start with every referenced file id; ids for files that still exist on
    // disk are removed below, leaving only the missing ones.
    HashSet<Integer> missingJournalFiles = new HashSet<>();
    while (!ss.isEmpty()) {
        missingJournalFiles.add((int) ss.removeFirst());
    }

    // Files referenced via the ack map (both the file holding the acked message
    // and the files holding the acks themselves) must also be present.
    for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
        missingJournalFiles.add(entry.getKey());
        for (Integer i : entry.getValue()) {
            missingJournalFiles.add(i);
        }
    }

    missingJournalFiles.removeAll(journal.getFileMap().keySet());

    if (!missingJournalFiles.isEmpty()) {
        LOG.warn("Some journal files are missing: " + missingJournalFiles);
    }

    // Build range predicates matching every index Location that falls inside a
    // missing file, past a file's valid length, or inside a corrupt region.
    ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>();
    ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>();
    for (Integer missing : missingJournalFiles) {
        missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
    }

    if (checkForCorruptJournalFiles) {
        Collection<DataFile> dataFiles = journal.getFileMap().values();
        for (DataFile dataFile : dataFiles) {
            int id = dataFile.getDataFileId();
            // eof to next file id
            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
            Sequence seq = dataFile.getCorruptedBlocks().getHead();
            while (seq != null) {
                BTreeVisitor.BetweenVisitor<Location, Long> visitor =
                    new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                missingPredicates.add(visitor);
                knownCorruption.add(visitor);
                seq = seq.getNext();
            }
        }
    }

    if (!missingPredicates.isEmpty()) {
        for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
            final StoredDestination sd = sdEntry.getValue();
            final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>();
            sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                @Override
                protected void matched(Location key, Long value) {
                    matches.put(value, key);
                }
            });

            // If some message references are affected by the missing data files...
            if (!matches.isEmpty()) {
                // We either 'gracefully' recover dropping the missing messages or
                // we error out.
                if (ignoreMissingJournalfiles) {
                    // Update the index to remove the references to the missing data
                    for (Long sequenceId : matches.keySet()) {
                        MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                        // FIX: orderIndex.remove() may return null if the entry is
                        // already gone (indexes partially out of sync — the very
                        // condition this recovery handles). Guard like the rollback
                        // pass above does, instead of NPE-ing and aborting recovery.
                        if (keys != null) {
                            sd.locationIndex.remove(tx, keys.location);
                            sd.messageIdIndex.remove(tx, keys.messageId);
                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                            undoCounter++;
                            decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize());
                            // TODO: do we need to modify the ack positions for the pub sub case?
                        }
                    }
                } else {
                    LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
                    throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " + matches.size() + " messages affected.");
                }
            }
        }
    }

    if (!ignoreMissingJournalfiles) {
        if (!knownCorruption.isEmpty()) {
            LOG.error("Detected corrupt journal files. " + knownCorruption);
            throw new IOException("Detected corrupt journal files. " + knownCorruption);
        }

        if (!missingJournalFiles.isEmpty()) {
            LOG.error("Detected missing journal files. " + missingJournalFiles);
            throw new IOException("Detected missing journal files. " + missingJournalFiles);
        }
    }

    if (undoCounter > 0) {
        // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
        // should do sync writes to the journal.
        if (LOG.isInfoEnabled()) {
            long end = System.currentTimeMillis();
            LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
        }
    }
}