in src/qpid/legacystore/MessageStoreImpl.cpp [929:1062]
/**
 * Recover the messages of one queue by reading every data record from that queue's journal.
 *
 * Each record that is read successfully is turned into a RecoverableMessage (fetched through
 * getExternMessage() when the record is flagged external, otherwise decoded from the header and
 * content held in the record), given the record id as persistence id and marked redelivered.
 * The message is then either:
 *  - recovered onto the queue (rcnt is incremented);
 *  - stored in 'messages' under its record id when it is locked by a prepared transaction
 *    that is not being rolled forward/back here (idcnt is incremented); or
 *  - dropped, when the recorded outcome of its transaction means it must not reappear.
 *
 * @param recovery  builds RecoverableMessage objects from encoded headers
 * @param queue     queue being recovered; its external queue store is used as the JournalImpl
 * @param prepared  prepared transactions, searched for a lock on (queue id, record id)
 * @param messages  receives in-doubt messages, keyed by record id
 * @param rcnt      incremented once per message recovered onto the queue
 * @param idcnt     incremented once per message placed in 'messages'
 *
 * Errors are raised through THROW_STORE_EXCEPTION: when a locking xid is missing from
 * tplRecoverMap, when AIO waits exceed MAX_AIO_SLEEPS in a row, on an unexpected journal
 * return code, and when the journal throws a journal::jexception.
 */
void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
                                       qpid::broker::RecoveryManager& recovery,
                                       qpid::broker::RecoverableQueue::shared_ptr& queue,
                                       txn_list& prepared,
                                       message_index& messages,
                                       long& rcnt,
                                       long& idcnt)
{
    // Releases the buffers handed back by a successful journal read when it goes out of scope,
    // so they are freed on the normal path AND when message processing throws.
    // Only one pointer is ever freed: the xid buffer when present, otherwise the data buffer
    // (presumably both live in a single allocation that starts at the xid - confirm against
    // the journal's read_data_record() contract).
    struct RecordBufferGuard {
        void*& xidPtr;
        void*& dataPtr;
        RecordBufferGuard(void*& xidRef, void*& dataRef) : xidPtr(xidRef), dataPtr(dataRef) {}
        ~RecordBufferGuard() {
            if (xidPtr)
                ::free(xidPtr);
            else if (dataPtr)
                ::free(dataPtr);
            // Never leave dangling pointers behind for the next loop iteration.
            xidPtr = NULL;
            dataPtr = NULL;
        }
    };

    const size_t preambleLength = sizeof(u_int32_t)/*header size*/;
    JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
    DataTokenImpl dtok;
    // TODO: This optimization to skip reading if there are no enqueued messages to read
    // breaks the python system test in phase 6 with "Exception: Cannot write lock file"
    // Figure out what is breaking.
    //bool read = jc->get_enq_cnt() > 0;
    bool read = true;
    void* dbuff = NULL; size_t dbuffSize = 0;
    void* xidbuff = NULL; size_t xidbuffSize = 0;
    bool transientFlag = false;
    bool externalFlag = false;
    dtok.set_wstate(DataTokenImpl::ENQ);

    // Read the messages from the Journal.
    try {
        unsigned aio_sleep_cnt = 0; // consecutive AIO waits; reset after every successful read
        while (read) {
            mrg::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
            const size_t readSize = dtok.dsize();

            switch (res)
            {
              case mrg::journal::RHM_IORES_SUCCESS: {
                // From here on the record buffers are released on every exit from this case.
                RecordBufferGuard bufferGuard(xidbuff, dbuff);

                qpid::broker::RecoverableMessage::shared_ptr msg;
                char* data = static_cast<char*>(dbuff);
                unsigned headerSize = 0;
                if (externalFlag) {
                    msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
                } else {
                    // Record layout: 32-bit header length, encoded header, then content.
                    headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
                    qpid::framing::Buffer headerBuff(data + preambleLength, headerSize); /// do we want read size or header size ????
                    msg = recovery.recoverMessage(headerBuff);
                }
                msg->setPersistenceId(dtok.rid());
                // At some future point if delivery attempts are stored, then this call would
                // become optional depending on that information.
                msg->setRedelivered();
                // Reset the TTL for the recovered message
                msg->computeExpiration();

                u_int32_t contentOffset = headerSize + preambleLength;
                u_int64_t contentSize = readSize - contentOffset;
                if (msg->loadContent(contentSize) && !externalFlag) {
                    //now read the content
                    qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
                    msg->decodeContent(contentBuff);
                }

                PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid());
                if (i == prepared.end()) { // not in prepared list
                    rcnt++;
                    queue->recover(msg);
                } else {
                    u_int64_t rid = dtok.rid();
                    std::string xid(i->xid);
                    TplRecoverMapCitr citr = tplRecoverMap.find(xid);
                    if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");

                    // deq present in prepared list: this xid is part of incomplete txn commit/abort
                    // or this is a 1PC txn that must be rolled forward
                    if (citr->second.deq_flag || !citr->second.tpc_flag) {
                        if (jc->is_enqueued(rid, true)) {
                            // Enqueue is non-tx, dequeue tx
                            assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
                            if (!citr->second.commit_flag) {
                                rcnt++;
                                queue->recover(msg); // recover message in abort case only
                            }
                        } else {
                            // Enqueue and/or dequeue tx
                            journal::txn_map& tmap = jc->get_txn_map();
                            journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
                            bool enq = false;
                            bool deq = false;
                            for (journal::tdl_itr j = txnList.begin(); j != txnList.end(); ++j) {
                                if (j->_enq_flag && j->_rid == rid) enq = true;
                                else if (!j->_enq_flag && j->_drid == rid) deq = true;
                            }
                            if (enq && !deq && citr->second.commit_flag) {
                                rcnt++;
                                queue->recover(msg); // recover txn message in commit case only
                            }
                        }
                    } else {
                        // Outcome still unknown: park the message until the txn is resolved.
                        idcnt++;
                        messages[rid] = msg;
                    }
                }

                // Prepare the data token for the next read.
                dtok.reset();
                dtok.set_wstate(DataTokenImpl::ENQ);
                aio_sleep_cnt = 0;
                break; // bufferGuard frees the record buffers here
              }
              case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
                    THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()");
                ::usleep(AIO_SLEEP_TIME_US);
                break;
              case mrg::journal::RHM_IORES_EMPTY:
                read = false;
                break; // done with all messages. (add call in jrnl to test that _emap is empty.)
              default: {
                std::ostringstream oss;
                oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << mrg::journal::iores_str(res);
                THROW_STORE_EXCEPTION(oss.str());
              }
            } // switch
        } // while
    } catch (const journal::jexception& e) {
        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
    }
}