in SenderThread.cpp [151:206]
SenderState SenderThread::readLocalCheckPoint() {
WTLOG(INFO) << "entered READ_LOCAL_CHECKPOINT state";
ThreadTransferHistory &transferHistory = getTransferHistory();
std::vector<Checkpoint> checkpoints;
int64_t decodeOffset = 0;
int checkpointLen =
Protocol::getMaxLocalCheckpointLength(threadProtocolVersion_);
int64_t numRead = socket_->read(buf_, checkpointLen);
if (numRead != checkpointLen) {
WTLOG(ERROR) << "read mismatch during reading local checkpoint "
<< checkpointLen << " " << numRead << " port " << port_;
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
numReconnectWithoutProgress_++;
return CONNECT;
}
bool isValidCheckpoint = true;
if (!Protocol::decodeCheckpoints(threadProtocolVersion_, buf_, decodeOffset,
checkpointLen, checkpoints)) {
WTLOG(ERROR) << "checkpoint decode failure "
<< folly::humanify(std::string(buf_, numRead));
isValidCheckpoint = false;
} else if (checkpoints.size() != 1) {
WTLOG(ERROR) << "Illegal local checkpoint, unexpected num checkpoints "
<< checkpoints.size() << " "
<< folly::humanify(std::string(buf_, numRead));
isValidCheckpoint = false;
} else if (checkpoints[0].port != port_) {
WTLOG(ERROR) << "illegal checkpoint, checkpoint " << checkpoints[0]
<< " doesn't match the port " << port_;
isValidCheckpoint = false;
}
if (!isValidCheckpoint) {
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return END;
}
const Checkpoint &checkpoint = checkpoints[0];
auto numBlocks = checkpoint.numBlocks;
WTVLOG(1) << "received local checkpoint " << checkpoint;
if (numBlocks == -1) {
// Receiver failed while sending DONE cmd
return READ_RECEIVER_CMD;
}
ErrorCode errCode = transferHistory.setLocalCheckpoint(checkpoint);
if (errCode == INVALID_CHECKPOINT) {
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return END;
}
if (errCode == NO_PROGRESS) {
++numReconnectWithoutProgress_;
} else {
numReconnectWithoutProgress_ = 0;
}
return SEND_SETTINGS;
}