void MessageStoreImpl::recoverMessages()

in src/qpid/legacystore/MessageStoreImpl.cpp [929:1062]


void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
                                      qpid::broker::RecoveryManager& recovery,
                                      qpid::broker::RecoverableQueue::shared_ptr& queue,
                                      txn_list& prepared,
                                      message_index& messages,
                                      long& rcnt,
                                      long& idcnt)
{
    size_t preambleLength = sizeof(u_int32_t)/*header size*/;

    JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
    DataTokenImpl dtok;
    size_t readSize = 0;
    unsigned msg_count = 0;

    // TODO: This optimization to skip reading if there are no enqueued messages to read
    // breaks the python system test in phase 6 with "Exception: Cannot write lock file"
    // Figure out what is breaking.
    //bool read = jc->get_enq_cnt() > 0;
    bool read = true;

    void* dbuff = NULL; size_t dbuffSize = 0;
    void* xidbuff = NULL; size_t xidbuffSize = 0;
    bool transientFlag = false;
    bool externalFlag = false;

    dtok.set_wstate(DataTokenImpl::ENQ);

    // Read the message from the Journal.
    try {
        unsigned aio_sleep_cnt = 0;
        while (read) {
            mrg::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
            readSize = dtok.dsize();

            switch (res)
            {
              case mrg::journal::RHM_IORES_SUCCESS: {
                msg_count++;
                qpid::broker::RecoverableMessage::shared_ptr msg;
                char* data = (char*)dbuff;

                unsigned headerSize;
                if (externalFlag) {
                    msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
                } else {
                    headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
                    qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
                    msg = recovery.recoverMessage(headerBuff);
                }
                msg->setPersistenceId(dtok.rid());
                // At some future point if delivery attempts are stored, then this call would
                // become optional depending on that information.
                msg->setRedelivered();
		// Reset the TTL for the recovered message
		msg->computeExpiration();

                u_int32_t contentOffset = headerSize + preambleLength;
                u_int64_t contentSize = readSize - contentOffset;
                if (msg->loadContent(contentSize) && !externalFlag) {
                    //now read the content
                    qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
                    msg->decodeContent(contentBuff);
                }

                PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid());
                if (i == prepared.end()) { // not in prepared list
                    rcnt++;
                    queue->recover(msg);
                } else {
                    u_int64_t rid = dtok.rid();
                    std::string xid(i->xid);
                    TplRecoverMapCitr citr = tplRecoverMap.find(xid);
                    if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");

                    // deq present in prepared list: this xid is part of incomplete txn commit/abort
                    // or this is a 1PC txn that must be rolled forward
                    if (citr->second.deq_flag || !citr->second.tpc_flag) {
                        if (jc->is_enqueued(rid, true)) {
                            // Enqueue is non-tx, dequeue tx
                            assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
                            if (!citr->second.commit_flag) {
                                rcnt++;
                                queue->recover(msg); // recover message in abort case only
                            }
                        } else {
                            // Enqueue and/or dequeue tx
                            journal::txn_map& tmap = jc->get_txn_map();
                            journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
                            bool enq = false;
                            bool deq = false;
                            for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
                                if (j->_enq_flag && j->_rid == rid) enq = true;
                                else if (!j->_enq_flag && j->_drid == rid) deq = true;
                            }
                            if (enq && !deq && citr->second.commit_flag) {
                                rcnt++;
                                queue->recover(msg); // recover txn message in commit case only
                            }
                        }
                    } else {
                        idcnt++;
                        messages[rid] = msg;
                    }
                }

                dtok.reset();
                dtok.set_wstate(DataTokenImpl::ENQ);

                if (xidbuff)
                    ::free(xidbuff);
                else if (dbuff)
                    ::free(dbuff);
                aio_sleep_cnt = 0;
                break;
              }
              case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
                    THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()");
                ::usleep(AIO_SLEEP_TIME_US);
                break;
              case mrg::journal::RHM_IORES_EMPTY:
                read = false;
                break; // done with all messages. (add call in jrnl to test that _emap is empty.)
              default:
                std::ostringstream oss;
                oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << mrg::journal::iores_str(res);
                THROW_STORE_EXCEPTION(oss.str());
            } // switch
        } // while
    } catch (const journal::jexception& e) {
        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
    }
}