bool RecoveryManager::readNextRemainingRecord()

in src/qpid/linearstore/journal/RecoveryManager.cpp [184:290]


/**
 * Read the next enqueue record referenced by recordIdList_ from the journal
 * files, validate its tail/checksum and hand its contents back to the caller.
 *
 * The record to read is selected by recordIdListConstItr_. Entries flagged as
 * belonging to a pending transaction are skipped when ignore_pending_txns is
 * true. On success the iterator is advanced past the record that was read.
 *
 * \param dataPtrPtr  Receives a buffer of dataSize bytes, allocated here with
 *                    ::malloc(), holding the record's data payload.
 * \param dataSize    Receives the payload size taken from the enqueue header.
 * \param xidPtrPtr   Receives a buffer of xidSize bytes, allocated here with
 *                    ::malloc(), holding the record's xid.
 * \param xidSize     Receives the xid size taken from the enqueue header.
 * \param transient   Receives the transient flag of the enqueue header.
 * \param external    Receives the external flag of the enqueue header.
 * \param dtokp       Data token updated with write state ENQ, the record id,
 *                    the data size and (when xidSize is non-zero) the xid.
 *                    Dereferenced unconditionally on the success path.
 * \param ignore_pending_txns  When true, records that are part of a pending
 *                    transaction are skipped.
 *
 * \return false when no further records remain in recordIdList_ (no output
 *         parameter is written in that case); true when a record was read.
 *
 * \throws jexception JERR__FILEIO if the journal file cannot be opened, the
 *         record offset cannot be reached, or the enqueue header is short.
 * \throws jexception JERR__MALLOC if either buffer cannot be allocated.
 * \throws jexception JERR_JREC_BADRECTAIL if the record tail does not match
 *         the header or the computed checksum.
 *
 * NOTE(review): this function never frees the buffers it allocates, so
 * ownership presumably passes to the caller; buffers already stored in the
 * out-parameters are also left allocated when an exception is thrown later
 * in the function. Confirm that callers release them on both paths.
 */
bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
                                              std::size_t& dataSize,
                                              void** const xidPtrPtr,
                                              std::size_t& xidSize,
                                              bool& transient,
                                              bool& external,
                                              data_tok* const dtokp,
                                              bool ignore_pending_txns) {
    // Advance to the next record that should be returned, skipping records
    // of pending transactions if requested.
    bool foundRecord = false;
    do {
        if (recordIdListConstItr_ == recordIdList_.end()) {
            return false;
        }
        if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction
            ++recordIdListConstItr_; // ignore, go to next record
        } else {
            foundRecord = true;
        }
    } while (!foundRecord);

    // Make sure the journal file containing this record is the one currently open.
    if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) {
        if (!getFile(recordIdListConstItr_->fileId_, false)) {
            std::ostringstream oss;
            oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
            throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
        }
    }

    // Position the stream at the start of the record.
    inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg);
    if (!inFileStream_.good()) {
        std::ostringstream oss;
        oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName();
        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
    }

    // Read the fixed-size enqueue header directly from the current file.
    ::enq_hdr_t enqueueHeader;
    inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
    if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
        std::ostringstream oss;
        oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_;
        throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
    }
    // check flags
    transient = ::is_enq_transient(&enqueueHeader);
    external = ::is_enq_external(&enqueueHeader);

    // read xid
    // NOTE(review): ::malloc(0) may legally return a null pointer on some
    // implementations, in which case a record with an empty xid would be
    // reported as an allocation failure here - confirm for target platforms.
    xidSize = enqueueHeader._xidsize;
    *xidPtrPtr = ::malloc(xidSize);
    if (*xidPtrPtr == 0) {
        std::ostringstream oss;
        oss << "xidPtr, size=0x" << std::hex << xidSize;
        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
    }
    readJournalData((char*)*xidPtrPtr, xidSize);

    // read data
    dataSize = enqueueHeader._dsize;
    *dataPtrPtr = ::malloc(dataSize);
    // Check the data buffer itself: testing the xid pointer here would let a
    // failed data allocation go unnoticed and pass a null destination on to
    // readJournalData().
    if (*dataPtrPtr == 0) {
        std::ostringstream oss;
        oss << "dataPtr, size=0x" << std::hex << dataSize;
        throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
    }
    readJournalData((char*)*dataPtrPtr, dataSize);

    // Check enqueue record checksum: computed over header, then xid, then data.
    Checksum checksum;
    checksum.addData((const unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
    if (xidSize > 0) {
        checksum.addData((const unsigned char*)*xidPtrPtr, xidSize);
    }
    if (dataSize > 0) {
        checksum.addData((const unsigned char*)*dataPtrPtr, dataSize);
    }
    ::rec_tail_t enqueueTail;
    readJournalData((char*)&enqueueTail, sizeof(::rec_tail_t));
    uint32_t cs = checksum.getChecksum();
    uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
    if (res != 0) {
        // res is a bit mask; report every mismatch that was flagged.
        std::stringstream oss;
        oss << "Bad record tail:" << std::hex;
        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
            oss << std::endl << "  Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic;
        }
        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
            oss << std::endl << "  Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial;
        }
        if (res & ::REC_TAIL_RID_ERR_MASK) {
            oss << std::endl << "  Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid;
        }
        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum;
        }
        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "RecoveryManager", "readNextRemainingRecord"); // TODO: Don't throw exception, log info
    }

    // Set data token
    dtokp->set_wstate(data_tok::ENQ);
    dtokp->set_rid(enqueueHeader._rhdr._rid);
    dtokp->set_dsize(dataSize);
    if (xidSize) {
        dtokp->set_xid(*xidPtrPtr, xidSize);
    }

    // Record consumed; the next call continues with the following entry.
    ++recordIdListConstItr_;
    return true;
}