in src/qpid/linearstore/MessageStoreImpl.cpp [888:1025]
/**
 * Recover the messages of one queue by reading every data record from the
 * queue's journal.
 *
 * For each record read successfully a recoverable message is rebuilt (from the
 * record itself, or through getExternMessage() when the record is flagged as
 * external), marked as redelivered and has its expiration recomputed. The
 * message is then either
 *   - handed to queue->recover() (rcnt is incremented), or
 *   - stored in `messages` under its record id (idcnt is incremented), or
 *   - dropped,
 * depending on whether its record id is locked by one of the `prepared`
 * transactions and on the operations recorded for that transaction.
 *
 * @param recovery  Recovery manager used to rebuild messages from their encoded header.
 * @param queue     Queue being recovered; its external queue store is the journal that is read.
 * @param prepared  Prepared transactions searched for a lock on each recovered record id.
 * @param messages  Receives messages that are neither recovered to the queue nor dropped, keyed by record id.
 * @param rcnt      Incremented once for every message passed to queue->recover().
 * @param idcnt     Incremented once for every message stored in `messages`.
 *
 * @throws StoreException (via THROW_STORE_EXCEPTION) on an AIO wait timeout, an
 *         unexpected journal read result, a record too short to hold its own
 *         header, a prepared XID missing from the transaction map, or any
 *         journal jexception (which is re-thrown with the queue name prepended).
 */
void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
                                       qpid::broker::RecoveryManager& recovery,
                                       qpid::broker::RecoverableQueue::shared_ptr& queue,
                                       txn_list& prepared,
                                       message_index& messages,
                                       long& rcnt,
                                       long& idcnt)
{
    // Owns the two buffers filled in by read_data_record(). They are released
    // with ::free() after every record and, through the destructor, whenever
    // this function is left by an exception, so no record buffer is leaked.
    struct RecordBuffers {
        void* data;
        void* xid;
        RecordBuffers() : data(NULL), xid(NULL) {}
        ~RecordBuffers() { release(); }
        void release() {
            if (xid) {
                ::free(xid);
                xid = NULL;
            }
            if (data) {
                ::free(data);
                data = NULL;
            }
        }
    };

    const size_t preambleLength = sizeof(uint32_t)/*header size*/;
    JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
    // TODO: This optimization to skip reading if there are no enqueued messages to read
    // breaks the python system test in phase 6 with "Exception: Cannot write lock file"
    // Figure out what is breaking.
    //bool read = jc->get_enq_cnt() > 0;
    bool read = true;
    RecordBuffers buffers;
    size_t dbuffSize = 0;
    size_t xidbuffSize = 0;
    bool transientFlag = false;
    bool externalFlag = false;
    DataTokenImpl dtok;
    dtok.set_wstate(DataTokenImpl::NONE);
    qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
    // Read the message from the Journal.
    try {
        unsigned aio_sleep_cnt = 0;
        while (read) {
            qpid::linearstore::journal::iores res = jc->read_data_record(&buffers.data, dbuffSize, &buffers.xid, xidbuffSize, transientFlag, externalFlag, &dtok, false);
            switch (res)
            {
              case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
                qpid::broker::RecoverableMessage::shared_ptr msg;
                char* data = static_cast<char*>(buffers.data);
                unsigned headerSize = 0;
                if (externalFlag) {
                    msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
                } else {
                    // The record must hold the 4-byte header-size preamble followed by a
                    // header of exactly that size; otherwise decoding would read past the
                    // end of the buffer and the content size below would underflow.
                    if (data == NULL || dbuffSize < preambleLength)
                        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages(): record too short to contain message header size");
                    headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
                    if (headerSize > dbuffSize - preambleLength)
                        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages(): message header size exceeds record size");
                    qpid::framing::Buffer headerBuff(data + preambleLength, headerSize);
                    msg = recovery.recoverMessage(headerBuff);
                }
                msg->setPersistenceId(dtok.rid());
                // At some future point if delivery attempts are stored, then this call would
                // become optional depending on that information.
                msg->setRedelivered();
                // Reset the TTL for the recovered message
                msg->computeExpiration();
                uint32_t contentOffset = headerSize + preambleLength;
                uint64_t contentSize = dbuffSize - contentOffset;
                if (msg->loadContent(contentSize) && !externalFlag) {
                    //now read the content
                    qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
                    msg->decodeContent(contentBuff);
                }
                PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid());
                if (i == prepared.end()) { // not in prepared list
                    rcnt++;
                    queue->recover(msg);
                } else {
                    uint64_t rid = dtok.rid();
                    std::string xid(i->xid);
                    qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
                    if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
                    qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
                    if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) {
                        if (jc->is_enqueued(rid, true)) {
                            // Enqueue is non-tx, dequeue tx
                            assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
                            if (txn_op_stats.abortCnt > 0) {
                                rcnt++;
                                queue->recover(msg); // recover message in abort case only
                            }
                        } else {
                            // Enqueue and/or dequeue tx
                            qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
                            qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
                            // Find out whether this transaction enqueued and/or dequeued this record.
                            bool enq = false;
                            bool deq = false;
                            for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j != txnList.end(); ++j) {
                                if (j->enq_flag_ && j->rid_ == rid)
                                    enq = true;
                                else if (!j->enq_flag_ && j->drid_ == rid)
                                    deq = true;
                            }
                            if (enq && !deq && txn_op_stats.abortCnt == 0) {
                                rcnt++;
                                queue->recover(msg); // recover txn message in commit case only
                            }
                        }
                    } else {
                        // Outcome not decided here: hand the message back to the caller by record id.
                        idcnt++;
                        messages[rid] = msg;
                    }
                }
                // Prepare the data token and the buffers for the next record.
                dtok.reset();
                dtok.set_wstate(DataTokenImpl::NONE);
                buffers.release();
                aio_sleep_cnt = 0;
                break;
              }
              case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
                // Data not available yet: back off and retry, giving up after MAX_AIO_SLEEPS consecutive waits.
                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
                    THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()");
                ::usleep(AIO_SLEEP_TIME_US);
                break;
              case qpid::linearstore::journal::RHM_IORES_EMPTY:
                read = false;
                break; // done with all messages. (add call in jrnl to test that _emap is empty.)
              default: {
                std::ostringstream oss;
                oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::linearstore::journal::iores_str(res);
                THROW_STORE_EXCEPTION(oss.str());
              }
            } // switch
        } // while
    } catch (const qpid::linearstore::journal::jexception& e) {
        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
    }
}