SenderState SenderThread::processVersionMismatch()

in SenderThread.cpp [869:942]


/**
 * Handler for the PROCESS_VERSION_MISMATCH state.
 *
 * Preconditions (enforced with WDT_CHECK): the thread's local error code is
 * ABORT and the parent's negotiation status is not V_MISMATCH_FAILED.
 *
 * If the negotiation status is already V_MISMATCH_RESOLVED the thread ends.
 * Otherwise (V_MISMATCH_WAIT) the thread passes the version mismatch barrier
 * and then polls the version mismatch funnel:
 *  - FUNNEL_START    : this thread checks the per-thread negotiated versions,
 *                      applies the agreed one to the parent and publishes the
 *                      outcome through the parent's negotiation status
 *  - FUNNEL_PROGRESS : wait on the funnel, then poll again
 *  - FUNNEL_END      : act on the negotiation status stored in the parent
 *
 * @return CONNECT when a protocol version was agreed upon, END otherwise
 */
SenderState SenderThread::processVersionMismatch() {
  WTLOG(INFO) << "entered PROCESS_VERSION_MISMATCH state ";
  WDT_CHECK(threadStats_.getLocalErrorCode() == ABORT);

  auto negotiationStatus = wdtParent_->getNegotiationStatus();
  WDT_CHECK_NE(negotiationStatus, V_MISMATCH_FAILED)
      << "Thread should have ended in case of version mismatch";
  if (negotiationStatus == V_MISMATCH_RESOLVED) {
    WTLOG(WARNING) << "Protocol version already negotiated, but "
                      "transfer still aborted due to version mismatch";
    return END;
  }
  WDT_CHECK_EQ(negotiationStatus, V_MISMATCH_WAIT);

  // Pass the barrier first so that all the negotiated protocol versions have
  // been collected before any of them is inspected below
  auto versionBarrier = controller_->getBarrier(VERSION_MISMATCH_BARRIER);
  versionBarrier->execute();
  WTVLOG(1) << "cleared the protocol version barrier";

  auto versionFunnel = controller_->getFunnel(VERSION_MISMATCH_FUNNEL);
  for (;;) {
    auto funnelStatus = versionFunnel->getStatus();

    if (funnelStatus == FUNNEL_PROGRESS) {
      // Funnel is busy; wait on it and re-read the status
      versionFunnel->wait();
      continue;
    }

    if (funnelStatus == FUNNEL_END) {
      // The outcome is read back from the parent
      negotiationStatus = wdtParent_->getNegotiationStatus();
      WDT_CHECK_NE(negotiationStatus, V_MISMATCH_WAIT);
      if (negotiationStatus == V_MISMATCH_FAILED) {
        return END;
      }
      if (negotiationStatus == V_MISMATCH_RESOLVED) {
        // NOTE(review): unlike the FUNNEL_START path, setFooterType() is not
        // called here after the protocol version changes - confirm that this
        // is intentional
        threadProtocolVersion_ = wdtParent_->getProtocolVersion();
        threadStats_.setRemoteErrorCode(OK);
        return CONNECT;
      }
      // Any other negotiation status: poll the funnel again
      continue;
    }

    if (funnelStatus != FUNNEL_START) {
      // Unrecognized funnel status: nothing to do, poll again
      continue;
    }

    // FUNNEL_START: this thread performs the resolution
    WTLOG(INFO) << "started the funnel for version mismatch";
    // Mark the negotiation as failed up front; every early exit below leaves
    // this status in place, and only full success overwrites it
    wdtParent_->setProtoNegotiationStatus(V_MISMATCH_FAILED);
    if (transferHistoryController_->handleVersionMismatch() != OK) {
      versionFunnel->notifySuccess();
      return END;
    }

    // All threads that negotiated a version (entry > 0) must agree on it
    int negotiatedProtocol = 0;
    for (int candidateVersion : wdtParent_->getNegotiatedProtocols()) {
      if (candidateVersion <= 0) {
        continue;
      }
      if (negotiatedProtocol > 0 && negotiatedProtocol != candidateVersion) {
        WTLOG(ERROR) << "Different threads negotiated different protocols "
                     << negotiatedProtocol << " " << candidateVersion;
        versionFunnel->notifySuccess();
        return END;
      }
      negotiatedProtocol = candidateVersion;
    }
    WDT_CHECK_GT(negotiatedProtocol, 0);

    WLOG_IF(INFO, negotiatedProtocol != threadProtocolVersion_)
        << *this << " Changing protocol version to " << negotiatedProtocol
        << ", previous version " << threadProtocolVersion_;
    wdtParent_->setProtocolVersion(negotiatedProtocol);
    // Read the version back from the parent rather than assuming it equals
    // negotiatedProtocol
    threadProtocolVersion_ = wdtParent_->getProtocolVersion();
    setFooterType();
    threadStats_.setRemoteErrorCode(OK);
    wdtParent_->setProtoNegotiationStatus(V_MISMATCH_RESOLVED);
    wdtParent_->clearAbort();
    versionFunnel->notifySuccess();
    return CONNECT;
  }
}