in ReceiverThread.cpp [738:832]
ReceiverState ReceiverThread::sendFileChunks() {
WTLOG(INFO) << "entered SEND_FILE_CHUNKS state";
WDT_CHECK(senderReadTimeout_ > 0); // must have received settings
int waitingTimeMillis = senderReadTimeout_ / kWaitTimeoutFactor;
auto execFunnel = controller_->getFunnel(SEND_FILE_CHUNKS_FUNNEL);
while (true) {
auto status = execFunnel->getStatus();
switch (status) {
case FUNNEL_END: {
buf_[0] = Protocol::ACK_CMD;
int toWrite = 1;
int written = socket_->write(buf_, toWrite);
if (written != toWrite) {
WTLOG(ERROR) << "socket write error " << toWrite << " " << written;
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
return ACCEPT_WITH_TIMEOUT;
}
threadStats_.addHeaderBytes(toWrite);
return READ_NEXT_CMD;
}
case FUNNEL_PROGRESS: {
buf_[0] = Protocol::WAIT_CMD;
int toWrite = 1;
int written = socket_->write(buf_, toWrite);
if (written != toWrite) {
WTLOG(ERROR) << "socket write error " << toWrite << " " << written;
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
return ACCEPT_WITH_TIMEOUT;
}
threadStats_.addHeaderBytes(toWrite);
execFunnel->wait(waitingTimeMillis, *threadCtx_);
break;
}
case FUNNEL_START: {
int64_t off = 0;
buf_[off++] = Protocol::CHUNKS_CMD;
const auto &fileChunksInfo = wdtParent_->getFileChunksInfo();
const int64_t numParsedChunksInfo = fileChunksInfo.size();
Protocol::encodeChunksCmd(buf_, off, /* size of buf_ */ bufSize_,
/* param to send */ bufSize_,
numParsedChunksInfo);
int written = socket_->write(buf_, off);
if (written > 0) {
threadStats_.addHeaderBytes(written);
}
if (written != off) {
WTLOG(ERROR) << "socket write err " << off << " " << written;
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
execFunnel->notifyFail();
return ACCEPT_WITH_TIMEOUT;
}
int64_t numEntriesWritten = 0;
// we try to encode as many chunks as possible in the buffer. If a
// single
// chunk can not fit in the buffer, it is ignored. Format of encoding :
// <data-size><chunk1><chunk2>...
while (numEntriesWritten < numParsedChunksInfo) {
off = sizeof(int32_t);
int64_t numEntriesEncoded = Protocol::encodeFileChunksInfoList(
buf_, off, bufSize_, numEntriesWritten, fileChunksInfo);
int32_t dataSize = folly::Endian::little(off - sizeof(int32_t));
folly::storeUnaligned<int32_t>(buf_, dataSize);
written = socket_->write(buf_, off);
if (written > 0) {
threadStats_.addHeaderBytes(written);
}
if (written != off) {
break;
}
numEntriesWritten += numEntriesEncoded;
}
if (numEntriesWritten != numParsedChunksInfo) {
WTLOG(ERROR) << "Could not write all the file chunks "
<< numParsedChunksInfo << " " << numEntriesWritten;
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
execFunnel->notifyFail();
return ACCEPT_WITH_TIMEOUT;
}
// try to read ack
int64_t toRead = 1;
int64_t numRead = socket_->read(buf_, toRead);
if (numRead != toRead) {
WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
execFunnel->notifyFail();
return ACCEPT_WITH_TIMEOUT;
}
wdtParent_->addTransferLogHeader(isBlockMode_,
/* sender resuming */ true);
execFunnel->notifySuccess();
return READ_NEXT_CMD;
}
}
}
}