ReceiverState ReceiverThread::sendFileChunks()

in ReceiverThread.cpp [738:832]


/**
 * Handler for the SEND_FILE_CHUNKS state.
 *
 * Behavior is driven by the status of the SEND_FILE_CHUNKS_FUNNEL funnel
 * obtained from the controller:
 *  - FUNNEL_START    : this thread writes the CHUNKS_CMD header followed by the
 *                      encoded file chunks info (in as many buffer-sized
 *                      batches as needed), reads a one byte ack, adds the
 *                      transfer log header and marks the funnel as succeeded.
 *  - FUNNEL_PROGRESS : writes a WAIT_CMD byte, waits on the funnel for
 *                      senderReadTimeout_ / kWaitTimeoutFactor and then
 *                      re-checks the status.
 *  - FUNNEL_END      : writes an ACK_CMD byte.
 * NOTE(review): presumably the funnel lets exactly one thread perform the
 * send while the other threads wait; confirm against the funnel
 * implementation.
 *
 * @return READ_NEXT_CMD on success, ACCEPT_WITH_TIMEOUT after any socket
 *         failure (the local error code of threadStats_ is set first, and in
 *         the FUNNEL_START path the funnel is notified of the failure).
 */
ReceiverState ReceiverThread::sendFileChunks() {
  WTLOG(INFO) << "entered SEND_FILE_CHUNKS state";
  WDT_CHECK(senderReadTimeout_ > 0);  // must have received settings
  int waitingTimeMillis = senderReadTimeout_ / kWaitTimeoutFactor;
  auto execFunnel = controller_->getFunnel(SEND_FILE_CHUNKS_FUNNEL);
  while (true) {
    auto status = execFunnel->getStatus();
    switch (status) {
      case FUNNEL_END: {
        // nothing left to send from this thread, just acknowledge
        buf_[0] = Protocol::ACK_CMD;
        int toWrite = 1;
        int written = socket_->write(buf_, toWrite);
        if (written != toWrite) {
          WTLOG(ERROR) << "socket write error " << toWrite << " " << written;
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          return ACCEPT_WITH_TIMEOUT;
        }
        threadStats_.addHeaderBytes(toWrite);
        return READ_NEXT_CMD;
      }
      case FUNNEL_PROGRESS: {
        // send a wait cmd to the peer, then block on the funnel before
        // looking at its status again
        buf_[0] = Protocol::WAIT_CMD;
        int toWrite = 1;
        int written = socket_->write(buf_, toWrite);
        if (written != toWrite) {
          WTLOG(ERROR) << "socket write error " << toWrite << " " << written;
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          return ACCEPT_WITH_TIMEOUT;
        }
        threadStats_.addHeaderBytes(toWrite);
        execFunnel->wait(waitingTimeMillis, *threadCtx_);
        break;
      }
      case FUNNEL_START: {
        // header : <CHUNKS_CMD><encoded buffer size and number of chunks>
        int64_t off = 0;
        buf_[off++] = Protocol::CHUNKS_CMD;
        const auto &fileChunksInfo = wdtParent_->getFileChunksInfo();
        const int64_t numParsedChunksInfo = fileChunksInfo.size();
        Protocol::encodeChunksCmd(buf_, off, /* size of buf_ */ bufSize_,
                                  /* param to send */ bufSize_,
                                  numParsedChunksInfo);
        int written = socket_->write(buf_, off);
        if (written > 0) {
          // account for partial writes as well
          threadStats_.addHeaderBytes(written);
        }
        if (written != off) {
          WTLOG(ERROR) << "socket write err " << off << " " << written;
          // this is a failed write, report it as such (consistent with every
          // other write failure in this function)
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          execFunnel->notifyFail();
          return ACCEPT_WITH_TIMEOUT;
        }
        int64_t numEntriesWritten = 0;
        // We encode as many chunks as fit in the buffer and send them as one
        // batch, repeating until every entry has been sent.
        // Format of each batch : <data-size><chunk1><chunk2>...
        // where <data-size> is a little endian int32 holding the number of
        // bytes following it.
        while (numEntriesWritten < numParsedChunksInfo) {
          // leave room for the size prefix, it is filled in after encoding
          off = sizeof(int32_t);
          int64_t numEntriesEncoded = Protocol::encodeFileChunksInfoList(
              buf_, off, bufSize_, numEntriesWritten, fileChunksInfo);
          if (numEntriesEncoded <= 0) {
            // No forward progress is possible : retrying with the same start
            // index would spin forever. Bail out and let the check below
            // report the failure.
            WTLOG(ERROR) << "Could not encode any file chunks info, "
                         << "buffer size " << bufSize_ << " start index "
                         << numEntriesWritten;
            break;
          }
          int32_t dataSize = folly::Endian::little(off - sizeof(int32_t));
          folly::storeUnaligned<int32_t>(buf_, dataSize);
          written = socket_->write(buf_, off);
          if (written > 0) {
            threadStats_.addHeaderBytes(written);
          }
          if (written != off) {
            break;
          }
          numEntriesWritten += numEntriesEncoded;
        }
        if (numEntriesWritten != numParsedChunksInfo) {
          WTLOG(ERROR) << "Could not write all the file chunks "
                       << numParsedChunksInfo << " " << numEntriesWritten;
          threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
          execFunnel->notifyFail();
          return ACCEPT_WITH_TIMEOUT;
        }
        // try to read ack (only the number of bytes read is checked, the
        // value of the byte is not inspected here)
        int64_t toRead = 1;
        int64_t numRead = socket_->read(buf_, toRead);
        if (numRead != toRead) {
          WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
          threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
          execFunnel->notifyFail();
          return ACCEPT_WITH_TIMEOUT;
        }
        wdtParent_->addTransferLogHeader(isBlockMode_,
                                         /* sender resuming */ true);
        execFunnel->notifySuccess();
        return READ_NEXT_CMD;
      }
    }
  }
}