in SenderThread.cpp [869:942]
SenderState SenderThread::processVersionMismatch() {
WTLOG(INFO) << "entered PROCESS_VERSION_MISMATCH state ";
WDT_CHECK(threadStats_.getLocalErrorCode() == ABORT);
auto negotiationStatus = wdtParent_->getNegotiationStatus();
WDT_CHECK_NE(negotiationStatus, V_MISMATCH_FAILED)
<< "Thread should have ended in case of version mismatch";
if (negotiationStatus == V_MISMATCH_RESOLVED) {
WTLOG(WARNING) << "Protocol version already negotiated, but "
"transfer still aborted due to version mismatch";
return END;
}
WDT_CHECK_EQ(negotiationStatus, V_MISMATCH_WAIT);
// Need a barrier here to make sure all the negotiated protocol versions
// have been collected
auto barrier = controller_->getBarrier(VERSION_MISMATCH_BARRIER);
barrier->execute();
WTVLOG(1) << "cleared the protocol version barrier";
auto execFunnel = controller_->getFunnel(VERSION_MISMATCH_FUNNEL);
while (true) {
auto status = execFunnel->getStatus();
switch (status) {
case FUNNEL_START: {
WTLOG(INFO) << "started the funnel for version mismatch";
wdtParent_->setProtoNegotiationStatus(V_MISMATCH_FAILED);
if (transferHistoryController_->handleVersionMismatch() != OK) {
execFunnel->notifySuccess();
return END;
}
int negotiatedProtocol = 0;
for (int threadProtocolVersion : wdtParent_->getNegotiatedProtocols()) {
if (threadProtocolVersion > 0) {
if (negotiatedProtocol > 0 &&
negotiatedProtocol != threadProtocolVersion) {
WTLOG(ERROR)
<< "Different threads negotiated different protocols "
<< negotiatedProtocol << " " << threadProtocolVersion;
execFunnel->notifySuccess();
return END;
}
negotiatedProtocol = threadProtocolVersion;
}
}
WDT_CHECK_GT(negotiatedProtocol, 0);
WLOG_IF(INFO, negotiatedProtocol != threadProtocolVersion_)
<< *this << " Changing protocol version to " << negotiatedProtocol
<< ", previous version " << threadProtocolVersion_;
wdtParent_->setProtocolVersion(negotiatedProtocol);
threadProtocolVersion_ = wdtParent_->getProtocolVersion();
setFooterType();
threadStats_.setRemoteErrorCode(OK);
wdtParent_->setProtoNegotiationStatus(V_MISMATCH_RESOLVED);
wdtParent_->clearAbort();
execFunnel->notifySuccess();
return CONNECT;
}
case FUNNEL_PROGRESS: {
execFunnel->wait();
break;
}
case FUNNEL_END: {
negotiationStatus = wdtParent_->getNegotiationStatus();
WDT_CHECK_NE(negotiationStatus, V_MISMATCH_WAIT);
if (negotiationStatus == V_MISMATCH_FAILED) {
return END;
}
if (negotiationStatus == V_MISMATCH_RESOLVED) {
threadProtocolVersion_ = wdtParent_->getProtocolVersion();
threadStats_.setRemoteErrorCode(OK);
return CONNECT;
}
}
}
}
}