in SenderThread.cpp [506:623]
/**
 * READ_FILE_CHUNKS state handler.
 *
 * Reads a one-byte command from the socket and dispatches on it:
 *  - ABORT_CMD            -> PROCESS_ABORT_CMD
 *  - WAIT_CMD             -> stay in READ_FILE_CHUNKS
 *  - ACK_CMD              -> SEND_BLOCKS (or END with PROTOCOL_ERROR if the
 *                            parent has not received the file chunks yet)
 *  - LOCAL_CHECKPOINT_CMD -> verify the spurious checkpoint, then CONNECT,
 *                            END or READ_FILE_CHUNKS depending on the result
 *  - CHUNKS_CMD           -> read the chunks header, then read and decode
 *                            length-prefixed buffers until the announced
 *                            number of entries is collected, hand the list
 *                            to the parent and reply with ACK_CMD
 *  - anything else        -> END with PROTOCOL_ERROR
 *
 * Short socket reads/writes set SOCKET_READ_ERROR / SOCKET_WRITE_ERROR and
 * return CHECK_FOR_ABORT.
 *
 * @return the next state of the sender state machine
 */
SenderState SenderThread::readFileChunks() {
  WTLOG(INFO) << "entered READ_FILE_CHUNKS state ";
  int64_t numRead = socket_->read(buf_, 1);
  if (numRead != 1) {
    WTLOG(ERROR) << "Socket read error 1 " << numRead;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return CHECK_FOR_ABORT;
  }
  threadStats_.addHeaderBytes(numRead);
  const auto cmd = static_cast<Protocol::CMD_MAGIC>(buf_[0]);
  if (cmd == Protocol::ABORT_CMD) {
    return PROCESS_ABORT_CMD;
  }
  if (cmd == Protocol::WAIT_CMD) {
    return READ_FILE_CHUNKS;
  }
  if (cmd == Protocol::ACK_CMD) {
    if (!wdtParent_->isFileChunksReceived()) {
      WTLOG(ERROR) << "Sender has not yet received file chunks, but receiver "
                   << "thinks it has already sent it";
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
    return SEND_BLOCKS;
  }
  if (cmd == Protocol::LOCAL_CHECKPOINT_CMD) {
    ErrorCode errCode = readAndVerifySpuriousCheckpoint();
    if (errCode == SOCKET_READ_ERROR) {
      return CONNECT;
    }
    if (errCode == PROTOCOL_ERROR) {
      return END;
    }
    WDT_CHECK_EQ(OK, errCode);
    return READ_FILE_CHUNKS;
  }
  if (cmd != Protocol::CHUNKS_CMD) {
    WTLOG(ERROR) << "Unexpected cmd " << cmd;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }

  // Fixed-size remainder of the CHUNKS_CMD header.
  int64_t toRead = Protocol::kChunksCmdLen;
  numRead = socket_->read(buf_, toRead);
  if (numRead != toRead) {
    WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return CHECK_FOR_ABORT;
  }
  threadStats_.addHeaderBytes(numRead);
  int64_t off = 0;
  // Start from an invalid value: the result of decodeChunksCmd is not checked
  // here, so if it ever leaves an output untouched the sanity check below
  // rejects it instead of reading an indeterminate value.
  int64_t bufSize = -1;
  int64_t numFiles = -1;
  Protocol::decodeChunksCmd(buf_, off, bufSize_, bufSize, numFiles);
  WTLOG(INFO) << "File chunk list has " << numFiles
              << " entries and is broken in buffers of length " << bufSize;
  if (bufSize < 0 || numFiles < 0) {
    WTLOG(ERROR) << "Decoded bogus size for file chunks list bufSize = "
                 << bufSize << " num files " << numFiles;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }

  // Scratch buffer every length-prefixed chunk buffer is read into.
  std::unique_ptr<char[]> chunkBuffer(new char[bufSize]);
  std::vector<FileChunksInfo> fileChunksInfoList;
  while (true) {
    int64_t numFileChunks = fileChunksInfoList.size();
    if (numFileChunks > numFiles) {
      // We should never be able to read more file chunks than mentioned in the
      // chunks cmd. Chunks cmd has the buffer size used to transfer chunks and
      // also the number of chunks. These chunks are read, parsed and added to
      // fileChunksInfoList. The number of chunks we decode should match the
      // number mentioned in the chunks cmd.
      WTLOG(ERROR) << "Number of file chunks received is more than the number "
                      "mentioned in CHUNKS_CMD "
                   << numFileChunks << " " << numFiles;
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
    if (numFileChunks == numFiles) {
      break;
    }
    // Each buffer is preceded by its length as a little-endian int32.
    toRead = sizeof(int32_t);
    numRead = socket_->read(buf_, toRead);
    if (numRead != toRead) {
      WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
      threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
      return CHECK_FOR_ABORT;
    }
    toRead = folly::loadUnaligned<int32_t>(buf_);
    toRead = folly::Endian::little(toRead);
    // The length comes from the peer; it must fit in chunkBuffer (bufSize
    // bytes), otherwise the read below would overrun the allocation.
    if (toRead < 0 || toRead > bufSize) {
      WTLOG(ERROR) << "Invalid file chunks buffer length " << toRead
                   << ", expected a value in [0, " << bufSize << "]";
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
    numRead = socket_->read(chunkBuffer.get(), toRead);
    if (numRead != toRead) {
      WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
      threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
      return CHECK_FOR_ABORT;
    }
    threadStats_.addHeaderBytes(numRead);
    off = 0;
    // decode function below adds decoded file chunks to fileChunksInfoList
    bool success = Protocol::decodeFileChunksInfoList(
        chunkBuffer.get(), off, toRead, fileChunksInfoList);
    if (!success) {
      WTLOG(ERROR) << "Unable to decode file chunks list";
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
  }
  wdtParent_->setFileChunksInfo(fileChunksInfoList);

  // send ack for file chunks list
  buf_[0] = Protocol::ACK_CMD;
  int64_t toWrite = 1;
  int64_t written = socket_->write(buf_, toWrite);
  if (toWrite != written) {
    WTLOG(ERROR) << "Socket write error " << toWrite << " " << written;
    threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
    return CHECK_FOR_ABORT;
  }
  threadStats_.addHeaderBytes(written);
  return SEND_BLOCKS;
}