void MessageStoreImpl::recover()

in src/qpid/legacystore/MessageStoreImpl.cpp [659:758]


// Recovers the persisted state of the store and replays it into the supplied
// recovery manager.
//
// Phase 1: inside a single store transaction, recover queues (and, per
// recoverQueues(), their messages), exchanges, bindings and general
// configuration. The transaction is committed on success and aborted if
// anything throws; a DbException is rethrown as a store exception.
//
// Phase 2: walk the prepared transactions collected by recoverLockedMappings(),
// rebuild a transaction context for each one from its tplRecoverMap entry, and
// either hand it on (2PC), finish it through completed() (1PC), or finish an
// interrupted completion via complete().
//
// registry - recovery manager that receives the recovered objects; its
//            recoveryComplete() is invoked last.
// Throws a store exception if a prepared XID has no tplRecoverMap entry.
void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
{
    checkInit();

    // Collected up front because recoverQueues() below is given this list too.
    txn_list prepared;
    recoverLockedMappings(prepared);

    queue_index queues;       // id -> queue
    exchange_index exchanges; // id -> exchange
    message_index messages;   // id -> message

    TxnCtxt dbTxn;
    dbTxn.begin(dbenv.get(), false);
    try {
        // Queues come first; recoverQueues() fills both 'queues' and 'messages'.
        recoverQueues(dbTxn, registry, queues, prepared, messages);

        // Exchanges, then the bindings that tie exchanges to queues.
        recoverExchanges(dbTxn, registry, exchanges);
        recoverBindings(dbTxn, exchanges, queues);

        // General-purpose configuration.
        recoverGeneral(dbTxn, registry);

        dbTxn.commit();
    } catch (const DbException& e) {
        dbTxn.abort();
        THROW_STORE_EXCEPTION_2("Error on recovery", e);
    } catch (...) {
        dbTxn.abort();
        throw;
    }

    // Phase 2: prepared transactions.
    // NOTE(review): queues[...] / messages[...] below use operator[], which
    // yields a default-constructed entry for an unknown id - confirm every id
    // referenced by a prepared transaction is guaranteed to have been recovered.
    for (txn_list::iterator txnIt = prepared.begin(); txnIt != prepared.end(); ++txnIt) {
        const PreparedTransaction& preparedTxn = *txnIt;

        if (mgmtObject.get() != 0) {
            mgmtObject->inc_tplTransactionDepth();
            mgmtObject->inc_tplTxnPrepares();
        }

        std::string xid = preparedTxn.xid;

        // The recover map entry carries the state needed to restore the data
        // token inside the transaction context.
        TplRecoverMapCitr recIt = tplRecoverMap.find(xid);
        if (recIt == tplRecoverMap.end()) {
            THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
        }

        // A record that is dequeued but not yet committed/aborted in the
        // tplStore means a complete() call was cut short while it was
        // committing/aborting the affected queues; that work is finished here.
        const bool interrupted = recIt->second.deq_flag;

        if (!recIt->second.tpc_flag) {
            // Local (1PC) transaction
            boost::shared_ptr<TxnCtxt> localCtxt(new TxnCtxt(xid, &messageIdSequence));
            localCtxt->recoverDtok(recIt->second.rid, xid);
            localCtxt->prepare(tplStorePtr.get());

            if (preparedTxn.enqueues.get()) {
                for (LockedMappings::iterator mapIt = preparedTxn.enqueues->begin();
                     mapIt != preparedTxn.enqueues->end(); ++mapIt) {
                    localCtxt->addXidRecord(queues[mapIt->first]->getExternalQueueStore());
                }
            }
            if (preparedTxn.dequeues.get()) {
                for (LockedMappings::iterator mapIt = preparedTxn.dequeues->begin();
                     mapIt != preparedTxn.dequeues->end(); ++mapIt) {
                    localCtxt->addXidRecord(queues[mapIt->first]->getExternalQueueStore());
                }
            }

            if (interrupted) {
                localCtxt->complete(recIt->second.commit_flag);
            } else {
                completed(*localCtxt, recIt->second.commit_flag);
            }
            continue;
        }

        // Dtx (2PC) transaction
        TPCTxnCtxt* tpcCtxt = new TPCTxnCtxt(xid, &messageIdSequence);
        // Owns the context unless it is passed to the registry below
        // (presumably recoverTransaction() takes ownership - confirm).
        std::auto_ptr<qpid::broker::TPCTransactionContext> tpcOwner(tpcCtxt);
        tpcCtxt->recoverDtok(recIt->second.rid, xid);
        tpcCtxt->prepare(tplStorePtr.get());

        // Only a transaction whose completion was NOT interrupted is handed
        // back to the registry and has its operations replayed into it.
        const bool replay = !interrupted;
        qpid::broker::RecoverableTransaction::shared_ptr recoveredDtx;
        if (replay) {
            recoveredDtx = registry.recoverTransaction(xid, tpcOwner);
        }

        if (preparedTxn.enqueues.get()) {
            for (LockedMappings::iterator mapIt = preparedTxn.enqueues->begin();
                 mapIt != preparedTxn.enqueues->end(); ++mapIt) {
                tpcCtxt->addXidRecord(queues[mapIt->first]->getExternalQueueStore());
                if (replay) {
                    recoveredDtx->enqueue(queues[mapIt->first], messages[mapIt->second]);
                }
            }
        }
        if (preparedTxn.dequeues.get()) {
            for (LockedMappings::iterator mapIt = preparedTxn.dequeues->begin();
                 mapIt != preparedTxn.dequeues->end(); ++mapIt) {
                tpcCtxt->addXidRecord(queues[mapIt->first]->getExternalQueueStore());
                if (replay) {
                    recoveredDtx->dequeue(queues[mapIt->first], messages[mapIt->second]);
                }
            }
        }

        if (interrupted) {
            tpcCtxt->complete(recIt->second.commit_flag);
        }
    }

    registry.recoveryComplete();
}