in ReceiverThread.cpp [390:667]
ReceiverState ReceiverThread::processFileCmd() {
WTVLOG(1) << "entered PROCESS_FILE_CMD state";
// following block needs to be executed for the first file cmd. There is no
// harm in executing it more than once. number of blocks equal to 0 is a good
// approximation for first file cmd. Did not want to introduce another boolean
if (options_.enable_download_resumption && threadStats_.getNumBlocks() == 0) {
auto sendChunksFunnel = controller_->getFunnel(SEND_FILE_CHUNKS_FUNNEL);
auto state = sendChunksFunnel->getStatus();
if (state == FUNNEL_START) {
// sender is not in resumption mode
wdtParent_->addTransferLogHeader(isBlockMode_,
/* sender not resuming */ false);
sendChunksFunnel->notifySuccess();
}
}
checkpoint_.resetLastBlockDetails();
BlockDetails blockDetails;
auto guard = folly::makeGuard([&] {
if (threadStats_.getLocalErrorCode() != OK) {
threadStats_.incrFailedAttempts();
}
});
ErrorCode transferStatus = (ErrorCode)buf_[off_++];
if (transferStatus != OK) {
// TODO: use this status information to implement fail fast mode
WTVLOG(1) << "sender entered into error state "
<< errorCodeToStr(transferStatus);
}
int16_t headerLen = folly::loadUnaligned<int16_t>(buf_ + off_);
headerLen = folly::Endian::little(headerLen);
if (headerLen <= 0) {
WTLOG(ERROR) << "Header length must be positive " << headerLen;
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return FINISH_WITH_ERROR;
}
WVLOG(2) << "Processing FILE_CMD, header len " << headerLen;
sendHeartBeat();
if (headerLen > numRead_) {
int64_t end = oldOffset_ + numRead_;
numRead_ =
readAtLeast(*socket_, buf_ + end, bufSize_ - end, headerLen, numRead_);
}
if (numRead_ < headerLen) {
WTLOG(ERROR) << "Unable to read full header " << headerLen << " "
<< numRead_;
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
return ACCEPT_WITH_TIMEOUT;
}
off_ += sizeof(int16_t);
bool success = Protocol::decodeHeader(threadProtocolVersion_, buf_, off_,
numRead_ + oldOffset_, blockDetails);
int64_t headerBytes = off_ - oldOffset_;
// transferred header length must match decoded header length
if (headerLen != headerBytes) {
WTLOG(ERROR) << "Decoded header length: " << headerBytes
<< ", transferred header length: " << headerBytes
<< ", they should be equal.";
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return FINISH_WITH_ERROR;
}
threadStats_.addHeaderBytes(headerBytes);
threadStats_.addEffectiveBytes(headerBytes, 0);
if (!success) {
WTLOG(ERROR) << "Error decoding at"
<< " ooff:" << oldOffset_ << " off_: " << off_
<< " numRead_: " << numRead_;
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return FINISH_WITH_ERROR;
}
if (blockDetails.allocationStatus == TO_BE_DELETED &&
(blockDetails.fileSize != 0 || blockDetails.dataSize != 0)) {
WTLOG(ERROR) << "Invalid file header, file to be deleted, but "
"file-size/block-size not zero "
<< blockDetails.fileName << " file-size "
<< blockDetails.fileSize << " block-size "
<< blockDetails.dataSize;
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return FINISH_WITH_ERROR;
}
// received a well formed file cmd, apply the pending checkpoint update
checkpointIndex_ = pendingCheckpointIndex_;
WTVLOG(1) << "Read id:" << blockDetails.fileName
<< " size:" << blockDetails.dataSize << " ooff:" << oldOffset_
<< " off_: " << off_ << " numRead_: " << numRead_;
auto &fileCreator = wdtParent_->getFileCreator();
FileWriter writer(*threadCtx_, &blockDetails, fileCreator.get());
const auto encryptionType = socket_->getEncryptionType();
auto writtenGuard = folly::makeGuard([&] {
if (!encryptionTypeToTagLen(encryptionType) && footerType_ == NO_FOOTER) {
// if encryption doesn't have tag verification and checksum verification
// is disabled, we can consider bytes received before connection break as
// valid
checkpoint_.setLastBlockDetails(blockDetails.seqId, blockDetails.offset,
writer.getTotalWritten());
threadStats_.addEffectiveBytes(headerBytes, writer.getTotalWritten());
}
});
sendHeartBeat();
// writer.open() deletes files if status == TO_BE_DELETED
// therefore if !(!delete_extra_files && status == TO_BE_DELETED)
// we should skip writer.open() call altogether
if (options_.delete_extra_files ||
blockDetails.allocationStatus != TO_BE_DELETED) {
if (writer.open() != OK) {
threadStats_.setLocalErrorCode(FILE_WRITE_ERROR);
return SEND_ABORT_CMD;
}
}
int32_t checksum = 0;
int64_t remainingData = numRead_ + oldOffset_ - off_;
int64_t toWrite = remainingData;
WDT_CHECK(remainingData >= 0);
if (remainingData >= blockDetails.dataSize) {
toWrite = blockDetails.dataSize;
}
threadStats_.addDataBytes(toWrite);
if (footerType_ == CHECKSUM_FOOTER) {
checksum = folly::crc32c((const uint8_t *)(buf_ + off_), toWrite, checksum);
}
auto throttler = wdtParent_->getThrottler();
if (throttler) {
// We might be reading more than we require for this file but
// throttling should make sense for any additional bytes received
// on the network
throttler->limit(*threadCtx_, toWrite + headerBytes);
}
sendHeartBeat();
ErrorCode code = ERROR;
if (toWrite > 0) {
code = writer.write(buf_ + off_, toWrite);
if (code != OK) {
threadStats_.setLocalErrorCode(code);
return SEND_ABORT_CMD;
}
}
off_ += toWrite;
remainingData -= toWrite;
// also means no leftOver so it's ok we use buf_ from start
while (writer.getTotalWritten() < blockDetails.dataSize) {
if (wdtParent_->getCurAbortCode() != OK) {
WTLOG(ERROR) << "Thread marked for abort while processing "
<< blockDetails.fileName << " " << blockDetails.seqId
<< " port : " << socket_->getPort();
threadStats_.setLocalErrorCode(ABORT);
return FINISH_WITH_ERROR;
}
sendHeartBeat();
int64_t nres = readAtMost(*socket_, buf_, bufSize_,
blockDetails.dataSize - writer.getTotalWritten());
if (nres <= 0) {
break;
}
if (throttler) {
// We only know how much we have read after we are done calling
// readAtMost. Call throttler with the bytes read off_ the wire.
throttler->limit(*threadCtx_, nres);
}
threadStats_.addDataBytes(nres);
if (footerType_ == CHECKSUM_FOOTER) {
checksum = folly::crc32c((const uint8_t *)buf_, nres, checksum);
}
sendHeartBeat();
code = writer.write(buf_, nres);
if (code != OK) {
WTLOG(ERROR) << "failed to write to " << blockDetails.fileName;
threadStats_.setLocalErrorCode(code);
return SEND_ABORT_CMD;
}
}
// Sync the writer to disk and close it. We need to check for error code each
// time, otherwise we would move forward with corrupted files.
const ErrorCode syncCode = writer.sync();
if (syncCode != OK) {
WTLOG(ERROR) << "could not sync " << blockDetails.fileName << " to disk";
threadStats_.setLocalErrorCode(syncCode);
return SEND_ABORT_CMD;
}
const ErrorCode closeCode = writer.close();
if (closeCode != OK) {
WTLOG(ERROR) << "could not close " << blockDetails.fileName;
threadStats_.setLocalErrorCode(closeCode);
return SEND_ABORT_CMD;
}
if (writer.getTotalWritten() != blockDetails.dataSize) {
// This can only happen if there are transmission errors
// Write errors to disk are already taken care of above
WTLOG(ERROR) << "could not read entire content for "
<< blockDetails.fileName << " port " << socket_->getPort();
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
return ACCEPT_WITH_TIMEOUT;
}
writtenGuard.dismiss();
WVLOG(2) << "completed " << blockDetails.fileName << " off: " << off_
<< " numRead: " << numRead_;
// Transfer of the file is complete here, mark the bytes effective
WDT_CHECK(remainingData >= 0) << "Negative remainingData " << remainingData;
if (remainingData > 0) {
// if we need to read more anyway, let's move the data
numRead_ = remainingData;
if ((remainingData < Protocol::kMaxHeader) && (off_ > (bufSize_ / 2))) {
// rare so inefficient is ok
WVLOG(3) << "copying extra " << remainingData << " leftover bytes @ "
<< off_;
memmove(/* dst */ buf_,
/* from */ buf_ + off_,
/* how much */ remainingData);
off_ = 0;
} else {
// otherwise just continue from the offset
WVLOG(3) << "Using remaining extra " << remainingData
<< " leftover bytes starting @ " << off_;
}
} else {
numRead_ = off_ = 0;
}
if (footerType_ == CHECKSUM_FOOTER) {
sendHeartBeat();
// have to read footer cmd
oldOffset_ = off_;
numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
Protocol::kMinBufLength, numRead_);
if (numRead_ < Protocol::kMinBufLength) {
WTLOG(ERROR) << "socket read failure " << Protocol::kMinBufLength << " "
<< numRead_;
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
return ACCEPT_WITH_TIMEOUT;
}
Protocol::CMD_MAGIC cmd = (Protocol::CMD_MAGIC)buf_[off_++];
if (cmd != Protocol::FOOTER_CMD) {
WTLOG(ERROR) << "Expecting footer cmd, but received " << cmd;
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return FINISH_WITH_ERROR;
}
int32_t receivedChecksum;
bool ok = Protocol::decodeFooter(
buf_, off_, oldOffset_ + Protocol::kMaxFooter, receivedChecksum);
if (!ok) {
WTLOG(ERROR) << "Unable to decode footer cmd";
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return FINISH_WITH_ERROR;
}
if (checksum != receivedChecksum) {
WTLOG(ERROR) << "Checksum mismatch " << checksum << " "
<< receivedChecksum << " port " << socket_->getPort()
<< " file " << blockDetails.fileName;
threadStats_.setLocalErrorCode(CHECKSUM_MISMATCH);
return ACCEPT_WITH_TIMEOUT;
}
markBlockVerified(blockDetails);
int64_t msgLen = off_ - oldOffset_;
numRead_ -= msgLen;
} else {
WDT_CHECK(footerType_ == NO_FOOTER);
if (encryptionTypeToTagLen(encryptionType)) {
blocksWaitingVerification_.emplace_back(blockDetails);
} else {
markBlockVerified(blockDetails);
}
}
return READ_NEXT_CMD;
}