void MessageStoreImpl::recover()

in src/qpid/linearstore/MessageStoreImpl.cpp [606:714]


// Recover the persisted state held by this store and replay it into the
// supplied recovery manager.
//
// Sequence:
//   1. checkInit(), then recoverLockedMappings() fills the list of prepared
//      transactions, whose XIDs are written (in hex) to the debug log.
//   2. Under a single TxnCtxt started on dbenv: queues (and, per queue, their
//      messages), exchanges, bindings and general configuration are
//      recovered. The transaction is committed on success and aborted if
//      anything throws.
//   3. Every prepared transaction is looked up in the txn map of the TPL
//      store and either handed to the recovery manager, completed locally,
//      or - if its completion had been interrupted - finished off.
//   4. registry_.recoveryComplete() is called.
//
// registry_: recovery manager that receives the recovered objects.
//
// Throws a store exception ("Error on recovery") wrapping any DbException
// raised in step 2; other exceptions from step 2 are rethrown unchanged after
// the transaction is aborted. Throws a store exception ("XID not found in
// txn_map") if a prepared XID has no entries in the TPL store's txn map.
void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
{
    checkInit();

    // Gather the prepared transactions first; recoverQueues() below is also
    // handed this list.
    txn_list preparedTxns;
    recoverLockedMappings(preparedTxns);

    // Emit all prepared XIDs as one debug log entry, one XID per line.
    std::ostringstream xidDump;
    xidDump << "Recovered transaction prepared list:";
    for (txn_list::iterator it = preparedTxns.begin(); it != preparedTxns.end(); ++it) {
        xidDump << '\n' << "     " << qpid::linearstore::journal::jcntl::str2hexnum(it->xid);
    }
    QLS_LOG(debug, xidDump.str());

    queue_index queueIdx;        // id -> queue
    exchange_index exchangeIdx;  // id -> exchange
    message_index messageIdx;    // id -> message

    TxnCtxt recoveryTxn;
    recoveryTxn.begin(dbenv.get(), false);
    try {
        // Reads every queue; recoversMessages is invoked for each of them.
        recoverQueues(recoveryTxn, registry_, queueIdx, preparedTxns, messageIdx);

        // Exchanges, then bindings (which are given both the exchange and
        // the queue index).
        recoverExchanges(recoveryTxn, registry_, exchangeIdx);
        recoverBindings(recoveryTxn, exchangeIdx, queueIdx);

        // General-purpose configuration.
        recoverGeneral(recoveryTxn, registry_);

        recoveryTxn.commit();
    } catch (const DbException& dbErr) {
        recoveryTxn.abort();
        THROW_STORE_EXCEPTION_2("Error on recovery", dbErr);
    } catch (...) {
        recoveryTxn.abort();
        throw;
    }

    // Transaction recovery: walk the prepared list against the TPL store's txn map.
    qpid::linearstore::journal::txn_map& tplTxnMap = tplStorePtr->get_txn_map();
    for (txn_list::iterator it = preparedTxns.begin(); it != preparedTxns.end(); ++it) {
        const PreparedTransaction preparedTxn = *it;

        // Management counters are only touched when a management object exists.
        if (mgmtObject.get()) {
            mgmtObject->inc_tplTransactionDepth();
            mgmtObject->inc_tplTxnPrepares();
        }

        std::string xid = preparedTxn.xid;
        qpid::linearstore::journal::txn_data_list_t opList = tplTxnMap.get_tdata_list(xid);
        if (opList.empty()) {
            THROW_STORE_EXCEPTION("XID not found in txn_map");
        }
        qpid::linearstore::journal::txn_op_stats_t opStats(opList);

        // The transaction is treated as a commit unless an abort was recorded.
        const bool commitFlag = (opStats.abortCnt == 0);

        // A dequeued record that was never committed/aborted from tplStore
        // means a complete() call was cut short while committing/aborting
        // the affected queues; in that case the completion is finished here.
        const bool interrupted = (opStats.deqCnt > 0);

        if (opStats.tpcCnt > 0) {
            // Dtx (2PC) transaction
            TPCTxnCtxt* tpcCtxt = new TPCTxnCtxt(xid, &messageIdSequence);
            std::auto_ptr<qpid::broker::TPCTransactionContext> tpcOwner(tpcCtxt);
            tpcCtxt->recoverDtok(opStats.rid, xid);
            tpcCtxt->prepare(tplStorePtr.get());

            // dtx stays empty for an interrupted transaction; every use of
            // it below is guarded by the same condition.
            // NOTE(review): tpcCtxt is still used after tpcOwner is passed to
            // recoverTransaction(); presumably the recovery manager keeps the
            // context alive - confirm.
            qpid::broker::RecoverableTransaction::shared_ptr dtx;
            if (!interrupted) {
                dtx = registry_.recoverTransaction(xid, tpcOwner);
            }

            // Each locked mapping pairs a queue id (first) with a message id (second).
            if (preparedTxn.enqueues.get() != 0) {
                for (LockedMappings::iterator m = preparedTxn.enqueues->begin();
                     m != preparedTxn.enqueues->end(); ++m) {
                    tpcCtxt->addXidRecord(queueIdx[m->first]->getExternalQueueStore());
                    if (!interrupted) {
                        dtx->enqueue(queueIdx[m->first], messageIdx[m->second]);
                    }
                }
            }
            if (preparedTxn.dequeues.get() != 0) {
                for (LockedMappings::iterator m = preparedTxn.dequeues->begin();
                     m != preparedTxn.dequeues->end(); ++m) {
                    tpcCtxt->addXidRecord(queueIdx[m->first]->getExternalQueueStore());
                    if (!interrupted) {
                        dtx->dequeue(queueIdx[m->first], messageIdx[m->second]);
                    }
                }
            }

            if (interrupted) {
                tpcCtxt->complete(commitFlag);
            }
        } else {
            // Local (1PC) transaction
            boost::shared_ptr<TxnCtxt> localCtxt(new TxnCtxt(xid, &messageIdSequence));
            localCtxt->recoverDtok(opStats.rid, xid);
            localCtxt->prepare(tplStorePtr.get());

            // Register the queue store of every enqueue, then of every dequeue.
            if (preparedTxn.enqueues.get() != 0) {
                for (LockedMappings::iterator m = preparedTxn.enqueues->begin();
                     m != preparedTxn.enqueues->end(); ++m) {
                    localCtxt->addXidRecord(queueIdx[m->first]->getExternalQueueStore());
                }
            }
            if (preparedTxn.dequeues.get() != 0) {
                for (LockedMappings::iterator m = preparedTxn.dequeues->begin();
                     m != preparedTxn.dequeues->end(); ++m) {
                    localCtxt->addXidRecord(queueIdx[m->first]->getExternalQueueStore());
                }
            }

            if (!interrupted) {
                completed(*localCtxt, commitFlag);
            } else {
                localCtxt->complete(commitFlag);
            }
        }
    }

    registry_.recoveryComplete();
}