in src/qpid/linearstore/journal/RecoveryManager.cpp [184:290]
bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
std::size_t& dataSize,
void** const xidPtrPtr,
std::size_t& xidSize,
bool& transient,
bool& external,
data_tok* const dtokp,
bool ignore_pending_txns) {
bool foundRecord = false;
do {
if (recordIdListConstItr_ == recordIdList_.end()) {
return false;
}
if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction
++recordIdListConstItr_; // ignore, go to next record
} else {
foundRecord = true;
}
} while (!foundRecord);
if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) {
if (!getFile(recordIdListConstItr_->fileId_, false)) {
std::ostringstream oss;
oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
}
inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg);
if (!inFileStream_.good()) {
std::ostringstream oss;
oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName();
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
::enq_hdr_t enqueueHeader;
inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
std::ostringstream oss;
oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_;
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
// check flags
transient = ::is_enq_transient(&enqueueHeader);
external = ::is_enq_external(&enqueueHeader);
// read xid
xidSize = enqueueHeader._xidsize;
*xidPtrPtr = ::malloc(xidSize);
if (*xidPtrPtr == 0) {
std::ostringstream oss;
oss << "xidPtr, size=0x" << std::hex << xidSize;
throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
readJournalData((char*)*xidPtrPtr, xidSize);
// read data
dataSize = enqueueHeader._dsize;
*dataPtrPtr = ::malloc(dataSize);
if (*xidPtrPtr == 0) {
std::ostringstream oss;
oss << "dataPtr, size=0x" << std::hex << dataSize;
throw jexception(jerrno::JERR__MALLOC, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
readJournalData((char*)*dataPtrPtr, dataSize);
// Check enqueue record checksum
Checksum checksum;
checksum.addData((const unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
if (xidSize > 0) {
checksum.addData((const unsigned char*)*xidPtrPtr, xidSize);
}
if (dataSize > 0) {
checksum.addData((const unsigned char*)*dataPtrPtr, dataSize);
}
::rec_tail_t enqueueTail;
readJournalData((char*)&enqueueTail, sizeof(::rec_tail_t));
uint32_t cs = checksum.getChecksum();
uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
if (res != 0) {
std::stringstream oss;
oss << "Bad record tail:" << std::hex;
if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
oss << std::endl << " Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic;
}
if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
oss << std::endl << " Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial;
}
if (res & ::REC_TAIL_RID_ERR_MASK) {
oss << std::endl << " Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid;
}
if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum;
}
throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "RecoveryManager", "readNextRemainingRecord"); // TODO: Don't throw exception, log info
}
// Set data token
dtokp->set_wstate(data_tok::ENQ);
dtokp->set_rid(enqueueHeader._rhdr._rid);
dtokp->set_dsize(dataSize);
if (xidSize) {
dtokp->set_xid(*xidPtrPtr, xidSize);
}
++recordIdListConstItr_;
return true;
}