ReceiverThread.cpp (923 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/ReceiverThread.h>
#include <folly/Conv.h>
#include <folly/Memory.h>
#include <folly/ScopeGuard.h>
#include <folly/String.h>
#include <folly/hash/Checksum.h>
#include <folly/lang/Bits.h>
#include <wdt/util/FileWriter.h>
namespace facebook {
namespace wdt {
// Slack, in milliseconds, added on top of the larger of the sender's
// read/write timeouts when waiting for a reconnect in acceptWithTimeout()
static constexpr int kTimeoutBufferMillis = 1000;
// Divisor applied to the sender read timeout to derive the heart-beat
// interval and the wait intervals used while keeping the sender alive
static constexpr int kWaitTimeoutFactor = 5;
/**
 * Streams a short "Thread[<index>, port: <port>] " prefix that identifies
 * the given receiver thread.
 *
 * @param os              stream to write to
 * @param receiverThread  thread whose index and socket port are printed
 * @return the same stream, to allow chaining
 */
std::ostream &operator<<(std::ostream &os,
                         const ReceiverThread &receiverThread) {
  const auto listeningPort = receiverThread.socket_->getPort();
  os << "Thread[" << receiverThread.threadIndex_ << ", port: ";
  os << listeningPort << "] ";
  return os;
}
/**
 * Keeps reading from the socket until at least `atLeast` bytes are present
 * in buf (counting the `len` bytes that are already there).
 *
 * @param s        socket to read from
 * @param buf      destination buffer; new data is appended at buf + len
 * @param max      capacity of buf
 * @param atLeast  minimum number of bytes wanted (0 < atLeast <= max)
 * @param len      number of bytes already in buf (>= 0)
 * @return total number of bytes in buf. On EOF the partial count is
 *         returned. On a read error the partial count is returned if it is
 *         non-zero, otherwise the (negative) error value from read.
 */
int64_t readAtLeast(IServerSocket &s, char *buf, int64_t max, int64_t atLeast,
                    int64_t len) {
  WVLOG(4) << "readAtLeast len " << len << " max " << max << " atLeast "
           << atLeast << " from " << s.getFd();
  WDT_CHECK_GE(len, 0);
  WDT_CHECK_GT(atLeast, 0);
  WDT_CHECK_LE(atLeast, max);
  int numReads = 0;
  for (; len < atLeast; ++numReads) {
    // tryFull is false: data should be processed as soon as it arrives
    const int64_t got = s.read(buf + len, max - len, false);
    if (got < 0) {
      WPLOG(ERROR) << "Read error on " << s.getPort() << " after " << numReads;
      // prefer reporting what we already have over the error value
      return len != 0 ? len : got;
    }
    if (got == 0) {
      WVLOG(2) << "Eof on " << s.getPort() << " after " << numReads
               << " reads "
               << "got " << len;
      return len;
    }
    len += got;
  }
  WVLOG(3) << "Took " << numReads << " reads to get " << len
           << " from fd : " << s.getFd();
  return len;
}
/**
 * Performs a single read of up to min(atMost, max) bytes into buf.
 *
 * @param s       socket to read from
 * @param buf     destination buffer
 * @param max     capacity of buf
 * @param atMost  upper bound on the number of bytes wanted
 * @return value returned by the socket read: > 0 bytes read, 0 on EOF,
 *         negative on error
 */
int64_t readAtMost(IServerSocket &s, char *buf, int64_t max, int64_t atMost) {
  int64_t target = max;
  if (atMost < max) {
    target = atMost;
  }
  WVLOG(3) << "readAtMost target " << target;
  // tryFull is false: data should be processed as soon as it arrives
  const int64_t numBytes = s.read(buf, target, false);
  if (numBytes > 0) {
    WVLOG(3) << "readAtMost " << numBytes << " / " << atMost << " from "
             << s.getFd();
    return numBytes;
  }
  if (numBytes < 0) {
    WPLOG(ERROR) << "Read error on " << s.getPort() << " target " << target;
  } else {
    WLOG(WARNING) << "Eof on " << s.getFd();
  }
  return numBytes;
}
// Table of state handlers. start() indexes this array with the current
// ReceiverState value and calls the selected member function, so the position
// of every entry matters.
// NOTE(review): the order presumably has to mirror the ReceiverState enum
// declaration (not visible in this file) - confirm before adding or
// reordering entries.
const ReceiverThread::StateFunction ReceiverThread::stateMap_[] = {
&ReceiverThread::listen,
&ReceiverThread::acceptFirstConnection,
&ReceiverThread::acceptWithTimeout,
&ReceiverThread::sendLocalCheckpoint,
&ReceiverThread::readNextCmd,
&ReceiverThread::processFileCmd,
&ReceiverThread::processSettingsCmd,
&ReceiverThread::processDoneCmd,
&ReceiverThread::processSizeCmd,
&ReceiverThread::sendFileChunks,
&ReceiverThread::sendGlobalCheckpoint,
&ReceiverThread::sendDoneCmd,
&ReceiverThread::sendAbortCmd,
&ReceiverThread::waitForFinishOrNewCheckpoint,
&ReceiverThread::finishWithError};
/**
 * Creates a receiver thread bound to its parent Receiver.
 *
 * @param wdtParent    owning receiver; its options and protocol version are
 *                     forwarded to the WdtThread base
 * @param threadIndex  index of this thread
 * @param port         port forwarded to the WdtThread base
 * @param controller   threads controller this thread registers with
 */
ReceiverThread::ReceiverThread(Receiver *wdtParent, int threadIndex,
int32_t port, ThreadsController *controller)
: WdtThread(wdtParent->options_, threadIndex, port,
wdtParent->getProtocolVersion(), controller),
wdtParent_(wdtParent) {
// register with the controller (start() de-registers at the end)
controller_->registerThread(threadIndex_);
// abort checks on this thread's context go through the parent's callback
threadCtx_->setAbortChecker(&wdtParent_->abortCheckerCallback_);
}
/**LISTEN STATE***/
/**
 * LISTEN state: tries to listen on the thread's socket, sleeping between
 * failed attempts.
 *
 * @return ACCEPT_FIRST_CONNECTION once listening, FINISH_WITH_ERROR (with
 *         CONN_ERROR recorded) if listening is not possible
 */
ReceiverState ReceiverThread::listen() {
  WTVLOG(1) << "entered LISTEN state";
  const bool writesEnabled = !options_.skip_writes;
  const int32_t listenPort = socket_->getPort();
  WVLOG(1) << "Server Thread for port " << listenPort << " with backlog "
           << socket_->getBackLog() << " on " << wdtParent_->getDirectory()
           << " writes = " << writesEnabled;
  int attempt = 1;
  while (attempt < options_.max_retries) {
    const ErrorCode status = socket_->listen();
    if (status == OK) {
      break;
    }
    if (status == CONN_ERROR) {
      // CONN_ERROR is not retried
      threadStats_.setLocalErrorCode(status);
      return FINISH_WITH_ERROR;
    }
    WTLOG(INFO) << "Sleeping after failed attempt " << attempt;
    /* sleep override */
    usleep(options_.sleep_millis * 1000);
    ++attempt;
  }
  // one more/last try (stays true if it worked above)
  const bool listening = (socket_->listen() == OK);
  if (!listening) {
    WTLOG(ERROR) << "Unable to listen/bind despite retries";
    threadStats_.setLocalErrorCode(CONN_ERROR);
    return FINISH_WITH_ERROR;
  }
  return ACCEPT_FIRST_CONNECTION;
}
/***ACCEPT_FIRST_CONNECTION***/
/**
 * ACCEPT_FIRST_CONNECTION state: resets the per-session state and loops on
 * accept until a connection arrives, another thread has started a new
 * transfer, the accept mode forbids further attempts, or an abort is
 * requested.
 *
 * @return READ_NEXT_CMD after a successful accept, ACCEPT_WITH_TIMEOUT if a
 *         new transfer was started by another thread, FINISH_WITH_ERROR
 *         otherwise
 */
ReceiverState ReceiverThread::acceptFirstConnection() {
  WTVLOG(1) << "entered ACCEPT_FIRST_CONNECTION state";
  reset();
  socket_->closeNoCheck();
  auto acceptTimeout = options_.accept_timeout_millis;
  for (int numAttempts = 0;; ++numAttempts) {
    // Move to timeout state if some other thread was successful
    // in getting a connection
    if (wdtParent_->hasNewTransferStarted()) {
      return ACCEPT_WITH_TIMEOUT;
    }
    const auto mode = wdtParent_->getAcceptMode();
    if (mode == Receiver::AcceptMode::ACCEPT_WITH_RETRIES) {
      if (numAttempts >= options_.max_accept_retries) {
        WTLOG(ERROR) << "Unable to accept after " << numAttempts
                     << " attempts";
        threadStats_.setLocalErrorCode(CONN_ERROR);
        return FINISH_WITH_ERROR;
      }
    } else if (mode == Receiver::AcceptMode::ACCEPT_FOREVER) {
      WTVLOG(2) << "Receiver is configured to accept for-ever";
    } else if (mode == Receiver::AcceptMode::STOP_ACCEPTING) {
      WTLOG(ERROR) << "Receiver is asked to stop accepting, attempts : "
                   << numAttempts;
      threadStats_.setLocalErrorCode(CONN_ERROR);
      return FINISH_WITH_ERROR;
    }
    if (wdtParent_->getCurAbortCode() != OK) {
      WTLOG(ERROR) << "Thread marked to abort while trying to accept "
                   << "first connection. Num attempts " << numAttempts;
      threadStats_.setLocalErrorCode(ABORT);
      return FINISH_WITH_ERROR;
    }
    const ErrorCode acceptCode =
        socket_->acceptNextConnection(acceptTimeout, curConnectionVerified_);
    if (acceptCode == OK) {
      break;
    }
  }
  // Make the parent start new global session. This is executed
  // only by the first thread that calls this function
  controller_->executeAtStart(
      [&]() { wdtParent_->startNewGlobalSession(socket_->getPeerIp()); });
  return READ_NEXT_CMD;
}
/***ACCEPT_WITH_TIMEOUT STATE***/
/**
 * ACCEPT_WITH_TIMEOUT state: closes the current connection and waits a
 * bounded time for the sender to connect again.
 *
 * The timeout is the accept window option, or - once sender settings were
 * received - the larger of the sender read/write timeouts plus
 * kTimeoutBufferMillis.
 *
 * @return END on a non-retryable socket error, FINISH_WITH_ERROR if accept
 *         fails, SEND_LOCAL_CHECKPOINT if the thread had an error recorded
 *         before reconnecting, READ_NEXT_CMD otherwise
 */
ReceiverState ReceiverThread::acceptWithTimeout() {
  WTLOG(INFO) << "entered ACCEPT_WITH_TIMEOUT state";
  // check socket status
  const ErrorCode fatalSocketError = socket_->getNonRetryableErrCode();
  if (fatalSocketError != OK) {
    WTLOG(ERROR) << "Socket has non-retryable error "
                 << errorCodeToStr(fatalSocketError);
    threadStats_.setLocalErrorCode(fatalSocketError);
    return END;
  }
  socket_->closeNoCheck();
  blocksWaitingVerification_.clear();
  auto timeout = options_.accept_window_millis;
  if (senderReadTimeout_ > 0) {
    // transfer is in progress and we have already got sender settings
    timeout = std::max(senderReadTimeout_, senderWriteTimeout_) +
              kTimeoutBufferMillis;
  }
  const ErrorCode acceptCode =
      socket_->acceptNextConnection(timeout, curConnectionVerified_);
  curConnectionVerified_ = false;
  if (acceptCode != OK) {
    WTLOG(ERROR) << "accept() failed with error " << errorCodeToStr(acceptCode)
                 << " timeout " << timeout;
    threadStats_.setLocalErrorCode(acceptCode);
    return FINISH_WITH_ERROR;
  }
  off_ = 0;
  numRead_ = 0;
  pendingCheckpointIndex_ = checkpointIndex_;
  // an error recorded before the reconnect means the sender must first be
  // told where this thread stopped
  const bool hadErrorBeforeReconnect = threadStats_.getLocalErrorCode() != OK;
  // reset thread status
  threadStats_.setLocalErrorCode(OK);
  return hadErrorBeforeReconnect ? SEND_LOCAL_CHECKPOINT : READ_NEXT_CMD;
}
/***SEND_LOCAL_CHECKPOINT STATE***/
/**
 * SEND_LOCAL_CHECKPOINT state: encodes this thread's checkpoint into buf_
 * and writes a fixed number of bytes (the maximum local checkpoint length
 * for the current protocol version) to the socket.
 *
 * @return READ_NEXT_CMD on success, ACCEPT_WITH_TIMEOUT (with
 *         SOCKET_WRITE_ERROR recorded) if the write is short
 */
ReceiverState ReceiverThread::sendLocalCheckpoint() {
  WTLOG(INFO) << "entered SEND_LOCAL_CHECKPOINT state " << checkpoint_;
  std::vector<Checkpoint> toSend;
  toSend.emplace_back(checkpoint_);
  int64_t encodeOffset = 0;
  const int encodedLen =
      Protocol::getMaxLocalCheckpointLength(threadProtocolVersion_);
  Protocol::encodeCheckpoints(threadProtocolVersion_, buf_, encodeOffset,
                              encodedLen, toSend);
  const int numWritten = socket_->write(buf_, encodedLen);
  if (numWritten == encodedLen) {
    threadStats_.addHeaderBytes(encodedLen);
    return READ_NEXT_CMD;
  }
  WTLOG(ERROR) << "unable to write local checkpoint. write mismatch "
               << encodedLen << " " << numWritten;
  threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
  return ACCEPT_WITH_TIMEOUT;
}
/***READ_NEXT_CMD***/
/**
 * READ_NEXT_CMD state: makes sure at least Protocol::kMinBufLength bytes are
 * buffered starting at off_, consumes the command byte and dispatches on it.
 *
 * @return the PROCESS_* state matching the command, ACCEPT_WITH_TIMEOUT on
 *         a short read, FINISH_WITH_ERROR on an unknown command
 */
ReceiverState ReceiverThread::readNextCmd() {
  WTVLOG(1) << "entered READ_NEXT_CMD state";
  // remember where this command starts; handlers compute lengths from it
  oldOffset_ = off_;
  // TODO: we shouldn't have off_ here and buffer/size inside buffer.
  numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
                         Protocol::kMinBufLength, numRead_);
  if (numRead_ < Protocol::kMinBufLength) {
    WTLOG(ERROR) << "socket read failure " << Protocol::kMinBufLength << " "
                 << numRead_;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  const Protocol::CMD_MAGIC cmd =
      static_cast<Protocol::CMD_MAGIC>(buf_[off_++]);
  switch (cmd) {
    case Protocol::DONE_CMD:
      return PROCESS_DONE_CMD;
    case Protocol::FILE_CMD:
      return PROCESS_FILE_CMD;
    case Protocol::SETTINGS_CMD:
      return PROCESS_SETTINGS_CMD;
    case Protocol::SIZE_CMD:
      return PROCESS_SIZE_CMD;
    default:
      break;
  }
  WTLOG(ERROR) << "received an unknown cmd " << cmd;
  threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
  return FINISH_WITH_ERROR;
}
/***PROCESS_SETTINGS_CMD***/
/**
 * PROCESS_SETTINGS_CMD state: decodes the sender's protocol version and
 * settings, negotiates the protocol version, validates the transfer id and
 * stores the sender settings on this thread.
 *
 * @return SEND_FILE_CHUNKS if the sender asked for file chunks,
 *         READ_NEXT_CMD otherwise; SEND_ABORT_CMD on version/id mismatch;
 *         FINISH_WITH_ERROR if decoding fails
 */
ReceiverState ReceiverThread::processSettingsCmd() {
  WTVLOG(1) << "entered PROCESS_SETTINGS_CMD state";
  int senderProtocolVersion;
  const bool versionDecoded = Protocol::decodeVersion(
      buf_, off_, oldOffset_ + Protocol::kMaxVersion, senderProtocolVersion);
  if (!versionDecoded) {
    WTLOG(ERROR) << "Unable to decode version " << threadIndex_;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  if (senderProtocolVersion != threadProtocolVersion_) {
    WTLOG(WARNING) << "Receiver and sender protocol version mismatch "
                   << senderProtocolVersion << " " << threadProtocolVersion_;
    const int negotiatedProtocol = Protocol::negotiateProtocol(
        senderProtocolVersion, threadProtocolVersion_);
    if (negotiatedProtocol == 0) {
      WTLOG(WARNING) << "Can not support sender with version "
                     << senderProtocolVersion << ", aborting!";
      threadStats_.setLocalErrorCode(VERSION_INCOMPATIBLE);
      return SEND_ABORT_CMD;
    }
    WLOG_IF(INFO, threadProtocolVersion_ != negotiatedProtocol)
        << *this << "Changing receiver protocol version to "
        << negotiatedProtocol;
    threadProtocolVersion_ = negotiatedProtocol;
    if (negotiatedProtocol != senderProtocolVersion) {
      // the sender has to switch versions too: abort this connection with
      // VERSION_MISMATCH
      threadStats_.setLocalErrorCode(VERSION_MISMATCH);
      return SEND_ABORT_CMD;
    }
  }
  if (threadProtocolVersion_ <
      Protocol::PERIODIC_ENCRYPTION_IV_CHANGE_VERSION) {
    socket_->disableIvChange();
  }
  Settings settings;
  const bool settingsDecoded = Protocol::decodeSettings(
      threadProtocolVersion_, buf_, off_,
      oldOffset_ + Protocol::kMaxVersion + Protocol::kMaxSettings, settings);
  if (!settingsDecoded) {
    WTLOG(ERROR) << "Unable to decode settings cmd ";
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  const auto &remoteTransferId = settings.transferId;
  const auto &localTransferId = wdtParent_->getTransferId();
  if (localTransferId != remoteTransferId) {
    WTLOG(ERROR) << "Receiver and sender id mismatch " << remoteTransferId
                 << " " << localTransferId;
    threadStats_.setLocalErrorCode(ID_MISMATCH);
    return SEND_ABORT_CMD;
  }
  senderReadTimeout_ = settings.readTimeoutMillis;
  senderWriteTimeout_ = settings.writeTimeoutMillis;
  isBlockMode_ = !settings.blockModeDisabled;
  enableHeartBeat_ = settings.enableHeartBeat;
  if (!enableHeartBeat_) {
    WTLOG(INFO) << "Disabling heart-beat as sender does not support it";
  }
  curConnectionVerified_ = true;
  // determine footer type
  footerType_ = settings.enableChecksum ? CHECKSUM_FOOTER : NO_FOOTER;
  if (settings.sendFileChunks) {
    // We only move to SEND_FILE_CHUNKS state, if download resumption is enabled
    // in the sender side
    off_ = 0;
    numRead_ = 0;
    return SEND_FILE_CHUNKS;
  }
  // drop the bytes of this command from the buffered byte count
  numRead_ -= (off_ - oldOffset_);
  return READ_NEXT_CMD;
}
/**
 * Writes a single HEART_BEAT_CMD byte to the socket if heart-beats are
 * enabled and more than senderReadTimeout_ / kWaitTimeoutFactor milliseconds
 * have passed since the last one. A failed write is only logged.
 */
void ReceiverThread::sendHeartBeat() {
  if (enableHeartBeat_) {
    const auto currentTime = Clock::now();
    const int elapsedMs = durationMillis(currentTime - lastHeartBeatTime_);
    const int intervalMs = (senderReadTimeout_ / kWaitTimeoutFactor);
    if (elapsedMs > intervalMs) {
      lastHeartBeatTime_ = currentTime;
      // time to send a heart beat
      char heartBeat = Protocol::HEART_BEAT_CMD;
      const int numWritten = socket_->write(&heartBeat, 1);
      if (numWritten != 1) {
        WTLOG(WARNING) << "Failed to send heart-beat " << numWritten;
      }
    }
  }
}
/***PROCESS_FILE_CMD***/
/**
 * PROCESS_FILE_CMD state: handles one FILE_CMD.
 *
 * Decodes the block header, writes the block payload through a FileWriter
 * (first the bytes already sitting in buf_, then data read straight from the
 * socket), syncs and closes the file and, when checksum footers are enabled,
 * reads and verifies the footer. Heart-beats are sent between the steps.
 *
 * @return READ_NEXT_CMD on success;
 *         ACCEPT_WITH_TIMEOUT on socket read failures, incomplete data or a
 *         checksum mismatch;
 *         SEND_ABORT_CMD on file open/write/sync/close failures;
 *         FINISH_WITH_ERROR on protocol errors or when the transfer is
 *         marked for abort
 */
ReceiverState ReceiverThread::processFileCmd() {
  WTVLOG(1) << "entered PROCESS_FILE_CMD state";
  // following block needs to be executed for the first file cmd. There is no
  // harm in executing it more than once. number of blocks equal to 0 is a good
  // approximation for first file cmd. Did not want to introduce another boolean
  if (options_.enable_download_resumption && threadStats_.getNumBlocks() == 0) {
    auto sendChunksFunnel = controller_->getFunnel(SEND_FILE_CHUNKS_FUNNEL);
    auto state = sendChunksFunnel->getStatus();
    if (state == FUNNEL_START) {
      // sender is not in resumption mode
      wdtParent_->addTransferLogHeader(isBlockMode_,
                                       /* sender not resuming */ false);
      sendChunksFunnel->notifySuccess();
    }
  }
  checkpoint_.resetLastBlockDetails();
  BlockDetails blockDetails;
  // whatever path leaves this function, count a failed attempt if an error
  // code was recorded
  auto guard = folly::makeGuard([&] {
    if (threadStats_.getLocalErrorCode() != OK) {
      threadStats_.incrFailedAttempts();
    }
  });
  ErrorCode transferStatus = (ErrorCode)buf_[off_++];
  if (transferStatus != OK) {
    // TODO: use this status information to implement fail fast mode
    WTVLOG(1) << "sender entered into error state "
              << errorCodeToStr(transferStatus);
  }
  int16_t headerLen = folly::loadUnaligned<int16_t>(buf_ + off_);
  headerLen = folly::Endian::little(headerLen);
  if (headerLen <= 0) {
    WTLOG(ERROR) << "Header length must be positive " << headerLen;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  WVLOG(2) << "Processing FILE_CMD, header len " << headerLen;
  sendHeartBeat();
  if (headerLen > numRead_) {
    // the header is not fully buffered yet, read the rest of it
    int64_t end = oldOffset_ + numRead_;
    numRead_ =
        readAtLeast(*socket_, buf_ + end, bufSize_ - end, headerLen, numRead_);
  }
  if (numRead_ < headerLen) {
    WTLOG(ERROR) << "Unable to read full header " << headerLen << " "
                 << numRead_;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  off_ += sizeof(int16_t);
  bool success = Protocol::decodeHeader(threadProtocolVersion_, buf_, off_,
                                        numRead_ + oldOffset_, blockDetails);
  int64_t headerBytes = off_ - oldOffset_;
  // transferred header length must match decoded header length
  if (headerLen != headerBytes) {
    // log both values: the decoded length and the length the sender announced
    WTLOG(ERROR) << "Decoded header length: " << headerBytes
                 << ", transferred header length: " << headerLen
                 << ", they should be equal.";
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  threadStats_.addHeaderBytes(headerBytes);
  threadStats_.addEffectiveBytes(headerBytes, 0);
  if (!success) {
    WTLOG(ERROR) << "Error decoding at"
                 << " ooff:" << oldOffset_ << " off_: " << off_
                 << " numRead_: " << numRead_;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  if (blockDetails.allocationStatus == TO_BE_DELETED &&
      (blockDetails.fileSize != 0 || blockDetails.dataSize != 0)) {
    WTLOG(ERROR) << "Invalid file header, file to be deleted, but "
                    "file-size/block-size not zero "
                 << blockDetails.fileName << " file-size "
                 << blockDetails.fileSize << " block-size "
                 << blockDetails.dataSize;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  // received a well formed file cmd, apply the pending checkpoint update
  checkpointIndex_ = pendingCheckpointIndex_;
  WTVLOG(1) << "Read id:" << blockDetails.fileName
            << " size:" << blockDetails.dataSize << " ooff:" << oldOffset_
            << " off_: " << off_ << " numRead_: " << numRead_;
  auto &fileCreator = wdtParent_->getFileCreator();
  FileWriter writer(*threadCtx_, &blockDetails, fileCreator.get());
  const auto encryptionType = socket_->getEncryptionType();
  // runs on every exit path until dismissed after the block was fully
  // received (see writtenGuard.dismiss() below)
  auto writtenGuard = folly::makeGuard([&] {
    if (!encryptionTypeToTagLen(encryptionType) && footerType_ == NO_FOOTER) {
      // if encryption doesn't have tag verification and checksum verification
      // is disabled, we can consider bytes received before connection break as
      // valid
      checkpoint_.setLastBlockDetails(blockDetails.seqId, blockDetails.offset,
                                      writer.getTotalWritten());
      threadStats_.addEffectiveBytes(headerBytes, writer.getTotalWritten());
    }
  });
  sendHeartBeat();
  // writer.open() deletes files if status == TO_BE_DELETED
  // therefore if !(!delete_extra_files && status == TO_BE_DELETED)
  // we should skip writer.open() call altogether
  if (options_.delete_extra_files ||
      blockDetails.allocationStatus != TO_BE_DELETED) {
    if (writer.open() != OK) {
      threadStats_.setLocalErrorCode(FILE_WRITE_ERROR);
      return SEND_ABORT_CMD;
    }
  }
  int32_t checksum = 0;
  // bytes already buffered after the header; they may extend past this block
  int64_t remainingData = numRead_ + oldOffset_ - off_;
  int64_t toWrite = remainingData;
  WDT_CHECK(remainingData >= 0);
  if (remainingData >= blockDetails.dataSize) {
    toWrite = blockDetails.dataSize;
  }
  threadStats_.addDataBytes(toWrite);
  if (footerType_ == CHECKSUM_FOOTER) {
    checksum = folly::crc32c((const uint8_t *)(buf_ + off_), toWrite, checksum);
  }
  auto throttler = wdtParent_->getThrottler();
  if (throttler) {
    // We might be reading more than we require for this file but
    // throttling should make sense for any additional bytes received
    // on the network
    throttler->limit(*threadCtx_, toWrite + headerBytes);
  }
  sendHeartBeat();
  ErrorCode code = ERROR;
  if (toWrite > 0) {
    code = writer.write(buf_ + off_, toWrite);
    if (code != OK) {
      threadStats_.setLocalErrorCode(code);
      return SEND_ABORT_CMD;
    }
  }
  off_ += toWrite;
  remainingData -= toWrite;
  // also means no leftOver so it's ok we use buf_ from start
  while (writer.getTotalWritten() < blockDetails.dataSize) {
    if (wdtParent_->getCurAbortCode() != OK) {
      WTLOG(ERROR) << "Thread marked for abort while processing "
                   << blockDetails.fileName << " " << blockDetails.seqId
                   << " port : " << socket_->getPort();
      threadStats_.setLocalErrorCode(ABORT);
      return FINISH_WITH_ERROR;
    }
    sendHeartBeat();
    int64_t nres = readAtMost(*socket_, buf_, bufSize_,
                              blockDetails.dataSize - writer.getTotalWritten());
    if (nres <= 0) {
      // error or EOF; the size check after the loop reports it
      break;
    }
    if (throttler) {
      // We only know how much we have read after we are done calling
      // readAtMost. Call throttler with the bytes read off_ the wire.
      throttler->limit(*threadCtx_, nres);
    }
    threadStats_.addDataBytes(nres);
    if (footerType_ == CHECKSUM_FOOTER) {
      checksum = folly::crc32c((const uint8_t *)buf_, nres, checksum);
    }
    sendHeartBeat();
    code = writer.write(buf_, nres);
    if (code != OK) {
      WTLOG(ERROR) << "failed to write to " << blockDetails.fileName;
      threadStats_.setLocalErrorCode(code);
      return SEND_ABORT_CMD;
    }
  }
  // Sync the writer to disk and close it. We need to check for error code each
  // time, otherwise we would move forward with corrupted files.
  const ErrorCode syncCode = writer.sync();
  if (syncCode != OK) {
    WTLOG(ERROR) << "could not sync " << blockDetails.fileName << " to disk";
    threadStats_.setLocalErrorCode(syncCode);
    return SEND_ABORT_CMD;
  }
  const ErrorCode closeCode = writer.close();
  if (closeCode != OK) {
    WTLOG(ERROR) << "could not close " << blockDetails.fileName;
    threadStats_.setLocalErrorCode(closeCode);
    return SEND_ABORT_CMD;
  }
  if (writer.getTotalWritten() != blockDetails.dataSize) {
    // This can only happen if there are transmission errors
    // Write errors to disk are already taken care of above
    WTLOG(ERROR) << "could not read entire content for "
                 << blockDetails.fileName << " port " << socket_->getPort();
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  writtenGuard.dismiss();
  WVLOG(2) << "completed " << blockDetails.fileName << " off: " << off_
           << " numRead: " << numRead_;
  // Transfer of the file is complete here, mark the bytes effective
  WDT_CHECK(remainingData >= 0) << "Negative remainingData " << remainingData;
  if (remainingData > 0) {
    // if we need to read more anyway, let's move the data
    numRead_ = remainingData;
    if ((remainingData < Protocol::kMaxHeader) && (off_ > (bufSize_ / 2))) {
      // rare so inefficient is ok
      WVLOG(3) << "copying extra " << remainingData << " leftover bytes @ "
               << off_;
      memmove(/* dst */ buf_,
              /* from */ buf_ + off_,
              /* how much */ remainingData);
      off_ = 0;
    } else {
      // otherwise just continue from the offset
      WVLOG(3) << "Using remaining extra " << remainingData
               << " leftover bytes starting @ " << off_;
    }
  } else {
    numRead_ = off_ = 0;
  }
  if (footerType_ == CHECKSUM_FOOTER) {
    sendHeartBeat();
    // have to read footer cmd
    oldOffset_ = off_;
    numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
                           Protocol::kMinBufLength, numRead_);
    if (numRead_ < Protocol::kMinBufLength) {
      WTLOG(ERROR) << "socket read failure " << Protocol::kMinBufLength << " "
                   << numRead_;
      threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
      return ACCEPT_WITH_TIMEOUT;
    }
    Protocol::CMD_MAGIC cmd = (Protocol::CMD_MAGIC)buf_[off_++];
    if (cmd != Protocol::FOOTER_CMD) {
      WTLOG(ERROR) << "Expecting footer cmd, but received " << cmd;
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return FINISH_WITH_ERROR;
    }
    int32_t receivedChecksum;
    bool ok = Protocol::decodeFooter(
        buf_, off_, oldOffset_ + Protocol::kMaxFooter, receivedChecksum);
    if (!ok) {
      WTLOG(ERROR) << "Unable to decode footer cmd";
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return FINISH_WITH_ERROR;
    }
    if (checksum != receivedChecksum) {
      WTLOG(ERROR) << "Checksum mismatch " << checksum << " "
                   << receivedChecksum << " port " << socket_->getPort()
                   << " file " << blockDetails.fileName;
      threadStats_.setLocalErrorCode(CHECKSUM_MISMATCH);
      return ACCEPT_WITH_TIMEOUT;
    }
    markBlockVerified(blockDetails);
    // drop the footer bytes from the buffered byte count
    int64_t msgLen = off_ - oldOffset_;
    numRead_ -= msgLen;
  } else {
    WDT_CHECK(footerType_ == NO_FOOTER);
    if (encryptionTypeToTagLen(encryptionType)) {
      // with a tag-verifying encryption type the block is only marked
      // verified later, via markReceivedBlocksVerified()
      blocksWaitingVerification_.emplace_back(blockDetails);
    } else {
      markBlockVerified(blockDetails);
    }
  }
  return READ_NEXT_CMD;
}
void ReceiverThread::markBlockVerified(const BlockDetails &blockDetails) {
threadStats_.addEffectiveBytes(0, blockDetails.dataSize);
threadStats_.incrNumBlocks();
checkpoint_.incrNumBlocks();
if (!options_.isLogBasedResumption()) {
return;
}
TransferLogManager &transferLogManager = wdtParent_->getTransferLogManager();
if (blockDetails.allocationStatus == TO_BE_DELETED) {
transferLogManager.addFileInvalidationEntry(blockDetails.seqId);
return;
}
transferLogManager.addBlockWriteEntry(blockDetails.seqId, blockDetails.offset,
blockDetails.dataSize);
}
void ReceiverThread::markReceivedBlocksVerified() {
for (const BlockDetails &blockDetails : blocksWaitingVerification_) {
markBlockVerified(blockDetails);
}
blocksWaitingVerification_.clear();
}
/**
 * PROCESS_DONE_CMD state: decodes the sender's DONE command and stores the
 * sender status, block count and byte count in the thread stats.
 *
 * @return WAIT_FOR_FINISH_OR_NEW_CHECKPOINT on success, FINISH_WITH_ERROR
 *         (PROTOCOL_ERROR) if the buffered byte count is unexpected or the
 *         command can not be decoded
 */
ReceiverState ReceiverThread::processDoneCmd() {
  WTVLOG(1) << "entered PROCESS_DONE_CMD state";
  const bool unexpectedLength = (numRead_ != Protocol::kMinBufLength);
  if (unexpectedLength) {
    WTLOG(ERROR) << "Unexpected state for done command"
                 << " off_: " << off_ << " numRead_: " << numRead_;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  const ErrorCode senderStatus = static_cast<ErrorCode>(buf_[off_++]);
  int64_t totalSenderBytes = -1;
  int64_t numBlocksSend = -1;
  if (!Protocol::decodeDone(threadProtocolVersion_, buf_, off_,
                            oldOffset_ + Protocol::kMaxDone, numBlocksSend,
                            totalSenderBytes)) {
    WTLOG(ERROR) << "Unable to decode done cmd";
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  threadStats_.setNumBlocksSend(numBlocksSend);
  threadStats_.setTotalSenderBytes(totalSenderBytes);
  threadStats_.setRemoteErrorCode(senderStatus);
  // received a valid command, applying pending checkpoint write update
  checkpointIndex_ = pendingCheckpointIndex_;
  return WAIT_FOR_FINISH_OR_NEW_CHECKPOINT;
}
/**
 * PROCESS_SIZE_CMD state: decodes the total number of bytes the sender is
 * going to send and stores it in the thread stats.
 *
 * @return READ_NEXT_CMD on success, FINISH_WITH_ERROR (PROTOCOL_ERROR) if
 *         the command can not be decoded
 */
ReceiverState ReceiverThread::processSizeCmd() {
  WTVLOG(1) << "entered PROCESS_SIZE_CMD state";
  int64_t totalSenderBytes;
  if (!Protocol::decodeSize(buf_, off_, oldOffset_ + Protocol::kMaxSize,
                            totalSenderBytes)) {
    WTLOG(ERROR) << "Unable to decode size cmd";
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  WVLOG(1) << "Number of bytes to receive " << totalSenderBytes;
  threadStats_.setTotalSenderBytes(totalSenderBytes);
  // drop the bytes of this command from the buffered byte count
  numRead_ -= (off_ - oldOffset_);
  return READ_NEXT_CMD;
}
/**
 * SEND_FILE_CHUNKS state: sends the already received file chunks info to the
 * sender. The work is coordinated through the SEND_FILE_CHUNKS_FUNNEL:
 *  - FUNNEL_START:    this thread sends CHUNKS_CMD followed by the encoded
 *                     chunk list, reads a one byte reply, adds the transfer
 *                     log header and notifies the funnel of success
 *  - FUNNEL_PROGRESS: a WAIT_CMD is sent to the sender and the thread waits
 *                     on the funnel before checking its status again
 *  - FUNNEL_END:      an ACK_CMD is sent to the sender
 *
 * @return READ_NEXT_CMD on success, ACCEPT_WITH_TIMEOUT on any socket
 *         read/write failure (the funnel is notified of the failure when
 *         this thread was the one sending the chunks)
 */
ReceiverState ReceiverThread::sendFileChunks() {
  WTLOG(INFO) << "entered SEND_FILE_CHUNKS state";
  WDT_CHECK(senderReadTimeout_ > 0);  // must have received settings
  int waitingTimeMillis = senderReadTimeout_ / kWaitTimeoutFactor;
  auto execFunnel = controller_->getFunnel(SEND_FILE_CHUNKS_FUNNEL);
  while (true) {
    auto status = execFunnel->getStatus();
    switch (status) {
      case FUNNEL_END: {
        buf_[0] = Protocol::ACK_CMD;
        int toWrite = 1;
        int written = socket_->write(buf_, toWrite);
        if (written != toWrite) {
          WTLOG(ERROR) << "socket write error " << toWrite << " " << written;
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          return ACCEPT_WITH_TIMEOUT;
        }
        threadStats_.addHeaderBytes(toWrite);
        return READ_NEXT_CMD;
      }
      case FUNNEL_PROGRESS: {
        buf_[0] = Protocol::WAIT_CMD;
        int toWrite = 1;
        int written = socket_->write(buf_, toWrite);
        if (written != toWrite) {
          WTLOG(ERROR) << "socket write error " << toWrite << " " << written;
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          return ACCEPT_WITH_TIMEOUT;
        }
        threadStats_.addHeaderBytes(toWrite);
        execFunnel->wait(waitingTimeMillis, *threadCtx_);
        break;
      }
      case FUNNEL_START: {
        int64_t off = 0;
        buf_[off++] = Protocol::CHUNKS_CMD;
        const auto &fileChunksInfo = wdtParent_->getFileChunksInfo();
        const int64_t numParsedChunksInfo = fileChunksInfo.size();
        Protocol::encodeChunksCmd(buf_, off, /* size of buf_ */ bufSize_,
                                  /* param to send */ bufSize_,
                                  numParsedChunksInfo);
        int written = socket_->write(buf_, off);
        if (written > 0) {
          threadStats_.addHeaderBytes(written);
        }
        if (written != off) {
          WTLOG(ERROR) << "socket write err " << off << " " << written;
          // this is a failed write, report it like the other write failures
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          execFunnel->notifyFail();
          return ACCEPT_WITH_TIMEOUT;
        }
        int64_t numEntriesWritten = 0;
        // we try to encode as many chunks as possible in the buffer. If a
        // single
        // chunk can not fit in the buffer, it is ignored. Format of encoding :
        // <data-size><chunk1><chunk2>...
        while (numEntriesWritten < numParsedChunksInfo) {
          // leave room for the int32 data-size prefix
          off = sizeof(int32_t);
          int64_t numEntriesEncoded = Protocol::encodeFileChunksInfoList(
              buf_, off, bufSize_, numEntriesWritten, fileChunksInfo);
          int32_t dataSize = folly::Endian::little(off - sizeof(int32_t));
          folly::storeUnaligned<int32_t>(buf_, dataSize);
          written = socket_->write(buf_, off);
          if (written > 0) {
            threadStats_.addHeaderBytes(written);
          }
          if (written != off) {
            // short write; reported by the count check below
            break;
          }
          numEntriesWritten += numEntriesEncoded;
        }
        if (numEntriesWritten != numParsedChunksInfo) {
          WTLOG(ERROR) << "Could not write all the file chunks "
                       << numParsedChunksInfo << " " << numEntriesWritten;
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          execFunnel->notifyFail();
          return ACCEPT_WITH_TIMEOUT;
        }
        // try to read ack
        int64_t toRead = 1;
        int64_t numRead = socket_->read(buf_, toRead);
        if (numRead != toRead) {
          WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
          threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
          execFunnel->notifyFail();
          return ACCEPT_WITH_TIMEOUT;
        }
        wdtParent_->addTransferLogHeader(isBlockMode_,
                                         /* sender resuming */ true);
        execFunnel->notifySuccess();
        return READ_NEXT_CMD;
      }
    }
  }
}
/**
 * SEND_GLOBAL_CHECKPOINTS state: sends an ERR_CMD carrying newCheckpoints_.
 * Message layout in buf_: 1 command byte, int16 little-endian payload
 * length, encoded checkpoints.
 *
 * @return READ_NEXT_CMD on success, ACCEPT_WITH_TIMEOUT (with
 *         SOCKET_WRITE_ERROR recorded) if the write is short
 */
ReceiverState ReceiverThread::sendGlobalCheckpoint() {
  WTLOG(INFO) << "entered SEND_GLOBAL_CHECKPOINTS state";
  buf_[0] = Protocol::ERR_CMD;
  // skip the command byte and leave space for length
  off_ = 1 + sizeof(int16_t);
  const auto payloadStart = off_;
  Protocol::encodeCheckpoints(threadProtocolVersion_, buf_, off_, bufSize_,
                              newCheckpoints_);
  int16_t length = off_ - payloadStart;
  folly::storeUnaligned<int16_t>(buf_ + 1, folly::Endian::little(length));
  const int numWritten = socket_->write(buf_, off_);
  if (numWritten != off_) {
    WTLOG(ERROR) << "unable to write error checkpoints";
    threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  threadStats_.addHeaderBytes(off_);
  // applied to checkpointIndex_ once the next valid command is received
  pendingCheckpointIndex_ = checkpointIndex_ + newCheckpoints_.size();
  off_ = 0;
  numRead_ = 0;
  return READ_NEXT_CMD;
}
/**
 * SEND_ABORT_CMD state: sends an ABORT_CMD carrying the local error code and
 * closes the connection. The result of the write is deliberately ignored.
 *
 * @return ACCEPT_WITH_TIMEOUT when the abort reason is VERSION_MISMATCH (the
 *         sender is expected to reconnect with a changed version),
 *         FINISH_WITH_ERROR otherwise
 */
ReceiverState ReceiverThread::sendAbortCmd() {
  WTLOG(INFO) << "entered SEND_ABORT_CMD state";
  buf_[0] = Protocol::ABORT_CMD;
  int64_t encodedLen = 1;
  Protocol::encodeAbort(buf_, encodedLen, bufSize_, threadProtocolVersion_,
                        threadStats_.getLocalErrorCode(),
                        threadStats_.getNumFiles());
  socket_->write(buf_, encodedLen);
  // No need to check if we were successful in sending ABORT
  // This thread will simply disconnect and sender thread on the
  // other side will timeout
  socket_->closeConnection();
  threadStats_.addHeaderBytes(encodedLen);
  // Receiver should try again expecting sender to have changed its version
  const bool retryWithNewVersion =
      (threadStats_.getLocalErrorCode() == VERSION_MISMATCH);
  return retryWithNewVersion ? ACCEPT_WITH_TIMEOUT : FINISH_WITH_ERROR;
}
/**
 * SEND_DONE_CMD state: sends DONE_CMD, expects a DONE_CMD byte back followed
 * by the logical end of stream, marks the blocks that were waiting for
 * verification as verified and closes the connection.
 *
 * @return END when the exchange completes (the local error code is set to
 *         the result of closing the connection), ACCEPT_WITH_TIMEOUT on any
 *         failure
 */
ReceiverState ReceiverThread::sendDoneCmd() {
  WTVLOG(1) << "entered SEND_DONE_CMD state";
  buf_[0] = Protocol::DONE_CMD;
  const bool doneSent = (socket_->write(buf_, 1) == 1);
  if (!doneSent) {
    WTPLOG(ERROR) << "unable to send DONE " << threadIndex_;
    threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  threadStats_.addHeaderBytes(1);
  const auto numRead = socket_->read(buf_, 1);
  const bool ackReceived = (numRead == 1 && buf_[0] == Protocol::DONE_CMD);
  if (!ackReceived) {
    WTLOG(ERROR) << "did not receive ack for DONE";
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  const ErrorCode eosCode = socket_->expectEndOfStream();
  if (eosCode != OK) {
    WTLOG(ERROR) << "error while processing logical end of stream "
                 << errorCodeToStr(eosCode);
    threadStats_.setLocalErrorCode(eosCode);
    return ACCEPT_WITH_TIMEOUT;
  }
  markReceivedBlocksVerified();
  threadStats_.setLocalErrorCode(socket_->closeConnection());
  WTLOG(INFO) << "got ack for DONE and logical eof. Transfer finished";
  return END;
}
/**
 * FINISH_WITH_ERROR state: closes the socket(s), hands this thread's
 * checkpoint to the parent, marks the thread FINISHED and notifies one
 * waiter on the WAIT_FOR_FINISH_OR_CHECKPOINT_CV condition.
 *
 * Must only be entered with a non-OK local error code.
 *
 * @return END
 */
ReceiverState ReceiverThread::finishWithError() {
  WTLOG(INFO) << "entered FINISH_WITH_ERROR state";
  // should only be in this state if there is some error
  WDT_CHECK(threadStats_.getLocalErrorCode() != OK);
  // close the socket, so that sender receives an error during connect
  // When we are doing a single session, close the listening socket as soon
  // as we are done
  if (!wdtParent_->isJoinable_) {
    socket_->closeNoCheck();
  } else {
    socket_->closeAllNoCheck();
  }
  auto finishCondition =
      controller_->getCondition(WAIT_FOR_FINISH_OR_CHECKPOINT_CV);
  auto conditionGuard = finishCondition->acquire();
  wdtParent_->addCheckpoint(checkpoint_);
  controller_->markState(threadIndex_, FINISHED);
  conditionGuard.notifyOne();
  return END;
}
/**
 * Decides what a waiting thread should do next.
 *
 * @return SEND_GLOBAL_CHECKPOINTS if the parent has checkpoints this thread
 *         has not sent yet (the thread is marked RUNNING),
 *         SEND_DONE_CMD if no other thread is RUNNING (the thread is marked
 *         FINISHED),
 *         WAIT_FOR_FINISH_OR_NEW_CHECKPOINT otherwise
 */
ReceiverState ReceiverThread::checkForFinishOrNewCheckpoints() {
  auto freshCheckpoints = wdtParent_->getNewCheckpoints(checkpointIndex_);
  if (freshCheckpoints.empty()) {
    if (controller_->hasThreads(threadIndex_, RUNNING)) {
      // some thread is still transferring, keep waiting
      return WAIT_FOR_FINISH_OR_NEW_CHECKPOINT;
    }
    controller_->markState(threadIndex_, FINISHED);
    return SEND_DONE_CMD;
  }
  newCheckpoints_ = std::move(freshCheckpoints);
  controller_->markState(threadIndex_, RUNNING);
  return SEND_GLOBAL_CHECKPOINTS;
}
/**
 * WAIT_FOR_FINISH_OR_NEW_CHECKPOINT state: marks the thread WAITING and
 * loops until checkForFinishOrNewCheckpoints() yields another state. Between
 * checks it waits on the WAIT_FOR_FINISH_OR_CHECKPOINT_CV condition for
 * senderReadTimeout_ / kWaitTimeoutFactor milliseconds and sends a WAIT_CMD
 * to the sender.
 *
 * @return the state picked by checkForFinishOrNewCheckpoints(), or
 *         ACCEPT_WITH_TIMEOUT (with SOCKET_WRITE_ERROR recorded and the
 *         thread marked RUNNING again) if the WAIT_CMD can not be written
 */
ReceiverState ReceiverThread::waitForFinishOrNewCheckpoint() {
WTLOG(INFO) << "entered WAIT_FOR_FINISH_OR_NEW_CHECKPOINT state";
// should only be called if the are no errors
WDT_CHECK(threadStats_.getLocalErrorCode() == OK);
auto cv = controller_->getCondition(WAIT_FOR_FINISH_OR_CHECKPOINT_CV);
int timeoutMillis = senderReadTimeout_ / kWaitTimeoutFactor;
controller_->markState(threadIndex_, WAITING);
while (true) {
WDT_CHECK(senderReadTimeout_ > 0); // must have received settings
{
// the guard lives only inside this block, so it is gone before the socket
// write below
auto guard = cv->acquire();
auto state = checkForFinishOrNewCheckpoints();
if (state != WAIT_FOR_FINISH_OR_NEW_CHECKPOINT) {
// pass the notification on to one other waiter before leaving
guard.notifyOne();
return state;
}
{
// the stat collector is scoped to the wait call only
PerfStatCollector statCollector(*threadCtx_,
PerfStatReport::RECEIVER_WAIT_SLEEP);
guard.wait(timeoutMillis, *threadCtx_);
}
// check again after the wait returned
state = checkForFinishOrNewCheckpoints();
if (state != WAIT_FOR_FINISH_OR_NEW_CHECKPOINT) {
guard.notifyOne();
return state;
}
}
// send WAIT cmd to keep sender thread alive
buf_[0] = Protocol::WAIT_CMD;
if (socket_->write(buf_, 1) != 1) {
WTPLOG(ERROR) << "unable to write WAIT";
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
// no longer waiting: the thread goes back to accepting a connection
controller_->markState(threadIndex_, RUNNING);
return ACCEPT_WITH_TIMEOUT;
}
threadStats_.addHeaderBytes(1);
}
}
void ReceiverThread::start() {
if (buf_ == nullptr) {
WTLOG(ERROR) << "Unable to allocate buffer";
threadStats_.setLocalErrorCode(MEMORY_ALLOCATION_ERROR);
return;
}
ReceiverState state = LISTEN;
while (true) {
ErrorCode abortCode = wdtParent_->getCurAbortCode();
if (abortCode != OK) {
WTLOG(ERROR) << "Transfer aborted " << socket_->getPort() << " "
<< errorCodeToStr(abortCode);
threadStats_.setLocalErrorCode(ABORT);
break;
}
if (state == END) {
break;
}
state = (this->*stateMap_[state])();
}
controller_->deRegisterThread(threadIndex_);
controller_->executeAtEnd([&]() { wdtParent_->endCurGlobalSession(); });
WDT_CHECK(socket_.get());
threadStats_.setEncryptionType(socket_->getEncryptionType());
WTLOG(INFO) << threadStats_;
}
int32_t ReceiverThread::getPort() const {
return socket_->getPort();
}
/**
 * Creates the server socket (through the parent's socket creator when one is
 * set, otherwise a plain ServerSocket) and starts listening on it.
 *
 * The socket is given a callback that marks the blocks waiting for
 * verification as verified.
 *
 * @return OK when the socket is listening (checkpoint_.port is then set to
 *         the socket's port), ERROR if the socket could not be created or
 *         listening failed
 */
ErrorCode ReceiverThread::init() {
  auto &request = wdtParent_->transferRequest_;
  const EncryptionParams &encryptionData = request.encryptionData;
  Func onTagVerified = [this] { this->markReceivedBlocksVerified(); };
  if (!wdtParent_->socketCreator_) {
    socket_ = std::make_unique<ServerSocket>(
        *threadCtx_, port_, wdtParent_->backlog_, encryptionData,
        request.ivChangeInterval, std::move(onTagVerified));
  } else {
    socket_ = wdtParent_->socketCreator_->makeServerSocket(
        *threadCtx_, port_, wdtParent_->backlog_, encryptionData,
        request.ivChangeInterval, std::move(onTagVerified), request.tls);
  }
  if (!socket_) {
    return ERROR;
  }
  const int maxRetries = options_.max_retries;
  int attempt = 0;
  while (attempt < maxRetries && socket_->listen() != OK) {
    ++attempt;
  }
  // final attempt; its result is the one that decides success or failure
  if (socket_->listen() != OK) {
    WTLOG(ERROR) << "Couldn't listen on port " << socket_->getPort();
    return ERROR;
  }
  checkpoint_.port = socket_->getPort();
  WTLOG(INFO) << "Listening on port " << socket_->getPort();
  return OK;
}
/**
 * Puts the per-session state of the thread back to its initial values:
 * buffer offsets, checkpoint indexes, sender timeouts, connection
 * verification flag, stats and checkpoints.
 */
void ReceiverThread::reset() {
  off_ = 0;
  numRead_ = 0;
  pendingCheckpointIndex_ = 0;
  checkpointIndex_ = 0;
  // -1 means the sender settings have not been received yet
  senderWriteTimeout_ = -1;
  senderReadTimeout_ = -1;
  curConnectionVerified_ = false;
  threadStats_.reset();
  checkpoints_.clear();
  newCheckpoints_.clear();
  checkpoint_ = Checkpoint(socket_->getPort());
}
/** Destructor; no explicit cleanup is done here. */
ReceiverThread::~ReceiverThread() = default;
} // namespace wdt
} // namespace facebook