bool RecoveryManager::getNextRecordHeader()

in src/qpid/linearstore/journal/RecoveryManager.cpp [614:823]


bool RecoveryManager::getNextRecordHeader()
{
    std::size_t cum_size_read = 0;
    void* xidp = 0;
    rec_hdr_t h;

    bool hdr_ok = false;
    uint64_t file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
    std::streampos file_pos = 0;
    if (inFileStream_.is_open()) {
        inFileStream_.clear();
        file_pos = inFileStream_.tellg();
    }
    if (file_pos == std::streampos(-1)) {
        std::ostringstream oss;
        oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
        oss << " eof=" << (inFileStream_.eof()?"T":"F") << " good=" << (inFileStream_.good()?"T":"F");
        oss << " rdstate=0x" << std::hex << inFileStream_.rdstate() << std::dec;
        throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader");
    }
    while (!hdr_ok) {
        if (needNextFile()) {
            if (!getNextFile(true)) {
                lastRecord(file_id, file_pos);
                return false;
            }
        }
        file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
        file_pos = inFileStream_.tellg();
        if (file_pos == std::streampos(-1)) {
            std::ostringstream oss;
            oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
            oss << " eof=" << (inFileStream_.eof()?"T":"F") << " good=" << (inFileStream_.good()?"T":"F");
            oss << " rdstate=0x" << std::hex << inFileStream_.rdstate() << std::dec;
            throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader");
        }
        inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
        if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
            hdr_ok = true;
        } else {
            if (needNextFile()) {
                if (!getNextFile(true)) {
                    lastRecord(file_id, file_pos);
                    return false;
                }
            }
        }
    }

    uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
    switch(h._magic) {
        case QLS_ENQ_MAGIC:
            {
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
                if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
                    checkJournalAlignment(file_id, file_pos);
                    return false;
                }
                enq_rec er;
                if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) {
                    return false;
                }
                if (!er.is_transient()) { // Ignore transient msgs
                    fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount();
                    if (er.xid_size()) {
                        er.get_xid(&xidp);
                        if (xidp == 0) {
                            throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
                        }
                        std::string xid((char*)xidp, er.xid_size());
                        transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false, false));
                        if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
                            std::ostringstream oss;
                            oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
                        }
                    } else {
                        if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                            std::ostringstream oss;
                            oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
                        }
                    }
                }
            }
            break;
        case QLS_DEQ_MAGIC:
            {
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
                if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
                    checkJournalAlignment(file_id, file_pos);
                    return false;
                }
                deq_rec dr;
                if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) {
                    return false;
                }
                if (dr.xid_size()) {
                    // If the enqueue is part of a pending txn, it will not yet be in emap
                    enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
                    dr.get_xid(&xidp);
                    if (xidp == 0) {
                        throw jexception(jerrno::JERR_RCVM_NULLXID, "DEQ", "RecoveryManager", "getNextRecordHeader");
                    }
                    std::string xid((char*)xidp, dr.xid_size());
                    transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos,
                                                       false, false, dr.is_txn_coml_commit()));
                    if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
                        std::ostringstream oss;
                        oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
                        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
                    }
                } else {
                    uint64_t enq_fid;
                    if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
                        fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
                    }
                }
            }
            break;
        case QLS_TXA_MAGIC:
            {
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
                if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
                    checkJournalAlignment(file_id, file_pos);
                    return false;
                }
                txn_rec ar;
                if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) {
                    return false;
                }
                // Delete this txn from tmap, unlock any locked records in emap
                ar.get_xid(&xidp);
                if (xidp == 0) {
                    throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
                }
                std::string xid((char*)xidp, ar.xid_size());
                txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                    if (itr->enq_flag_) {
                        fileNumberMap_[itr->fid_]->journalFilePtr_->decrEnqueuedRecordCount();
                    } else {
                        enqueueMapRef_.unlock(itr->drid_); // ignore not found error
                    }
                }
            }
            break;
        case QLS_TXC_MAGIC:
            {
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
                if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
                    checkJournalAlignment(file_id, file_pos);
                    return false;
                }
                txn_rec cr;
                if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) {
                    return false;
                }
                // Delete this txn from tmap, process records into emap
                cr.get_xid(&xidp);
                if (xidp == 0) {
                    throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
                }
                std::string xid((char*)xidp, cr.xid_size());
                txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                    if (itr->enq_flag_) { // txn enqueue
//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->fid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
                        if (enqueueMapRef_.insert_pfid(itr->rid_, itr->fid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
                            // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                            std::ostringstream oss;
                            oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_;
                            throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
                        }
                    } else { // txn dequeue
                        uint64_t enq_fid;
                        if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
                            fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
                    }
                }
            }
            break;
        case QLS_EMPTY_MAGIC:
            {
//std::cout << ".x" << std::flush; // DEBUG
                uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
                inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
                checkFileStreamOk(false);
                if (needNextFile()) {
                    file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES;
                    if (!getNextFile(false)) {
                        lastRecord(start_fid, file_pos);
                        return false;
                    }
                }
            }
            break;
        case 0:
//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
            checkJournalAlignment(getCurrentFileNumber(), file_pos);
            return false;
        default:
//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
            // Stop as this is the overwrite boundary.
            checkJournalAlignment(getCurrentFileNumber(), file_pos);
            return false;
    }
    return true;
}