SenderState SenderThread::readLocalCheckPoint()

in SenderThread.cpp [151:206]


/**
 * Handler for the READ_LOCAL_CHECKPOINT state.
 *
 * Reads one fixed-size local checkpoint from the socket, decodes and
 * validates it, and hands it to this thread's transfer history.
 *
 * @return CONNECT            if the socket read did not return exactly the
 *                            expected number of bytes
 *         END                if the checkpoint cannot be decoded, is not the
 *                            only one in the buffer, names a different port,
 *                            or is rejected by the transfer history
 *         READ_RECEIVER_CMD  if the checkpoint reports numBlocks == -1
 *         SEND_SETTINGS      otherwise
 */
SenderState SenderThread::readLocalCheckPoint() {
  WTLOG(INFO) << "entered READ_LOCAL_CHECKPOINT state";
  ThreadTransferHistory &history = getTransferHistory();

  // The checkpoint is read as a fixed-size record whose length depends on the
  // protocol version; anything other than a full record is a read error.
  int expectedLen =
      Protocol::getMaxLocalCheckpointLength(threadProtocolVersion_);
  int64_t bytesRead = socket_->read(buf_, expectedLen);
  if (bytesRead != expectedLen) {
    WTLOG(ERROR) << "read mismatch during reading local checkpoint "
                 << expectedLen << " " << bytesRead << " port " << port_;
    threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
    ++numReconnectWithoutProgress_;
    return CONNECT;
  }

  // Validation: each failure below is reported as a protocol error and ends
  // the state machine.
  std::vector<Checkpoint> decoded;
  int64_t offset = 0;
  if (!Protocol::decodeCheckpoints(threadProtocolVersion_, buf_, offset,
                                   expectedLen, decoded)) {
    WTLOG(ERROR) << "checkpoint decode failure "
                 << folly::humanify(std::string(buf_, bytesRead));
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }
  if (decoded.size() != 1) {
    WTLOG(ERROR) << "Illegal local checkpoint, unexpected num checkpoints "
                 << decoded.size() << " "
                 << folly::humanify(std::string(buf_, bytesRead));
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }
  if (decoded[0].port != port_) {
    WTLOG(ERROR) << "illegal checkpoint, checkpoint " << decoded[0]
                 << " doesn't match the port " << port_;
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }

  const Checkpoint &localCheckpoint = decoded[0];
  WTVLOG(1) << "received local checkpoint " << localCheckpoint;

  if (localCheckpoint.numBlocks == -1) {
    // The receiver failed while it was sending the DONE cmd, so go back to
    // reading the receiver's command instead of touching the history.
    return READ_RECEIVER_CMD;
  }

  ErrorCode status = history.setLocalCheckpoint(localCheckpoint);
  if (status == INVALID_CHECKPOINT) {
    threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
    return END;
  }

  // Count consecutive reconnects that made no progress; any other outcome
  // resets the counter.
  if (status != NO_PROGRESS) {
    numReconnectWithoutProgress_ = 0;
  } else {
    ++numReconnectWithoutProgress_;
  }
  return SEND_SETTINGS;
}