bool TcrConnection::initTcrConnection()

in cppcache/src/TcrConnection.cpp [109:476]


bool TcrConnection::initTcrConnection(
    std::shared_ptr<TcrEndpoint> endpointObj,
    synchronized_set<std::unordered_set<uint16_t>>& ports,
    bool isClientNotification, bool isSecondary,
    std::chrono::microseconds connectTimeout) {
  endpointObj_ = endpointObj;
  poolDM_ = dynamic_cast<ThinClientPoolDM*>(endpointObj_->getPoolHADM());
  hasServerQueue_ = NON_REDUNDANT_SERVER;
  queueSize_ = 0;
  lastAccessed_ = creationTime_ = std::chrono::steady_clock::now();
  connectionId = INITIAL_CONNECTION_ID;
  auto cacheImpl = poolDM_->getConnectionManager().getCacheImpl();
  const auto& distributedSystem = cacheImpl->getDistributedSystem();
  const auto& sysProp = distributedSystem.getSystemProperties();

  bool isPool = false;
  isBeingUsed_ = false;
  // Precondition:
  // 1. isSecondary ==> isClientNotification

  // Create TcpConn object which manages a socket connection with the endpoint.
  if (endpointObj && endpointObj->getPoolHADM()) {
    createConnection(endpointObj_->name().c_str(), connectTimeout,
                     static_cast<int32_t>(
                         endpointObj->getPoolHADM()->getSocketBufferSize()));
    isPool = true;
  } else {
    createConnection(endpointObj_->name().c_str(), connectTimeout,
                     sysProp.maxSocketBufferSize());
  }

  auto handShakeMsg = cacheImpl->createDataOutput();
  bool isNotificationChannel = false;
  // Send byte Acceptor.CLIENT_TO_SERVER = (byte) 100;
  // Send byte Acceptor.SERVER_TO_CLIENT = (byte) 101;
  if (isClientNotification) {
    isNotificationChannel = true;
    if (isSecondary) {
      handShakeMsg.write(static_cast<int8_t>(SECONDARY_SERVER_TO_CLIENT));
    } else {
      handShakeMsg.write(static_cast<int8_t>(PRIMARY_SERVER_TO_CLIENT));
    }
  } else {
    handShakeMsg.write(static_cast<int8_t>(CLIENT_TO_SERVER));
  }

  Version::write(handShakeMsg, Version::current());
  LOGFINE("Client version ordinal is %d", Version::current().getOrdinal());

  handShakeMsg.write(static_cast<int8_t>(REPLY_OK));

  // Send byte REPLY_OK = (byte)58;
  if (!isClientNotification) {
    port_ = conn_->getPort();
    ports.insert(port_);
  } else {
    auto&& lock = ports.make_lock();
    handShakeMsg.writeInt(static_cast<int32_t>(ports.size()));
    for (const auto& port : ports) {
      handShakeMsg.writeInt(static_cast<int32_t>(port));
    }
  }

  //  Writing handshake readtimeout value for CSVER_51+.
  if (!isClientNotification) {
    // SW: The timeout has been artificially raised to the highest
    // permissible value for bug #232 for now.
    //  minus 10 sec because the GFE 5.7 gridDev branch adds a
    // 5 sec buffer which was causing an int overflow.
    handShakeMsg.writeInt(static_cast<int32_t>(0x7fffffff) - 10000);
  }

  // Write header for byte FixedID since GFE 5.7
  handShakeMsg.write(static_cast<int8_t>(DSCode::FixedIDByte));
  // Writing byte for ClientProxyMembershipID class id=38 as registered on the
  // java server.
  handShakeMsg.write(static_cast<int8_t>(DSFid::ClientProxyMembershipId));
  if (endpointObj->getPoolHADM()) {
    ClientProxyMembershipID* memId =
        endpointObj->getPoolHADM()->getMembershipId();
    const auto& dsMemberId = memId->getDSMemberId();
    handShakeMsg.writeBytes(
        reinterpret_cast<const uint8_t*>(dsMemberId.c_str()),
        static_cast<int32_t>(dsMemberId.size()));
  } else {
    // Add 3 durable Subcription properties to ClientProxyMembershipID
    auto&& durableId = sysProp.durableClientId();
    auto&& durableTimeOut = sysProp.durableTimeout();

    // Write ClientProxyMembershipID serialized object.
    auto memId = cacheImpl->getClientProxyMembershipIDFactory().create(
        durableId.c_str(), durableTimeOut);
    const auto& dsMemberId = memId->getDSMemberId();
    handShakeMsg.writeBytes(
        reinterpret_cast<const uint8_t*>(dsMemberId.c_str()),
        static_cast<int32_t>(dsMemberId.size()));
  }
  handShakeMsg.writeInt(static_cast<int32_t>(1));

  bool requireServerAuth = false;
  std::shared_ptr<Properties> credentials;
  std::shared_ptr<CacheableBytes> serverChallenge;

  // Write overrides (just conflation for now)
  handShakeMsg.write(getOverrides(&sysProp));

  bool tmpIsSecurityOn = nullptr != cacheImpl->getAuthInitialize();

  if (endpointObj_) {
    tmpIsSecurityOn = tmpIsSecurityOn || endpointObj_->isMultiUserMode();
  }

  LOGDEBUG(
      "TcrConnection tmpIsSecurityOn = %d isNotificationChannel = "
      "%d ",
      tmpIsSecurityOn, isNotificationChannel);
  bool doIneedToSendCreds = true;
  if (isNotificationChannel && endpointObj_ &&
      this->endpointObj_->isMultiUserMode()) {
    tmpIsSecurityOn = false;
    doIneedToSendCreds = false;
  }

  if (isNotificationChannel && !doIneedToSendCreds) {
    handShakeMsg.write(
        static_cast<uint8_t>(SECURITY_MULTIUSER_NOTIFICATIONCHANNEL));
  } else if (tmpIsSecurityOn) {
    handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NORMAL));
  } else {
    handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NONE));
  }

  if (tmpIsSecurityOn) {
    try {
      LOGFINER("TcrConnection: about to invoke authloader");
      const auto& tmpSecurityProperties = sysProp.getSecurityProperties();
      if (tmpSecurityProperties == nullptr) {
        LOGWARN("TcrConnection: security properties not found.");
      }
      // only for backward connection
      if (isClientNotification) {
        if (const auto& authInitialize = cacheImpl->getAuthInitialize()) {
          LOGFINER(
              "TcrConnection: acquired handle to authLoader, "
              "invoking getCredentials");

          const auto& tmpAuthIniSecurityProperties =
              authInitialize->getCredentials(tmpSecurityProperties,
                                             endpointObj_->name().c_str());
          LOGFINER("TcrConnection: after getCredentials ");
          credentials = tmpAuthIniSecurityProperties;
        }
      }
      if (isClientNotification) {
        credentials->toData(handShakeMsg);
      }
    } catch (const AuthenticationRequiredException&) {
      LOGDEBUG("AuthenticationRequiredException got");
      throw;
    } catch (const AuthenticationFailedException&) {
      LOGDEBUG("AuthenticationFailedException got");
      throw;
    } catch (const Exception& ex) {
      LOGWARN("TcrConnection: failed to acquire handle to authLoader: [%s] %s",
              ex.getName().c_str(), ex.what());
      auto message =
          std::string("TcrConnection: failed to load authInit library: ") +
          ex.what();
      throwException(AuthenticationFailedException(message));
    }
  }

  size_t msgLength;
  auto data = reinterpret_cast<char*>(
      const_cast<uint8_t*>(handShakeMsg.getBuffer(&msgLength)));
  LOGFINE("Attempting handshake with endpoint %s for %s%s connection",
          endpointObj->name().c_str(),
          isClientNotification ? (isSecondary ? "secondary " : "primary ") : "",
          isClientNotification ? "subscription" : "client");
  LOGDEBUG("%s(%p): Handshake bytes: (%d): %s", __GNFN__, this, msgLength,
           Utils::convertBytesToString(data, msgLength).c_str());

  ConnErrType error = sendData(data, msgLength, connectTimeout);

  if (error == CONN_NOERR) {
    std::vector<int8_t> recdBytes;

    auto acceptanceCode = readHandshakeData(1, connectTimeout);
    recdBytes.push_back(acceptanceCode[0]);

    if (acceptanceCode[0] == REPLY_SSL_ENABLED && !sysProp.sslEnabled()) {
      LOGERROR("SSL is enabled on server, enable SSL in client as well");
      AuthenticationRequiredException ex(
          "SSL is enabled on server, enable SSL in client as well");
      conn_.reset();
      throwException(ex);
    }

    auto serverQueueStatus = readHandshakeData(1, connectTimeout);
    recdBytes.push_back(serverQueueStatus[0]);

    //  TESTING: Durable clients - set server queue status.
    // 0 - Non-Redundant , 1- Redundant , 2- Primary
    if (serverQueueStatus[0] == 1) {
      hasServerQueue_ = REDUNDANT_SERVER;
    } else if (serverQueueStatus[0] == 2) {
      hasServerQueue_ = PRIMARY_SERVER;
    } else {
      hasServerQueue_ = NON_REDUNDANT_SERVER;
    }
    auto queueSizeMsg = readHandshakeData(4, connectTimeout);
    recdBytes.insert(std::end(recdBytes), std::begin(queueSizeMsg),
                     std::end(queueSizeMsg));
    queueSize_ = static_cast<int32_t>(queueSizeMsg[0]) << 24 |
                 static_cast<int32_t>(queueSizeMsg[1]) << 16 |
                 static_cast<int32_t>(queueSizeMsg[2]) << 8 |
                 static_cast<int32_t>(queueSizeMsg[3]);
    queueSize_ = queueSize_ > 0 ? queueSize_ : 0;

    endpointObj_->setServerQueueStatus(hasServerQueue_, queueSize_);

    ////////////////////////// Set Pool Specific Q Size when
    ///////////////////////////////////
    ////////////////////////// 1. ServerQStatus is Primary or
    ///////////////////////////////////
    ////////////////////////// 2. ServerQStatus is Non-Redundant but
    ///////////////////////////////////
    ////////////////////////// 3. ONLY when handshake is for subscription
    ///////////////////////////////////
    if ((hasServerQueue_ == PRIMARY_SERVER ||
         hasServerQueue_ == NON_REDUNDANT_SERVER) &&
        isClientNotification) {
      poolDM_->setPrimaryServerQueueSize(queueSize_);
    }

    if (!isClientNotification) {
      // Read the DistributedMember object
      auto recvMsgLen = readHandshakeArraySize(connectTimeout);
      recdBytes.push_back((recvMsgLen & 0xff000000) >> 24);
      recdBytes.push_back((recvMsgLen & 0x00ff0000) >> 16);
      recdBytes.push_back((recvMsgLen & 0x0000ff00) >> 8);
      recdBytes.push_back(recvMsgLen & 0x000000ff);

      auto recvMessage = readHandshakeData(recvMsgLen, connectTimeout);
      recdBytes.insert(std::end(recdBytes), std::begin(recvMessage),
                       std::end(recvMessage));

      // If the distributed member has not been set yet, set it.
      if (getEndpointObject()->getDistributedMemberID() == 0) {
        LOGDEBUG("Deserializing distributed member Id");
        auto dataInputForClient = cacheImpl->createDataInput(
            reinterpret_cast<const uint8_t*>(recvMessage.data()),
            recvMessage.size());
        auto member = std::dynamic_pointer_cast<ClientProxyMembershipID>(
            dataInputForClient.readObject());
        auto memId = cacheImpl->getMemberListForVersionStamp()->add(member);
        getEndpointObject()->setDistributedMemberID(memId);
        LOGDEBUG("Deserialized distributed member Id %d", memId);
      }
    }

    auto recvMsgLenBytes = readHandshakeData(2, connectTimeout);
    uint16_t recvMsgLen2 = static_cast<int16_t>(recvMsgLenBytes[0]) << 8 |
                           static_cast<int16_t>(recvMsgLenBytes[1]);
    recdBytes.insert(std::end(recdBytes), std::begin(recvMsgLenBytes),
                     std::end(recvMsgLenBytes));

    auto recvMessage = readHandshakeData(recvMsgLen2, connectTimeout);
    recdBytes.insert(std::end(recdBytes), std::begin(recvMessage),
                     std::end(recvMessage));

    if (!isClientNotification) {
      auto deltaEnabledMsg = readHandshakeData(1, connectTimeout);
      recdBytes.push_back(deltaEnabledMsg[0]);
      ThinClientBaseDM::setDeltaEnabledOnServer(deltaEnabledMsg[0] ? true
                                                                   : false);
    }

    LOGDEBUG(
        "%s(%p): isClientNotification=%s, Handshake response bytes: (%d) %s",
        __GNFN__, this, isClientNotification ? "true" : "false",
        recdBytes.size(),
        Utils::convertBytesToString(recdBytes.data(), recdBytes.size())
            .c_str());

    switch (acceptanceCode[0]) {
      case REPLY_OK:
      case SUCCESSFUL_SERVER_TO_CLIENT:
        LOGFINER("Handshake reply: %u,%u,%u", acceptanceCode[0],
                 serverQueueStatus[0], recvMsgLen2);
        if (isClientNotification) {
          readHandshakeInstantiatorMsg(connectTimeout);
        }
        break;
      case REPLY_AUTHENTICATION_FAILED: {
        AuthenticationFailedException ex(
            reinterpret_cast<char*>(recvMessage.data()));
        conn_.reset();
        throwException(ex);
      }
      case REPLY_AUTHENTICATION_REQUIRED: {
        AuthenticationRequiredException ex(
            reinterpret_cast<char*>(recvMessage.data()));
        conn_.reset();
        throwException(ex);
      }
      case REPLY_DUPLICATE_DURABLE_CLIENT: {
        DuplicateDurableClientException ex(
            reinterpret_cast<char*>(recvMessage.data()));
        conn_.reset();
        throwException(ex);
      }
      case REPLY_REFUSED:
      case REPLY_INVALID:
      case UNSUCCESSFUL_SERVER_TO_CLIENT: {
        LOGERROR("Handshake rejected by server[%s]: %s",
                 endpointObj_->name().c_str(),
                 reinterpret_cast<char*>(recvMessage.data()));
        auto message = std::string("TcrConnection::TcrConnection: ") +
                       "Handshake rejected by server: " +
                       reinterpret_cast<char*>(recvMessage.data());
        CacheServerException ex(message);
        conn_.reset();
        throw ex;
      }
      default: {
        LOGERROR(
            "Unknown error[%d] received from server [%s] in handshake: "
            "%s",
            acceptanceCode[0], endpointObj_->name().c_str(),
            recvMessage.data());
        auto message =
            std::string("TcrConnection::TcrConnection: Unknown error") +
            " received from server in handshake: " +
            reinterpret_cast<char*>(recvMessage.data());
        MessageException ex(message);
        conn_.reset();
        throw ex;
      }
    }

  } else {
    conn_.reset();
    if (error & CONN_TIMEOUT) {
      throw TimeoutException(
          "TcrConnection::TcrConnection: "
          "connection timed out during handshake");
    } else {
      throw GeodeIOException(
          "TcrConnection::TcrConnection: "
          "Handshake failure");
    }
  }

  // TODO: we can authenticate endpoint here if pool is not in multiuser mode.
  // for backward connection we send credentials to server in handshake itself.
  // for forward connection we need to send credentail to server
  //---if pool in not in multiuser node
  //---or old endpoint case.

  if (this->endpointObj_ && !isNotificationChannel && tmpIsSecurityOn &&
      (!isPool || !this->endpointObj_->isMultiUserMode())) {
    // this->endpointObj_->authenticateEndpoint(this);
    return true;
  }

  return false;
}