SenderState SenderThread::readFileChunks()

in SenderThread.cpp [506:623]


/// Handles the READ_FILE_CHUNKS state.
///
/// Reads a single command byte from the socket and dispatches on it. For
/// CHUNKS_CMD it reads the chunks header, then reads length-prefixed buffers
/// and decodes them until `numFiles` entries have been collected, hands the
/// list to the parent via setFileChunksInfo() and writes back an ACK_CMD.
///
/// @return the next sender state:
///   - PROCESS_ABORT_CMD  when ABORT_CMD is read
///   - READ_FILE_CHUNKS   when WAIT_CMD is read, or after a spurious local
///                        checkpoint was read and verified (OK)
///   - SEND_BLOCKS        when ACK_CMD is read and the parent reports file
///                        chunks as received, or after the chunk list was
///                        read and acknowledged
///   - CONNECT            when reading the spurious checkpoint hit a socket
///                        read error
///   - CHECK_FOR_ABORT    on any socket read/write failure in this function
///                        (local error code set to SOCKET_READ_ERROR or
///                        SOCKET_WRITE_ERROR)
///   - END                on protocol violations (PROTOCOL_ERROR is set here,
///                        except for the checkpoint path, which only
///                        propagates the helper's error code)
SenderState SenderThread::readFileChunks() {
  WTLOG(INFO) << "entered READ_FILE_CHUNKS state ";
  int64_t numRead = socket_->read(buf_, 1);
  if (numRead != 1) {
    WTLOG(ERROR) << "Socket read error 1 " << numRead;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return CHECK_FOR_ABORT;
  }
  threadStats_.addHeaderBytes(numRead);
  const auto cmd = static_cast<Protocol::CMD_MAGIC>(buf_[0]);
  if (cmd == Protocol::ABORT_CMD) {
    return PROCESS_ABORT_CMD;
  }
  if (cmd == Protocol::WAIT_CMD) {
    // nothing to read yet; re-enter this state and read the next command
    return READ_FILE_CHUNKS;
  }
  if (cmd == Protocol::ACK_CMD) {
    if (!wdtParent_->isFileChunksReceived()) {
      WTLOG(ERROR) << "Sender has not yet received file chunks, but receiver "
                   << "thinks it has already sent it";
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
    return SEND_BLOCKS;
  }
  if (cmd == Protocol::LOCAL_CHECKPOINT_CMD) {
    ErrorCode errCode = readAndVerifySpuriousCheckpoint();
    if (errCode == SOCKET_READ_ERROR) {
      return CONNECT;
    }
    if (errCode == PROTOCOL_ERROR) {
      return END;
    }
    WDT_CHECK_EQ(OK, errCode);
    return READ_FILE_CHUNKS;
  }
  if (cmd != Protocol::CHUNKS_CMD) {
    WTLOG(ERROR) << "Unexpected cmd " << cmd;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }
  int64_t toRead = Protocol::kChunksCmdLen;
  numRead = socket_->read(buf_, toRead);
  if (numRead != toRead) {
    WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return CHECK_FOR_ABORT;
  }
  threadStats_.addHeaderBytes(numRead);
  int64_t off = 0;
  // Start from an invalid value so that a decode which leaves these untouched
  // is rejected by the sanity check below instead of reading indeterminate
  // values.
  int64_t bufSize = -1;
  int64_t numFiles = -1;
  Protocol::decodeChunksCmd(buf_, off, bufSize_, bufSize, numFiles);
  WTLOG(INFO) << "File chunk list has " << numFiles
              << " entries and is broken in buffers of length " << bufSize;
  if (bufSize < 0 || numFiles < 0) {
    WTLOG(ERROR) << "Decoded bogus size for file chunks list bufSize = "
                 << bufSize << " num files " << numFiles;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }
  std::unique_ptr<char[]> chunkBuffer(new char[bufSize]);
  std::vector<FileChunksInfo> fileChunksInfoList;
  while (true) {
    int64_t numFileChunks = fileChunksInfoList.size();
    if (numFileChunks > numFiles) {
      // We should never be able to read more file chunks than mentioned in the
      // chunks cmd. Chunks cmd has buffer size used to transfer chunks and also
      // number of chunks. These chunks are read and parsed and added to
      // fileChunksInfoList. Number of chunks we decode should match with the
      // number mentioned in the Chunks cmd.
      WTLOG(ERROR) << "Number of file chunks received is more than the number "
                      "mentioned in CHUNKS_CMD "
                   << numFileChunks << " " << numFiles;
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
    if (numFileChunks == numFiles) {
      break;
    }
    // each buffer is preceded by its length as a little-endian int32
    toRead = sizeof(int32_t);
    numRead = socket_->read(buf_, toRead);
    if (numRead != toRead) {
      WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
      threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
      return CHECK_FOR_ABORT;
    }
    // Convert endianness at 32-bit width; converting after widening to
    // int64_t would byte-swap the wrong width on big-endian hosts.
    const int32_t encodedLen = folly::loadUnaligned<int32_t>(buf_);
    toRead = folly::Endian::little(encodedLen);
    // The length comes from the peer: it must fit in chunkBuffer (bufSize
    // bytes), otherwise the read below would overrun the allocation.
    if (toRead < 0 || toRead > bufSize) {
      WTLOG(ERROR) << "Invalid file chunks buffer length " << toRead
                   << ", expected at most " << bufSize;
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
    numRead = socket_->read(chunkBuffer.get(), toRead);
    if (numRead != toRead) {
      WTLOG(ERROR) << "Socket read error " << toRead << " " << numRead;
      threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
      return CHECK_FOR_ABORT;
    }
    threadStats_.addHeaderBytes(numRead);
    off = 0;
    // decode function below adds decoded file chunks to fileChunksInfoList
    bool success = Protocol::decodeFileChunksInfoList(
        chunkBuffer.get(), off, toRead, fileChunksInfoList);
    if (!success) {
      WTLOG(ERROR) << "Unable to decode file chunks list";
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return END;
    }
  }
  wdtParent_->setFileChunksInfo(fileChunksInfoList);
  // send ack for file chunks list
  buf_[0] = Protocol::ACK_CMD;
  int64_t toWrite = 1;
  int64_t written = socket_->write(buf_, toWrite);
  if (toWrite != written) {
    WTLOG(ERROR) << "Socket write error " << toWrite << " " << written;
    threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
    return CHECK_FOR_ABORT;
  }
  threadStats_.addHeaderBytes(written);
  return SEND_BLOCKS;
}