in src/qpid/linearstore/journal/RecoveryManager.cpp [614:823]
/**
 * Read the next record header from the current journal file stream and process the
 * record it introduces, updating the enqueue map, the transaction map and the per-file
 * enqueued-record counts.
 *
 * The function first reads sizeof(rec_hdr_t) bytes from inFileStream_, moving on to the
 * next journal file via getNextFile() whenever needNextFile() reports that this is
 * required. It then dispatches on the header's magic value:
 *  - QLS_ENQ_MAGIC:   enqueue record. Non-transient records are counted against the file
 *                     in which they start and are recorded either in the transaction map
 *                     (xid present) or in the enqueue map (no xid).
 *  - QLS_DEQ_MAGIC:   dequeue record. With an xid it is recorded in the transaction map;
 *                     without one the dequeued rid is removed from the enqueue map.
 *  - QLS_TXA_MAGIC:   transaction abort. The xid's data list is removed from the
 *                     transaction map and its effects are undone.
 *  - QLS_TXC_MAGIC:   transaction commit. The xid's data list is removed from the
 *                     transaction map and applied to the enqueue map.
 *  - QLS_EMPTY_MAGIC: filler record; the remainder of the record is skipped.
 *  - 0 / any other:   checkJournalAlignment() is called and reading stops.
 *
 * @return true if a record was read and processed; false if getNextFile() reported no
 *         further file, the header failed ::rec_hdr_check(), decodeRecord() returned
 *         false, or the magic value was 0 or unrecognized.
 * @throws jexception JERR_RCVM_STREAMBAD if tellg() on the input stream returns -1.
 * @throws jexception JERR_RCVM_NULLXID if a record yields a null xid pointer where an
 *         xid is required.
 * @throws jexception JERR_MAP_NOTFOUND if set_aio_compl() on the transaction map fails.
 * @throws jexception JERR_MAP_DUPLICATE if insert_pfid() on the enqueue map fails.
 */
bool RecoveryManager::getNextRecordHeader()
{
// Running byte count handed to decodeRecord() for the record being decoded.
std::size_t cum_size_read = 0;
// Receives the xid pointer from the decoded record's get_xid().
void* xidp = 0;
// Raw header, filled directly from the file stream below.
rec_hdr_t h;
bool hdr_ok = false;
uint64_t file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
// Remains 0 if the stream is not open yet.
std::streampos file_pos = 0;
if (inFileStream_.is_open()) {
// Reset any error/eof flags left on the stream before asking for the position.
inFileStream_.clear();
file_pos = inFileStream_.tellg();
}
// tellg() signals failure by returning -1; report the full stream state in that case.
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
oss << " eof=" << (inFileStream_.eof()?"T":"F") << " good=" << (inFileStream_.good()?"T":"F");
oss << " rdstate=0x" << std::hex << inFileStream_.rdstate() << std::dec;
throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
// Keep trying until a complete header has been read, switching files when required.
while (!hdr_ok) {
if (needNextFile()) {
if (!getNextFile(true)) {
// No further file: hand the last known file id / position to lastRecord() and stop.
lastRecord(file_id, file_pos);
return false;
}
}
// Remember the file and offset at which this header starts (captured before the read).
file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
file_pos = inFileStream_.tellg();
if (file_pos == std::streampos(-1)) {
std::ostringstream oss;
oss << "tellg() failure: fail=" << (inFileStream_.fail()?"T":"F") << " bad=" << (inFileStream_.bad()?"T":"F");
oss << " eof=" << (inFileStream_.eof()?"T":"F") << " good=" << (inFileStream_.good()?"T":"F");
oss << " rdstate=0x" << std::hex << inFileStream_.rdstate() << std::dec;
throw jexception(jerrno::JERR_RCVM_STREAMBAD, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
// Read the header bytes straight into the struct; gcount() tells how many were obtained.
inFileStream_.read((char*)&h, sizeof(rec_hdr_t));
if (inFileStream_.gcount() == sizeof(rec_hdr_t)) {
hdr_ok = true;
} else {
// Short read: move to the next file if one is needed, then retry from the loop top.
// NOTE(review): if needNextFile() is false here the loop just retries; a short read
// normally leaves failbit set, so the tellg() check above would presumably throw
// unless the stream state is reset elsewhere -- confirm.
if (needNextFile()) {
if (!getNextFile(true)) {
lastRecord(file_id, file_pos);
return false;
}
}
}
}
uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
// Dispatch on the record type identified by the header's magic value.
switch(h._magic) {
case QLS_ENQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
// A header that fails validation ends reading; checkJournalAlignment() is given the header's location.
if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
checkJournalAlignment(file_id, file_pos);
return false;
}
enq_rec er;
if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (!er.is_transient()) { // Ignore transient msgs
// Count the record against the file in which it starts.
fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount();
if (er.xid_size()) {
// Transactional enqueue: record it in the transaction map under its xid.
er.get_xid(&xidp);
if (xidp == 0) {
throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, er.xid_size());
// NOTE(review): txn_data_t arguments appear to be (rid, drid, fid, file offset, enq flag, ...)
// judging by the rid_/drid_/fid_/foffs_/enq_flag_ members used below -- confirm against its declaration.
transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false, false));
if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
} else {
// Non-transactional enqueue: record rid -> (file, offset) directly in the enqueue map.
if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
oss << std::hex << "rid=0x" << h._rid << " _pfid=0x" << start_fid;
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
}
}
}
break;
case QLS_DEQ_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
checkJournalAlignment(file_id, file_pos);
return false;
}
deq_rec dr;
if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
if (dr.xid_size()) {
// Transactional dequeue: lock the target enqueue and record the dequeue under its xid.
// If the enqueue is part of a pending txn, it will not yet be in emap
enqueueMapRef_.lock(dr.deq_rid()); // ignore not found error
dr.get_xid(&xidp);
if (xidp == 0) {
throw jexception(jerrno::JERR_RCVM_NULLXID, "DEQ", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, dr.xid_size());
transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos,
false, false, dr.is_txn_coml_commit()));
if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
} else {
// Non-transactional dequeue: drop the dequeued rid from the enqueue map and, if it was
// present, decrement the enqueued-record count of the file reported in enq_fid.
// NOTE(review): meaning of the third argument (true) is not visible here -- confirm.
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
break;
case QLS_TXA_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
checkJournalAlignment(file_id, file_pos);
return false;
}
txn_rec ar;
if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
if (xidp == 0) {
throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, ar.xid_size());
txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
// Undo each operation of the aborted transaction.
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) {
// Aborted enqueue: take back the count added when the enqueue record was read.
fileNumberMap_[itr->fid_]->journalFilePtr_->decrEnqueuedRecordCount();
} else {
// Aborted dequeue: release the lock taken on the target enqueue.
enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
}
}
break;
case QLS_TXC_MAGIC:
{
//std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
checkJournalAlignment(file_id, file_pos);
return false;
}
txn_rec cr;
if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) {
return false;
}
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
if (xidp == 0) {
throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, cr.xid_size());
txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
// Apply each operation of the committed transaction to the enqueue map.
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) { // txn enqueue
//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->fid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
if (enqueueMapRef_.insert_pfid(itr->rid_, itr->fid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_;
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
} else { // txn dequeue
// Committed dequeue: remove the target enqueue and decrement its file's count if it was found.
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
}
}
}
break;
case QLS_EMPTY_MAGIC:
{
//std::cout << ".x" << std::flush; // DEBUG
// Filler record: its size is that of a bare header rounded up to whole dblks. The header
// itself has already been consumed, so skip only the remaining bytes.
uint32_t rec_dblks = jrec::size_dblks(sizeof(::rec_hdr_t));
inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
checkFileStreamOk(false);
if (needNextFile()) {
// Advance the recorded position past this record before switching files, so that
// lastRecord() receives the offset following the filler if no further file exists.
file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES;
if (!getNextFile(false)) {
lastRecord(start_fid, file_pos);
return false;
}
}
}
break;
case 0:
//std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
// Zero magic: no record here; check alignment at this position and stop reading.
checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
default:
//std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
// Stop as this is the overwrite boundary.
checkJournalAlignment(getCurrentFileNumber(), file_pos);
return false;
}
// A record was read and processed successfully.
return true;
}