ReceiverState ReceiverThread::processFileCmd()

in ReceiverThread.cpp [390:667]


// Handles a FILE_CMD from the sender.
//
// Steps:
//   1. Decodes the block header.
//   2. Writes the block's data through a FileWriter.
//   3. When checksum footers are enabled, reads the FOOTER_CMD that follows
//      the data and verifies the crc32c of everything received.
//
// Buffer state on entry: buf_[off_] holds the sender's transfer status byte,
// followed by a little-endian int16 header length. numRead_ counts bytes
// buffered starting at oldOffset_.
// NOTE(review): these entry conditions are inferred from how the members are
// used below; confirm against the caller that dispatches to this state.
//
// Returns:
//   READ_NEXT_CMD        the block was fully received. It was either marked
//                        verified, or queued in blocksWaitingVerification_
//                        when the encryption type carries a tag.
//   ACCEPT_WITH_TIMEOUT  short header read, short data read, short footer read,
//                        or checksum mismatch.
//   SEND_ABORT_CMD       local FileWriter open/write/sync/close failure.
//   FINISH_WITH_ERROR    malformed header/footer, or the parent requested an
//                        abort.
ReceiverState ReceiverThread::processFileCmd() {
  WTVLOG(1) << "entered PROCESS_FILE_CMD state";
  // following block needs to be executed for the first file cmd. There is no
  // harm in executing it more than once. number of blocks equal to 0 is a good
  // approximation for first file cmd. Did not want to introduce another boolean
  if (options_.enable_download_resumption && threadStats_.getNumBlocks() == 0) {
    auto sendChunksFunnel = controller_->getFunnel(SEND_FILE_CHUNKS_FUNNEL);
    auto state = sendChunksFunnel->getStatus();
    if (state == FUNNEL_START) {
      // sender is not in resumption mode
      wdtParent_->addTransferLogHeader(isBlockMode_,
                                       /* sender not resuming */ false);
      sendChunksFunnel->notifySuccess();
    }
  }
  checkpoint_.resetLastBlockDetails();
  BlockDetails blockDetails;
  // Whatever path we leave by, count the attempt as failed if an error code
  // was recorded for this thread.
  auto guard = folly::makeGuard([&] {
    if (threadStats_.getLocalErrorCode() != OK) {
      threadStats_.incrFailedAttempts();
    }
  });

  const ErrorCode transferStatus = static_cast<ErrorCode>(buf_[off_++]);
  if (transferStatus != OK) {
    // TODO: use this status information to implement fail fast mode
    WTVLOG(1) << "sender entered into error state "
              << errorCodeToStr(transferStatus);
  }
  // The header length is sent as a little-endian int16. It is later compared
  // against the number of bytes the decoder consumed from oldOffset_.
  int16_t headerLen = folly::loadUnaligned<int16_t>(buf_ + off_);
  headerLen = folly::Endian::little(headerLen);
  if (headerLen <= 0) {
    WTLOG(ERROR) << "Header length must be positive " << headerLen;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }

  WVLOG(2) << "Processing FILE_CMD, header len " << headerLen;

  sendHeartBeat();

  // Make sure the whole header is buffered before decoding it.
  if (headerLen > numRead_) {
    int64_t end = oldOffset_ + numRead_;
    numRead_ =
        readAtLeast(*socket_, buf_ + end, bufSize_ - end, headerLen, numRead_);
  }
  if (numRead_ < headerLen) {
    WTLOG(ERROR) << "Unable to read full header " << headerLen << " "
                 << numRead_;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  off_ += sizeof(int16_t);
  bool success = Protocol::decodeHeader(threadProtocolVersion_, buf_, off_,
                                        numRead_ + oldOffset_, blockDetails);
  int64_t headerBytes = off_ - oldOffset_;
  // transferred header length must match decoded header length
  if (headerLen != headerBytes) {
    WTLOG(ERROR) << "Decoded header length: " << headerBytes
                 << ", transferred header length: " << headerLen
                 << ", they should be equal.";
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }

  threadStats_.addHeaderBytes(headerBytes);
  threadStats_.addEffectiveBytes(headerBytes, 0);
  if (!success) {
    WTLOG(ERROR) << "Error decoding at"
                 << " ooff:" << oldOffset_ << " off_: " << off_
                 << " numRead_: " << numRead_;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }
  if (blockDetails.allocationStatus == TO_BE_DELETED &&
      (blockDetails.fileSize != 0 || blockDetails.dataSize != 0)) {
    WTLOG(ERROR) << "Invalid file header, file to be deleted, but "
                    "file-size/block-size not zero "
                 << blockDetails.fileName << " file-size "
                 << blockDetails.fileSize << " block-size "
                 << blockDetails.dataSize;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return FINISH_WITH_ERROR;
  }

  // received a well formed file cmd, apply the pending checkpoint update
  checkpointIndex_ = pendingCheckpointIndex_;
  WTVLOG(1) << "Read id:" << blockDetails.fileName
            << " size:" << blockDetails.dataSize << " ooff:" << oldOffset_
            << " off_: " << off_ << " numRead_: " << numRead_;
  auto &fileCreator = wdtParent_->getFileCreator();
  FileWriter writer(*threadCtx_, &blockDetails, fileCreator.get());
  const auto encryptionType = socket_->getEncryptionType();
  // Runs on every exit before the block completes (dismissed below once all
  // data is written, synced and closed). It records a partial-block
  // checkpoint only when nothing else will validate those bytes later.
  auto writtenGuard = folly::makeGuard([&] {
    if (!encryptionTypeToTagLen(encryptionType) && footerType_ == NO_FOOTER) {
      // if encryption doesn't have tag verification and checksum verification
      // is disabled, we can consider bytes received before connection break as
      // valid
      checkpoint_.setLastBlockDetails(blockDetails.seqId, blockDetails.offset,
                                      writer.getTotalWritten());
      threadStats_.addEffectiveBytes(headerBytes, writer.getTotalWritten());
    }
  });

  sendHeartBeat();

  // writer.open() deletes files if status == TO_BE_DELETED
  // therefore if !(!delete_extra_files && status == TO_BE_DELETED)
  // we should skip writer.open() call altogether
  if (options_.delete_extra_files ||
      blockDetails.allocationStatus != TO_BE_DELETED) {
    if (writer.open() != OK) {
      threadStats_.setLocalErrorCode(FILE_WRITE_ERROR);
      return SEND_ABORT_CMD;
    }
  }

  int32_t checksum = 0;
  // Bytes already buffered after the header. They may cover only part of this
  // block, or run past it into the next command.
  int64_t remainingData = numRead_ + oldOffset_ - off_;
  int64_t toWrite = remainingData;
  WDT_CHECK(remainingData >= 0);
  if (remainingData >= blockDetails.dataSize) {
    toWrite = blockDetails.dataSize;
  }
  threadStats_.addDataBytes(toWrite);
  if (footerType_ == CHECKSUM_FOOTER) {
    checksum = folly::crc32c(reinterpret_cast<const uint8_t *>(buf_ + off_),
                             toWrite, checksum);
  }
  auto throttler = wdtParent_->getThrottler();
  if (throttler) {
    // We might be reading more than we require for this file but
    // throttling should make sense for any additional bytes received
    // on the network
    throttler->limit(*threadCtx_, toWrite + headerBytes);
  }

  sendHeartBeat();

  ErrorCode code = ERROR;
  if (toWrite > 0) {
    code = writer.write(buf_ + off_, toWrite);
    if (code != OK) {
      threadStats_.setLocalErrorCode(code);
      return SEND_ABORT_CMD;
    }
  }
  off_ += toWrite;
  remainingData -= toWrite;
  // The loop is entered only when the buffered bytes did not cover the whole
  // block. That means remainingData is now 0 (no leftover belongs to the next
  // command), so buf_ can be reused from the start.
  while (writer.getTotalWritten() < blockDetails.dataSize) {
    if (wdtParent_->getCurAbortCode() != OK) {
      WTLOG(ERROR) << "Thread marked for abort while processing "
                   << blockDetails.fileName << " " << blockDetails.seqId
                   << " port : " << socket_->getPort();
      threadStats_.setLocalErrorCode(ABORT);
      return FINISH_WITH_ERROR;
    }

    sendHeartBeat();

    // Never read past the end of this block.
    int64_t nres = readAtMost(*socket_, buf_, bufSize_,
                              blockDetails.dataSize - writer.getTotalWritten());
    if (nres <= 0) {
      // Short read: handled below by the total-written check.
      break;
    }
    if (throttler) {
      // We only know how much we have read after we are done calling
      // readAtMost. Call throttler with the bytes read off_ the wire.
      throttler->limit(*threadCtx_, nres);
    }
    threadStats_.addDataBytes(nres);
    if (footerType_ == CHECKSUM_FOOTER) {
      checksum =
          folly::crc32c(reinterpret_cast<const uint8_t *>(buf_), nres, checksum);
    }

    sendHeartBeat();

    code = writer.write(buf_, nres);
    if (code != OK) {
      WTLOG(ERROR) << "failed to write to " << blockDetails.fileName;
      threadStats_.setLocalErrorCode(code);
      return SEND_ABORT_CMD;
    }
  }

  // Sync the writer to disk and close it. We need to check for error code each
  // time, otherwise we would move forward with corrupted files.
  const ErrorCode syncCode = writer.sync();
  if (syncCode != OK) {
    WTLOG(ERROR) << "could not sync " << blockDetails.fileName << " to disk";
    threadStats_.setLocalErrorCode(syncCode);
    return SEND_ABORT_CMD;
  }
  const ErrorCode closeCode = writer.close();
  if (closeCode != OK) {
    WTLOG(ERROR) << "could not close " << blockDetails.fileName;
    threadStats_.setLocalErrorCode(closeCode);
    return SEND_ABORT_CMD;
  }

  if (writer.getTotalWritten() != blockDetails.dataSize) {
    // This can only happen if there are transmission errors
    // Write errors to disk are already taken care of above
    WTLOG(ERROR) << "could not read entire content for "
                 << blockDetails.fileName << " port " << socket_->getPort();
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    return ACCEPT_WITH_TIMEOUT;
  }
  // The whole block is on disk, so no partial checkpoint is needed.
  writtenGuard.dismiss();
  WVLOG(2) << "completed " << blockDetails.fileName << " off: " << off_
           << " numRead: " << numRead_;
  // Transfer of the file is complete here, mark the bytes effective
  WDT_CHECK(remainingData >= 0) << "Negative remainingData " << remainingData;
  if (remainingData > 0) {
    // if we need to read more anyway, let's move the data
    numRead_ = remainingData;
    // A small leftover sitting in the upper half of the buffer is moved to the
    // front, presumably so the next command has room to be read in full.
    if ((remainingData < Protocol::kMaxHeader) && (off_ > (bufSize_ / 2))) {
      // rare so inefficient is ok
      WVLOG(3) << "copying extra " << remainingData << " leftover bytes @ "
               << off_;
      memmove(/* dst      */ buf_,
              /* from     */ buf_ + off_,
              /* how much */ remainingData);
      off_ = 0;
    } else {
      // otherwise just continue from the offset
      WVLOG(3) << "Using remaining extra " << remainingData
               << " leftover bytes starting @ " << off_;
    }
  } else {
    numRead_ = off_ = 0;
  }
  if (footerType_ == CHECKSUM_FOOTER) {
    sendHeartBeat();
    // have to read footer cmd
    oldOffset_ = off_;
    numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
                           Protocol::kMinBufLength, numRead_);
    if (numRead_ < Protocol::kMinBufLength) {
      WTLOG(ERROR) << "socket read failure " << Protocol::kMinBufLength << " "
                   << numRead_;
      threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
      return ACCEPT_WITH_TIMEOUT;
    }
    Protocol::CMD_MAGIC cmd = static_cast<Protocol::CMD_MAGIC>(buf_[off_++]);
    if (cmd != Protocol::FOOTER_CMD) {
      WTLOG(ERROR) << "Expecting footer cmd, but received " << cmd;
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return FINISH_WITH_ERROR;
    }
    int32_t receivedChecksum;
    bool ok = Protocol::decodeFooter(
        buf_, off_, oldOffset_ + Protocol::kMaxFooter, receivedChecksum);
    if (!ok) {
      WTLOG(ERROR) << "Unable to decode footer cmd";
      threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
      return FINISH_WITH_ERROR;
    }
    if (checksum != receivedChecksum) {
      WTLOG(ERROR) << "Checksum mismatch " << checksum << " "
                   << receivedChecksum << " port " << socket_->getPort()
                   << " file " << blockDetails.fileName;
      threadStats_.setLocalErrorCode(CHECKSUM_MISMATCH);
      return ACCEPT_WITH_TIMEOUT;
    }
    markBlockVerified(blockDetails);
    // Consume the footer from the buffered byte count.
    int64_t msgLen = off_ - oldOffset_;
    numRead_ -= msgLen;
  } else {
    WDT_CHECK(footerType_ == NO_FOOTER);
    if (encryptionTypeToTagLen(encryptionType)) {
      // Encryption with an authentication tag: the block is queued instead of
      // being marked verified here (presumably verified once the tag is
      // checked elsewhere — confirm).
      blocksWaitingVerification_.emplace_back(blockDetails);
    } else {
      markBlockVerified(blockDetails);
    }
  }
  return READ_NEXT_CMD;
}