in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java [697:797]
/**
 * Replays the journal into the index and then resolves any transactions that were still open.
 *
 * <p>The whole operation runs while holding the write lock of {@code indexLock}; the lock is
 * released in a {@code finally} block. Steps, in order:
 * <ol>
 *   <li>{@code recoverProducerAudit()} and {@code recoverAckMessageFileMap()} are invoked; if
 *       either returns {@code true} the replay starts at {@code journal.getNextLocation(null)}
 *       instead of the position returned by {@code getRecoveryPosition()}.</li>
 *   <li>Every journal location from the start position onwards is loaded and handed to
 *       {@code process(...)}. A location that fails with an {@link IOException} is either
 *       reported to {@code journal.corruptRecoveryLocation(...)} and skipped (when
 *       {@code isIgnoreMissingJournalfiles()} is {@code true}) or aborts recovery.</li>
 *   <li>{@code recoverIndex(tx)} is run inside a page file transaction.</li>
 *   <li>In-flight local transactions get a rollback command stored; all other in-flight
 *       transactions are removed from {@code inflightTransactions}.</li>
 *   <li>Prepared transactions are logged, committed or rolled back according to
 *       {@code purgeRecoveredXATransactionStrategy}.</li>
 * </ol>
 *
 * @throws IllegalStateException declared by the signature; not thrown directly by this method
 * @throws IOException if a journal location cannot be recovered and missing journal files are
 *         not being ignored, or if one of the invoked store/index operations fails
 */
private void recover() throws IllegalStateException, IOException {
    this.indexLock.writeLock().lock();
    try {
        long start = System.currentTimeMillis();
        boolean requiresJournalReplay = recoverProducerAudit();
        requiresJournalReplay |= recoverAckMessageFileMap();
        Location lastIndoubtPosition = getRecoveryPosition();
        Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
        if (recoveryPosition != null) {
            int redoCounter = 0;
            // Count at which the last progress message was written; prevents the same
            // count from being reported again while failed locations are being skipped.
            int lastReportedCount = 0;
            int dataFileRotationTracker = recoveryPosition.getDataFileId();
            LOG.info("Recovering from the journal @" + recoveryPosition);
            while (recoveryPosition != null) {
                try {
                    JournalCommand<?> message = load(recoveryPosition);
                    metadata.lastUpdate = recoveryPosition;
                    process(message, recoveryPosition, lastIndoubtPosition);
                    redoCounter++;
                } catch (IOException failedRecovery) {
                    if (isIgnoreMissingJournalfiles()) {
                        LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        // track this dud location
                        journal.corruptRecoveryLocation(recoveryPosition);
                    } else {
                        throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                    }
                }
                recoveryPosition = journal.getNextLocation(recoveryPosition);
                // hold on to the minimum number of open files during recovery
                if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
                    dataFileRotationTracker = recoveryPosition.getDataFileId();
                    journal.cleanup();
                }
                // Progress report once per 100000 replayed entries. redoCounter does not move
                // when a location is skipped, so without the lastReportedCount guard every
                // skipped location would repeat the message (including "0 entries recovered").
                if (LOG.isInfoEnabled() && redoCounter != lastReportedCount && redoCounter % 100000 == 0) {
                    lastReportedCount = redoCounter;
                    LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
                }
            }
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }

        // We may have to undo some index updates.
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                recoverIndex(tx);
            }
        });

        // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
        Set<TransactionId> toRollback = new HashSet<>();
        Set<TransactionId> toDiscard = new HashSet<>();
        synchronized (inflightTransactions) {
            // Classify first and act afterwards: the ids are collected into separate sets so
            // that inflightTransactions is not modified while its key set is being iterated.
            for (TransactionId id : inflightTransactions.keySet()) {
                if (id.isLocalTransaction()) {
                    toRollback.add(id);
                } else {
                    toDiscard.add(id);
                }
            }
            for (TransactionId tx : toRollback) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("rolling back recovered indoubt local transaction " + tx);
                }
                store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
            }
            for (TransactionId tx : toDiscard) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("discarding recovered in-flight XA transaction " + tx);
                }
                inflightTransactions.remove(tx);
            }
        }

        synchronized (preparedTransactions) {
            // Iterate over a snapshot of the keys rather than the live key set.
            // NOTE(review): presumably because store(...) of a commit/rollback command
            // alters preparedTransactions - confirm against process().
            Set<TransactionId> txIds = new LinkedHashSet<TransactionId>(preparedTransactions.keySet());
            for (TransactionId txId : txIds) {
                switch (purgeRecoveredXATransactionStrategy) {
                    case NEVER:
                        // Leave the transaction in place; only report it.
                        LOG.warn("Recovered prepared XA TX: [{}]", txId);
                        break;
                    case COMMIT:
                        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
                        LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId);
                        break;
                    case ROLLBACK:
                        store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
                        LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId);
                        break;
                }
            }
        }
    } finally {
        this.indexLock.writeLock().unlock();
    }
}