in SenderThread.cpp [625:683]
/**
 * Reads the next receiver command (a single byte) from the socket into buf_.
 *
 * The read is attempted in two phases:
 *  1. Retry loop: every time the read fails with WDT_TIMEOUT, the socket's
 *     unacked byte count is compared with the previous sample. As long as the
 *     count keeps shrinking, the read is retried. If the count stops changing
 *     or cannot be determined, the function gives up.
 *  2. Drain wait: once the unacked byte count reaches zero, one final read is
 *     issued with a timeout equal to the time phase 1 took plus
 *     options_.drain_extra_ms.
 *
 * @return OK                 if one byte was read
 *         ABORT              if a read failed and the thread abort code is set
 *         SOCKET_READ_ERROR  on EOF, on a non-timeout read error, when the
 *                            unacked byte count is unavailable or stalls, or
 *                            when the final drain read fails
 */
ErrorCode SenderThread::readNextReceiverCmd() {
  int numUnackedBytes = socket_->getUnackedBytes();
  int timeToClearSendBuffer = 0;
  const Clock::time_point startTime = Clock::now();
  while (true) {
    const int numRead = socket_->read(buf_, 1);
    if (numRead == 1) {
      return OK;
    }
    // A failed read while aborting is reported as ABORT, not as a socket error
    if (getThreadAbortCode() != OK) {
      return ABORT;
    }
    if (numRead == 0) {
      // EOF is not a system call failure, so errno carries no information
      // here; use the plain logger instead of the errno-appending variant
      WTLOG(ERROR) << "Got unexpected EOF, reconnecting";
      return SOCKET_READ_ERROR;
    }
    WDT_CHECK_LT(numRead, 0);
    const ErrorCode errCode = socket_->getReadErrCode();
    WTLOG(ERROR) << "Failed to read receiver cmd " << numRead << " "
                 << errorCodeToStr(errCode);
    if (errCode != WDT_TIMEOUT) {
      // not timed out
      return SOCKET_READ_ERROR;
    }
    // The read timed out; use the unacked byte count to decide whether the
    // connection is still making progress
    const int curUnackedBytes = socket_->getUnackedBytes();
    if (numUnackedBytes < 0 || curUnackedBytes < 0) {
      WTLOG(ERROR) << "Failed to read number of unacked bytes, reconnecting";
      return SOCKET_READ_ERROR;
    }
    // Nothing is written by this function between the two samples, so the
    // count is expected to be non-increasing
    WDT_CHECK_GE(numUnackedBytes, curUnackedBytes);
    if (curUnackedBytes == 0) {
      // Everything sent has been acked; move on to the drain wait below
      timeToClearSendBuffer = durationMillis(Clock::now() - startTime);
      break;
    }
    if (curUnackedBytes == numUnackedBytes) {
      WTLOG(ERROR) << "Number of unacked bytes did not change, reconnecting "
                   << curUnackedBytes;
      return SOCKET_READ_ERROR;
    }
    WTLOG(INFO) << "Read receiver command failed, but number of unacked "
                   "bytes decreased, retrying socket read "
                << numUnackedBytes << " " << curUnackedBytes;
    numUnackedBytes = curUnackedBytes;
  }
  // we are assuming that sender and receiver tcp buffer sizes are same. So, we
  // expect another timeToClearSendBuffer milliseconds for receiver to clear its
  // buffer
  const int readTimeout = timeToClearSendBuffer + options_.drain_extra_ms;
  WTLOG(INFO) << "Send buffer cleared in " << timeToClearSendBuffer
              << "ms, waiting for " << readTimeout
              << "ms for receiver buffer to clear";
  // readWithTimeout internally checks for abort periodically
  const int numRead = socket_->readWithTimeout(buf_, 1, readTimeout);
  if (numRead == 1) {
    return OK;
  }
  // Mirror the retry loop: a failed read during an abort is reported as ABORT
  // so callers can tell it apart from a genuine socket failure
  if (getThreadAbortCode() != OK) {
    return ABORT;
  }
  WTLOG(ERROR) << "Failed to read receiver cmd " << numRead;
  return SOCKET_READ_ERROR;
}