GfErrType ThinClientPoolDM::sendSyncRequest()

in cppcache/src/ThinClientPoolDM.cpp [1265:1553]


/**
 * Sends `request` over a pooled connection, waits for `reply`, and optionally
 * fails over to other servers when an attempt does not succeed.
 *
 * @param request         message to send; its header is updated via
 *                        updateHeaderForRetry() before every attempt that
 *                        follows a failed one.
 * @param reply           receives the server response; this pool is
 *                        registered on it through setDM().
 * @param attemptFailover when false, the outcome of the first completed
 *                        attempt is returned without trying other servers.
 * @param isBGThread      forwarded to connection acquisition, credential
 *                        exchange and sticky-connection handling.
 * @param serverLocation  forwarded unchanged to getConnectionFromQueueW().
 * @return GF_NOERR on success, otherwise the error of the last attempt.
 *         On the regular exit paths GF_IOERR is reported as GF_NOTCON.
 */
GfErrType ThinClientPoolDM::sendSyncRequest(
    TcrMessage& request, TcrMessageReply& reply, bool attemptFailover,
    bool isBGThread,
    const std::shared_ptr<BucketServerLocation>& serverLocation) {
  LOGDEBUG("ThinClientPoolDM::sendSyncRequest: ....%d %s",
           request.getMessageType(), m_poolName.c_str());
  // Increment clientOps
  getStats().setCurClientOps(++m_clientOps);

  // Every exit goes through this helper so the in-flight gauge incremented
  // above is always balanced and the outcome is counted exactly once.
  const auto finishOp = [this](GfErrType outcome) {
    getStats().setCurClientOps(--m_clientOps);
    if (outcome == GF_NOERR) {
      getStats().incSucceedClientOps();
    } else if (outcome == GF_TIMEOUT) {
      getStats().incTimeoutClientOps();
    } else {
      getStats().incFailedClientOps();
    }
  };

  GfErrType error = GF_NOTCON;

  std::shared_ptr<UserAttributes> userAttr = nullptr;
  reply.setDM(this);

  const int32_t type = request.getMessageType();

  // Query, putAll, function-execution and executeCQ messages: they do not get
  // the pool read timeout applied and are not retried after a timeout.
  const bool isNoTimeoutRetryOp =
      type == TcrMessage::QUERY ||
      type == TcrMessage::QUERY_WITH_PARAMETERS ||
      type == TcrMessage::PUTALL ||
      type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
      type == TcrMessage::EXECUTE_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION ||
      type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
      type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE;

  if (!isNoTimeoutRetryOp) {
    // set only when message is not query, putall and executeCQ
    reply.setTimeout(getReadTimeout());
    request.setTimeout(getReadTimeout());
  }

  // Flag passed as the "background thread" argument of putInQueue() and
  // setStickyNull(): true for background threads and for the listed
  // message types.
  const auto treatAsBGThread = [&request, isBGThread]() {
    return isBGThread ||
           request.getMessageType() == TcrMessage::GET_ALL_70 ||
           request.getMessageType() == TcrMessage::GET_ALL_WITH_CALLBACK ||
           request.getMessageType() ==
               TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP;
  };

  // A retry-attempts setting of -1 means: keep going until a pass adds no new
  // server to the exclusion set.
  const bool retryAllEPsOnce = (m_attrs->getRetryAttempts() == -1);
  auto retry = m_attrs->getRetryAttempts() + 1;
  TcrConnection* conn = nullptr;
  std::set<ServerLocation> excludeServers;
  bool isAuthRequireExcep = false;
  int isAuthRequireExcepMaxTry = 2;
  bool firstTry = true;
  LOGFINE("sendSyncRequest:: retry = %d", retry);
  // `retry` is only consumed while positive. A bare `retry--` would turn
  // negative (and therefore truthy) once the auth-retry clause had kept the
  // loop alive, making the retry budget effectively unlimited.
  while (retryAllEPsOnce || (retry > 0 && retry-- > 0) ||
         (isAuthRequireExcep && isAuthRequireExcepMaxTry >= 0)) {
    isAuthRequireExcep = false;
    if (!firstTry) request.updateHeaderForRetry();
    // if it's a query or putall and we had a timeout, just return with the
    // newly selected endpoint without failover-retry
    if (isNoTimeoutRetryOp && error == GF_TIMEOUT) {
      finishOp(error);
      return error;
    }

    GfErrType queueErr = GF_NOERR;
    const auto lastExcludeSize = excludeServers.size();
    int8_t version = 0;

    bool isUserNeedToReAuthenticate = false;
    bool singleHopConnFound = false;
    bool connFound = false;
    if (!m_isMultiUserMode || (!TcrMessage::isUserInitiativeOps(request))) {
      conn = getConnectionFromQueueW(&queueErr, excludeServers, isBGThread,
                                     request, version, singleHopConnFound,
                                     connFound, serverLocation);
    } else {
      userAttr = UserAttributes::threadLocalUserAttributes;
      if (userAttr == nullptr) {
        LOGWARN("Attempted operation type %d without credentials",
                request.getMessageType());
        finishOp(GF_NOT_AUTHORIZED_EXCEPTION);
        return GF_NOT_AUTHORIZED_EXCEPTION;
      }
      conn = getConnectionFromQueueW(&queueErr, excludeServers, isBGThread,
                                     request, version, singleHopConnFound,
                                     connFound, serverLocation);

      if (conn != nullptr) {
        // need to check whether the user is already authenticated to this
        // endpoint or not.
        isUserNeedToReAuthenticate =
            !(userAttr->isEndpointAuthenticated(conn->getEndpointObject()));
      }
    }

    if (queueErr == GF_CLIENT_WAIT_TIMEOUT) {
      LOGFINE("Request timeout at client only");
      finishOp(GF_CLIENT_WAIT_TIMEOUT);
      return GF_CLIENT_WAIT_TIMEOUT;
    } else if (queueErr == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
      // need to refresh meta data
      auto region =
          m_connManager.getCacheImpl()->getRegion(request.getRegionName());

      if (region != nullptr) {
        LOGFINE(
            "Need to refresh pr-meta-data timeout in client only  with "
            "refresh "
            "metadata");
        // The cast fails for regions that are not ThinClientRegion; skip the
        // flag reset instead of dereferencing a null pointer.
        if (auto* tcrRegion = dynamic_cast<ThinClientRegion*>(region.get())) {
          tcrRegion->setMetaDataRefreshed(false);
        }
        if (m_clientMetadataService) {
          m_clientMetadataService->enqueueForMetadataRefresh(
              region->getFullPath(), reply.getserverGroupVersion());
        }
      }
      finishOp(GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA);
      return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
    }

    LOGDEBUG(
        "ThinClientPoolDM::sendSyncRequest: isUserNeedToReAuthenticate = %d ",
        isUserNeedToReAuthenticate);
    LOGDEBUG(
        "ThinClientPoolDM::sendSyncRequest: m_isMultiUserMode = %d  conn = "
        "%p  "
        "type = %d",
        m_isMultiUserMode, conn, type);

    // Endpoint of the connection used in this attempt. Captured up front
    // because `conn` is handed back to the pool (or deleted) further down and
    // must not be touched afterwards.
    TcrEndpoint* ep = nullptr;

    if (!conn) {
      // lets assume all connection are in use will happen
      if (queueErr == GF_NOERR) {
        queueErr = GF_ALL_CONNECTIONS_IN_USE_EXCEPTION;
        finishOp(queueErr);
        return queueErr;
      }
      error = (queueErr == GF_IOERR) ? GF_NOTCON : queueErr;
    } else {
      ep = conn->getEndpointObject();
      LOGDEBUG(
          "ThinClientPoolDM::sendSyncRequest: sendSyncReq "
          "ep->isAuthenticated() = %d ",
          ep->isAuthenticated());
      GfErrType userCredMsgErr = GF_NOERR;
      bool isServerException = false;
      if (TcrMessage::isUserInitiativeOps(request) &&
          (m_isSecurityOn || m_isMultiUserMode)) {
        if (!m_isMultiUserMode && !ep->isAuthenticated()) {
          // first authenticate him on this endpoint
          userCredMsgErr = sendUserCredentials(getCredentials(ep), conn,
                                               isBGThread, isServerException);
        } else if (isUserNeedToReAuthenticate) {
          userCredMsgErr = sendUserCredentials(userAttr->getCredentials(), conn,
                                               isBGThread, isServerException);
        }
      }

      if (userCredMsgErr == GF_NOERR) {
        error = ep->sendRequestConnWithRetry(request, reply, conn);
        error = handleEPError(ep, reply, error);
      } else {
        error = userCredMsgErr;
      }

      if (isServerException) {
        // server exception while sending credential message to server...
        // NOTE(review): `conn` is neither re-queued nor deleted on this path;
        // presumably sendUserCredentials() has already disposed of it -
        // confirm.
        finishOp(error);
        return error;
      }

      if (error == GF_NOERR) {
        LOGDEBUG("putting connection back in queue");
        putInQueue(conn, treatAsBGThread(),
                   request.forTransaction());  // connFound is only relevant
                                               // for Sticky conn.
        LOGDEBUG("putting connection back in queue DONE");
      } else {
        if (error != GF_TIMEOUT) removeEPConnections(ep);
        // Update stats for the connection that failed.
        removeEPConnections(1, false);
        setStickyNull(treatAsBGThread());
        // `conn` is passed by name to the calls above and is re-checked here
        // before disposal, as in the original code.
        if (conn) {
          try {
            GF_SAFE_DELETE_CON(conn);
          } catch (...) {
            // Best effort: a failure while tearing down a broken connection
            // must not mask the original error.
          }
        }
        excludeServers.insert(ServerLocation(ep->name()));
        removeEPFromMetadataIfError(error, ep);
      }
    }

    if (error == GF_NOERR) {
      if ((m_isSecurityOn || m_isMultiUserMode) &&
          reply.getMessageType() == TcrMessage::EXCEPTION) {
        if (isAuthRequireException(reply.getException())) {
          // Use the endpoint captured above: the connection itself has
          // already been returned to the pool and may be in use elsewhere.
          if (!m_isMultiUserMode) {
            ep->setAuthenticated(false);
          } else if (userAttr != nullptr) {
            userAttr->unAuthenticateEP(ep);
          }
          LOGFINEST(
              "After getting AuthenticationRequiredException trying "
              "again.");
          isAuthRequireExcepMaxTry--;
          isAuthRequireExcep = true;
          // In retry-all mode the loop condition is always true, so the
          // re-authentication budget has to be enforced here; once it is
          // spent, fall through and hand the exception reply to the caller.
          if (!retryAllEPsOnce || isAuthRequireExcepMaxTry >= 0) {
            continue;
          }
        } else if (isNotAuthorizedException(reply.getException())) {
          LOGDEBUG("received NotAuthorizedException");
          // TODO should we try again?
        }
      }
      LOGFINER(
          "reply Metadata version is %d & bsl version is %d "
          "reply.isFEAnotherHop()=%d",
          reply.getMetaDataVersion(), version, reply.isFEAnotherHop());
      if (m_clientMetadataService && request.forSingleHop() &&
          (reply.getMetaDataVersion() != 0 ||
           (request.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION &&
            request.getKeyRef() != nullptr && reply.isFEAnotherHop()))) {
        // Need to get direct access to Region's name to avoid referencing
        // temp data and causing crashes
        auto region =
            m_connManager.getCacheImpl()->getRegion(request.getRegionName());

        if (region != nullptr) {
          if (!connFound) {
            // max limit case then don't refresh otherwise always refresh
            LOGFINE("Need to refresh pr-meta-data");
            if (auto* tcrRegion =
                    dynamic_cast<ThinClientRegion*>(region.get())) {
              tcrRegion->setMetaDataRefreshed(false);
            }
          }
          m_clientMetadataService->enqueueForMetadataRefresh(
              region->getFullPath(), reply.getserverGroupVersion());
        }
      }
    }

    // No server was added to the exclusion set during this pass: start over
    // with an empty set, and in retry-all mode stop here.
    if (excludeServers.size() == lastExcludeSize) {
      excludeServers.clear();
      if (retryAllEPsOnce) {
        break;
      }
    }

    if (!attemptFailover || error == GF_NOERR) {
      finishOp(error);
      // Top-level only sees NotConnectedException
      return (error == GF_IOERR) ? GF_NOTCON : error;
    }

    conn = nullptr;
    firstTry = false;
  }  // While

  finishOp(error);

  // Top-level only sees NotConnectedException
  return (error == GF_IOERR) ? GF_NOTCON : error;
}