GfErrType TcrEndpoint::sendRequestWithRetry()

in cppcache/src/TcrEndpoint.cpp [844:1041]


/**
 * Sends `request` over a connection to this endpoint and reads the response
 * into `reply`, making at most 1 + `maxSendRetries` attempts.
 *
 * Retry policy: an attempt that times out, hits an I/O error, loses its
 * connection, or cannot obtain a pooled connection is followed by another
 * attempt, replacing the connection where needed. Retrying stops early when
 * handleIOException() declines to recover, or when compareTransactionIds()
 * returns true after an unexpected exception (the result is then GF_MSG).
 * Every `continue` below also consumes an attempt.
 *
 * @param request          message to send; from the second attempt on, its
 *                         header is updated via updateHeaderForRetry().
 * @param reply            receives the server's reply.
 * @param conn             in/out connection. When null and `useEPPool` is set,
 *                         one is taken from the endpoint's pool. After a
 *                         failure it may be closed and replaced by a newly
 *                         created connection.
 * @param epFailure        reset to false at the start of every attempt and set
 *                         to true when that attempt failed in a way this
 *                         function reports as an endpoint failure.
 * @param failReason       updated with a description of send failures.
 * @param maxSendRetries   number of attempts allowed after the first one.
 * @param useEPPool        take connections from, and return them to, the
 *                         endpoint's connection pool.
 * @param requestedTimeout maximum time to wait for a pooled connection.
 * @param isBgThread       forwarded to handleIOException().
 * @return GF_NOERR on success; GF_NOTCON as soon as the endpoint is no longer
 *         connected; otherwise the last error recorded by the loop.
 */
GfErrType TcrEndpoint::sendRequestWithRetry(
    const TcrMessage& request, TcrMessageReply& reply, TcrConnection*& conn,
    bool& epFailure, std::string& failReason, int maxSendRetries,
    bool useEPPool, std::chrono::microseconds requestedTimeout,
    bool isBgThread) {
  GfErrType error = GF_NOTCON;
  bool createNewConn = false;
  int sendRetryCount = 0;

  // Looked up on every use (not hoisted) so evaluation matches each call site.
  const auto systemConnectTimeout = [this]() {
    return m_cacheImpl->getDistributedSystem()
        .getSystemProperties()
        .connectTimeout();
  };

  // Disposes of `conn` after a send failure for which compareTransactionIds()
  // returned true: a pooled connection goes back to the pool; otherwise it is
  // closed here, because compareTransactionIds() does not close it in that
  // case.
  const auto releaseConnAfterMessageError = [&]() {
    if (useEPPool) {
      m_opConnections.put(conn, false);
    } else {
      closeConnection(conn);
    }
  };

  do {
    if (sendRetryCount > 0) {
      // This is a retry: set the retry bit in the message header.
      const_cast<TcrMessage&>(request).updateHeaderForRetry();
    }

    auto timeout = requestedTimeout;
    epFailure = false;

    // With connection-pool-size == 0, create one connection the first time
    // through. The flag is re-checked under m_connectionLock so that only one
    // thread creates it.
    if (useEPPool && m_maxConnections == 0 &&
        !m_connCreatedWhenMaxConnsIsZero) {
      std::lock_guard<decltype(m_connectionLock)> guard(m_connectionLock);
      if (m_maxConnections == 0 && !m_connCreatedWhenMaxConnsIsZero) {
        LOGFINE(
            "Creating a new connection when connection-pool-size system "
            "property set to 0");
        error = createNewConnection(conn, false, false, systemConnectTimeout());
        if (error != GF_NOERR) {
          epFailure = true;
          continue;
        }
        m_connCreatedWhenMaxConnsIsZero = true;
      }
    }

    LOGDEBUG("TcrEndpoint::send() getting a connection for endpoint %s",
             m_name.c_str());
    if (createNewConn) {
      // The previous attempt failed: replace the connection before resending.
      createNewConn = false;
      if (!connected_) {
        return GF_NOTCON;
      }
      error = createNewConnection(conn, false, false, systemConnectTimeout(),
                                  0, true);
      if (error != GF_NOERR) {
        epFailure = true;
        continue;
      }
    } else if (conn == nullptr && useEPPool) {
      LOGFINER(
          "sendRequestWithRetry:: looking for connection in queue timeout = "
          "%s",
          to_string(timeout).c_str());
      // Wait at most `timeout` for a pooled connection.
      conn = m_opConnections.getUntil(timeout);
    }
    if (!connected_) {
      return GF_NOTCON;
    }

    if (conn != nullptr) {
      LOGDEBUG("TcrEndpoint::send() obtained a connection for endpoint %s",
               m_name.c_str());
      const int reqTransId = request.getTransId();

      try {
        LOGDEBUG("Calling sendRequestConn");
        error = sendRequestConn(request, reply, conn, failReason);
        if (error == GF_IOERR) {
          epFailure = true;
          failReason = "received INVALID reply from server";
          if (!handleIOException(failReason, conn, isBgThread)) {
            break;
          }
          createNewConn = true;
        } else if (error == GF_NOTCON) {
          epFailure = true;
          createNewConn = true;
        } else {
          // NOTE(review): every other result is reported as GF_NOERR; this
          // assumes sendRequestConn() only returns GF_NOERR, GF_IOERR or
          // GF_NOTCON — confirm.
          if (useEPPool) {
            m_opConnections.put(conn, false);
          }
          return GF_NOERR;
        }
      } catch (const TimeoutException&) {
        error = GF_TIMEOUT;
        LOGFINE(
            "Send timed out for endpoint %s. "
            "Message txid = %d",
            m_name.c_str(), reqTransId);
        closeFailedConnection(conn);
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        // Timeouts of the message types below do not set epFailure
        // (presumably because these operations may legitimately run long).
        const int32_t type = request.getMessageType();
        epFailure = (type != TcrMessage::QUERY && type != TcrMessage::PUTALL &&
                     type != TcrMessage::PUT_ALL_WITH_CALLBACK &&
                     type != TcrMessage::EXECUTE_FUNCTION &&
                     type != TcrMessage::EXECUTE_REGION_FUNCTION &&
                     type != TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP &&
                     type != TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE);
        failReason = "timed out waiting for endpoint";
        createNewConn = true;
      } catch (const GeodeIOException& ex) {
        error = GF_IOERR;
        epFailure = true;
        failReason = "IO error for endpoint";
        if (!handleIOException(ex.what(), conn, isBgThread)) {
          break;
        }
        createNewConn = true;
      } catch (const Exception& ex) {
        failReason = ex.getName();
        failReason.append(": ");
        failReason.append(ex.what());
        LOGWARN("Error during send for endpoint %s due to %s", m_name.c_str(),
                failReason.c_str());
        if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
                                  conn)) {
          LOGWARN("Stack trace: %s", ex.getStackTrace().c_str());
          error = GF_MSG;
          releaseConnAfterMessageError();
          break;
        }
        error = GF_NOTCON;
        epFailure = true;
        createNewConn = true;
      } catch (...) {
        failReason = "unexpected exception";
        LOGERROR(
            "Unexpected exception while sending request to "
            "endpoint %s",
            m_name.c_str());
        if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
                                  conn)) {
          error = GF_MSG;
          releaseConnAfterMessageError();
          break;
        }
        error = GF_NOTCON;
        epFailure = true;
        createNewConn = true;
      }
    } else if (useEPPool) {
      epFailure = true;
      failReason = "server connection could not be obtained";
      // Durations are formatted with their unit; the raw count is in
      // microseconds, not seconds.
      if (timeout <= std::chrono::microseconds::zero()) {
        error = GF_TIMEOUT;
        LOGWARN("No connection available for %s for endpoint %s.",
                to_string(requestedTimeout).c_str(), m_name.c_str());
      } else {
        error = GF_NOTCON;
        LOGFINE("Returning without connection with %s remaining for endpoint %s.",
                to_string(timeout).c_str(), m_name.c_str());
      }
    } else {
      // NOTE(review): with no connection and no pool, nothing changes between
      // retries on this path; `error` keeps its previous value.
      LOGERROR("Unexpected failure while sending request to server.");
    }
  } while (++sendRetryCount <= maxSendRetries);
  return error;
}