private void recover()

in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java [697:797]


    /**
     * Replays the journal into the index and then settles any transactions
     * that were still open when the store last stopped.
     *
     * <p>The whole operation runs while holding the index write lock. It
     * proceeds in four phases:
     * <ol>
     *   <li><b>Journal replay.</b> If either {@code recoverProducerAudit()} or
     *       {@code recoverAckMessageFileMap()} reports that a replay is
     *       required, replay starts at {@code journal.getNextLocation(null)}
     *       (presumably the first location in the journal - confirm against
     *       the journal implementation); otherwise it starts at the position
     *       returned by {@code getRecoveryPosition()}. Every entry from there
     *       on is loaded and handed to {@code process(...)}.</li>
     *   <li><b>Index recovery.</b> {@code recoverIndex(tx)} is executed inside
     *       a page file transaction.</li>
     *   <li><b>In-flight transactions.</b> Local transactions are rolled back
     *       by storing a {@code KahaRollbackCommand}; all others are removed
     *       from {@code inflightTransactions}.</li>
     *   <li><b>Prepared transactions.</b> Each one is logged, committed or
     *       rolled back depending on
     *       {@code purgeRecoveredXATransactionStrategy}.</li>
     * </ol>
     *
     * <p>An {@link IOException} raised while loading or processing a journal
     * entry aborts recovery unless {@code isIgnoreMissingJournalfiles()} is
     * true, in which case the entry is reported to the journal via
     * {@code corruptRecoveryLocation} and replay continues with the next one.
     *
     * @throws IOException if a journal entry cannot be recovered and missing
     *         journal files are not being ignored, or if any other recovery
     *         step fails with an I/O error
     * @throws IllegalStateException if raised by any of the recovery steps
     *         invoked here
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {

            long start = System.currentTimeMillis();

            // Non-short-circuit OR: both recovery steps must always run, even
            // when the first one already demands a journal replay.
            boolean requiresJournalReplay = recoverProducerAudit();
            requiresJournalReplay |= recoverAckMessageFileMap();

            // Always computed: besides being the default starting point it is
            // passed to process(...) for every replayed entry.
            Location lastIndoubtPosition = getRecoveryPosition();
            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
            if (recoveryPosition != null) {
                final int progressLogInterval = 100000;
                int redoCounter = 0;
                int dataFileRotationTracker = recoveryPosition.getDataFileId();
                LOG.info("Recovering from the journal @" + recoveryPosition);
                while (recoveryPosition != null) {
                    boolean replayed = false;
                    try {
                        JournalCommand<?> message = load(recoveryPosition);
                        metadata.lastUpdate = recoveryPosition;
                        process(message, recoveryPosition, lastIndoubtPosition);
                        redoCounter++;
                        replayed = true;
                    } catch (IOException failedRecovery) {
                        if (isIgnoreMissingJournalfiles()) {
                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                            // track this dud location
                            journal.corruptRecoveryLocation(recoveryPosition);
                        } else {
                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        }
                    }
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    // hold on to the minimum number of open files during recovery
                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
                        dataFileRotationTracker = recoveryPosition.getDataFileId();
                        journal.cleanup();
                    }
                    // Report progress only when the counter has just advanced;
                    // a run of skipped entries leaves it unchanged and would
                    // otherwise repeat the same line once per skipped entry.
                    if (replayed && redoCounter % progressLogInterval == 0 && LOG.isInfoEnabled()) {
                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
                    }
                }
                if (LOG.isInfoEnabled()) {
                    long end = System.currentTimeMillis();
                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
                }
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
            Set<TransactionId> toRollback = new HashSet<>();
            Set<TransactionId> toDiscard = new HashSet<>();
            synchronized (inflightTransactions) {
                // Classify first and act afterwards, so the map is never
                // modified while its key set is being iterated.
                for (TransactionId id : inflightTransactions.keySet()) {
                    if (id.isLocalTransaction()) {
                        toRollback.add(id);
                    } else {
                        toDiscard.add(id);
                    }
                }
                for (TransactionId tx : toRollback) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
                    }
                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                }
                for (TransactionId tx : toDiscard) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
                    }
                    inflightTransactions.remove(tx);
                }
            }

            synchronized (preparedTransactions) {
                // Work on a snapshot of the ids: the store(...) calls below are
                // issued while iterating and may presumably alter
                // preparedTransactions - confirm against process(...).
                Set<TransactionId> txIds = new LinkedHashSet<>(preparedTransactions.keySet());
                for (TransactionId txId : txIds) {
                    switch (purgeRecoveredXATransactionStrategy){
                        case NEVER:
                            // Left in place; only reported.
                            LOG.warn("Recovered prepared XA TX: [{}]", txId);
                            break;
                        case COMMIT:
                            store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
                            LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId);
                            break;
                        case ROLLBACK:
                            store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
                            LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId);
                            break;
                    }
                }
            }

        } finally {
            this.indexLock.writeLock().unlock();
        }
    }