in src/qpid/linearstore/MessageStoreImpl.cpp [606:714]
void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_)
{
checkInit();
txn_list prepared;
recoverLockedMappings(prepared);
std::ostringstream oss;
oss << "Recovered transaction prepared list:";
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
oss << std::endl << " " << qpid::linearstore::journal::jcntl::str2hexnum(i->xid);
}
QLS_LOG(debug, oss.str());
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
message_index messages;//id->message
TxnCtxt txn;
txn.begin(dbenv.get(), false);
try {
//read all queues, calls recoversMessages for each queue
recoverQueues(txn, registry_, queues, prepared, messages);
//recover exchange & bindings:
recoverExchanges(txn, registry_, exchanges);
recoverBindings(txn, exchanges, queues);
//recover general-purpose configuration
recoverGeneral(txn, registry_);
txn.commit();
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
} catch (...) {
txn.abort();
throw;
}
//recover transactions:
qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
const PreparedTransaction pt = *i;
if (mgmtObject.get() != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
}
std::string xid = pt.xid;
qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
bool commitFlag = txn_op_stats.abortCnt == 0;
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0;
if (txn_op_stats.tpcCnt > 0) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
tpcc->recoverDtok(txn_op_stats.rid, xid);
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
if (!incomplTplTxnFlag) dtx = registry_.recoverTransaction(xid, txn);
if (pt.enqueues.get()) {
for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (pt.dequeues.get()) {
for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
}
}
if (incomplTplTxnFlag) {
tpcc->complete(commitFlag);
}
} else {
// Local (1PC) transaction
boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
opcc->recoverDtok(txn_op_stats.rid, xid);
opcc->prepare(tplStorePtr.get());
if (pt.enqueues.get()) {
for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) {
opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
}
}
if (pt.dequeues.get()) {
for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) {
opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
}
}
if (incomplTplTxnFlag) {
opcc->complete(commitFlag);
} else {
completed(*opcc.get(), commitFlag);
}
}
}
registry_.recoveryComplete();
}