in src/qpid/legacystore/MessageStoreImpl.cpp [659:758]
/**
 * Recovers persisted state and replays it into the supplied recovery manager.
 *
 * Steps, in order:
 *  1. Collect the prepared transactions via recoverLockedMappings().
 *  2. Inside one database transaction, recover queues (which also recovers
 *     their messages), exchanges, bindings and general-purpose configuration.
 *     If any of these throws, the database transaction is aborted and the
 *     error is propagated (DbException is re-raised through
 *     THROW_STORE_EXCEPTION_2, anything else is rethrown unchanged).
 *  3. For every prepared transaction, rebuild its transaction context from
 *     tplRecoverMap and either hand it to the registry (2PC), finish it
 *     through completed() (1PC), or finish an interrupted complete() call.
 *  4. Call registry.recoveryComplete().
 *
 * @param registry recovery manager that receives the recovered objects.
 *
 * Raises through THROW_STORE_EXCEPTION when a prepared transaction's XID has
 * no entry in tplRecoverMap.
 *
 * NOTE(review): queues[...] and messages[...] below use operator[]; if an id
 * from a locked mapping were not present in the recovered index this would
 * presumably insert a null entry that is then dereferenced. Confirm that
 * recoverQueues() guarantees every referenced id is present.
 */
void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
{
    checkInit();
    txn_list prepared;
    recoverLockedMappings(prepared);

    queue_index queues;       // id -> queue
    exchange_index exchanges; // id -> exchange
    message_index messages;   // id -> message

    TxnCtxt txn;
    txn.begin(dbenv.get(), false);
    try {
        // Read all queues; this also recovers their messages.
        recoverQueues(txn, registry, queues, prepared, messages);
        // Recover exchanges and bindings.
        recoverExchanges(txn, registry, exchanges);
        recoverBindings(txn, exchanges, queues);
        // Recover general-purpose configuration.
        recoverGeneral(txn, registry);
        txn.commit();
    } catch (const DbException& e) {
        txn.abort();
        THROW_STORE_EXCEPTION_2("Error on recovery", e);
    } catch (...) {
        txn.abort();
        throw;
    }

    // Recover transactions.
    for (txn_list::iterator i = prepared.begin(); i != prepared.end(); ++i) {
        // Bind by reference: the entry is only read, so copying it (and its
        // xid) on every iteration is unnecessary. `prepared` is not modified
        // inside this loop, so the reference stays valid.
        const PreparedTransaction& pt = *i;
        if (mgmtObject.get() != 0) {
            mgmtObject->inc_tplTransactionDepth();
            mgmtObject->inc_tplTxnPrepares();
        }

        const std::string& xid = pt.xid;

        // Restore data token state in TxnCtxt
        TplRecoverMapCitr citr = tplRecoverMap.find(xid);
        if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");

        // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
        // was interrupted part way through committing/aborting the impacted queues. Complete this process.
        const bool incomplTplTxnFlag = citr->second.deq_flag;

        if (citr->second.tpc_flag) {
            // Dtx (2PC) transaction
            TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
            // Owns tpcc until it is handed to registry.recoverTransaction();
            // if that call is skipped, the context is released at end of scope.
            // Named distinctly so it does not shadow the outer TxnCtxt `txn`.
            std::auto_ptr<qpid::broker::TPCTransactionContext> tpcOwner(tpcc);
            tpcc->recoverDtok(citr->second.rid, xid);
            tpcc->prepare(tplStorePtr.get());

            // dtx stays empty for an interrupted complete(); every use below
            // is guarded by !incomplTplTxnFlag.
            // NOTE(review): tpcc is still used after the hand-over, so dtx is
            // presumably what keeps the context alive from here on - confirm.
            qpid::broker::RecoverableTransaction::shared_ptr dtx;
            if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, tpcOwner);

            if (pt.enqueues.get()) {
                for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); ++j) {
                    tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                    if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
                }
            }
            if (pt.dequeues.get()) {
                for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); ++j) {
                    tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                    if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
                }
            }

            if (incomplTplTxnFlag) {
                tpcc->complete(citr->second.commit_flag);
            }
        } else {
            // Local (1PC) transaction
            boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
            opcc->recoverDtok(citr->second.rid, xid);
            opcc->prepare(tplStorePtr.get());

            if (pt.enqueues.get()) {
                for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); ++j) {
                    opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                }
            }
            if (pt.dequeues.get()) {
                for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); ++j) {
                    opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                }
            }

            if (incomplTplTxnFlag) {
                opcc->complete(citr->second.commit_flag);
            } else {
                completed(*opcc.get(), citr->second.commit_flag);
            }
        }
    }
    registry.recoveryComplete();
}