in cppcache/src/TcrEndpoint.cpp [844:1041]
GfErrType TcrEndpoint::sendRequestWithRetry(
const TcrMessage& request, TcrMessageReply& reply, TcrConnection*& conn,
bool& epFailure, std::string& failReason, int maxSendRetries,
bool useEPPool, std::chrono::microseconds requestedTimeout,
bool isBgThread) {
GfErrType error = GF_NOTCON;
bool createNewConn = false;
// int32_t type = request.getMessageType();
int sendRetryCount = 0;
// Retry on the following send errors:
// Timeout: 1 retry
// EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry
// Connection pool is empty (too many threads or no connections available): 1
// retry
do {
if (sendRetryCount > 0) {
// this is a retry. set the retry bit in the early Ack
(const_cast<TcrMessage&>(request)).updateHeaderForRetry();
}
auto timeout = requestedTimeout;
epFailure = false;
if (useEPPool) {
if (m_maxConnections == 0 && !m_connCreatedWhenMaxConnsIsZero) {
std::lock_guard<decltype(m_connectionLock)> guard(m_connectionLock);
if (m_maxConnections == 0 && !m_connCreatedWhenMaxConnsIsZero) {
LOGFINE(
"Creating a new connection when connection-pool-size system "
"property set to 0");
if ((error = createNewConnection(conn, false, false,
m_cacheImpl->getDistributedSystem()
.getSystemProperties()
.connectTimeout())) !=
GF_NOERR) {
epFailure = true;
continue;
}
m_connCreatedWhenMaxConnsIsZero = true;
}
}
}
LOGDEBUG("TcrEndpoint::send() getting a connection for endpoint %s",
m_name.c_str());
if (createNewConn) {
createNewConn = false;
if (!connected_) {
return GF_NOTCON;
} else if ((error =
createNewConnection(conn, false, false,
m_cacheImpl->getDistributedSystem()
.getSystemProperties()
.connectTimeout(),
0, true)) != GF_NOERR) {
epFailure = true;
continue;
}
} else if (conn == nullptr && useEPPool) {
LOGFINER(
"sendRequestWithRetry:: looking for connection in queue timeout = "
"%s",
to_string(timeout).c_str());
// max wait time to get a connection
conn = m_opConnections.getUntil(timeout);
}
if (!connected_) {
return GF_NOTCON;
}
if (conn != nullptr) {
LOGDEBUG("TcrEndpoint::send() obtained a connection for endpoint %s",
m_name.c_str());
int reqTransId = request.getTransId();
try {
LOGDEBUG("Calling sendRequestConn");
error = sendRequestConn(request, reply, conn, failReason);
if (error == GF_IOERR) {
epFailure = true;
failReason = "received INVALID reply from server";
if (!handleIOException(failReason, conn, isBgThread)) {
break;
}
createNewConn = true;
} else if (error == GF_NOTCON) {
epFailure = true;
createNewConn = true;
} else {
if (useEPPool) {
m_opConnections.put(conn, false);
}
return GF_NOERR;
}
} catch (const TimeoutException&) {
error = GF_TIMEOUT;
LOGFINE(
"Send timed out for endpoint %s. "
"Message txid = %d",
m_name.c_str(), reqTransId);
closeFailedConnection(conn);
/*
if ( !(m_poolHADM && m_poolHADM->getThreadLocalConnections()) ){ //close
connection only when not a sticky connection.
closeConnection( conn );
}*/
std::this_thread::sleep_for(std::chrono::milliseconds(10));
int32_t type = request.getMessageType();
epFailure = (type != TcrMessage::QUERY && type != TcrMessage::PUTALL &&
type != TcrMessage::PUT_ALL_WITH_CALLBACK &&
type != TcrMessage::EXECUTE_FUNCTION &&
type != TcrMessage::EXECUTE_REGION_FUNCTION &&
type != TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP &&
type != TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE);
// epFailure = true;
failReason = "timed out waiting for endpoint";
createNewConn = true;
} catch (const GeodeIOException& ex) {
error = GF_IOERR;
epFailure = true;
failReason = "IO error for endpoint";
if (!handleIOException(ex.what(), conn,
isBgThread)) { // change here
break;
}
createNewConn = true;
} catch (const Exception& ex) {
failReason = ex.getName();
failReason.append(": ");
failReason.append(ex.what());
LOGWARN("Error during send for endpoint %s due to %s", m_name.c_str(),
failReason.c_str());
if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
conn)) {
LOGWARN("Stack trace: %s", ex.getStackTrace().c_str());
error = GF_MSG;
if (useEPPool) {
m_opConnections.put(conn, false);
} else {
// we are here its better to close the connection as
// "compareTransactionIds"
// will not close the connection
closeConnection(conn);
}
break;
} else {
error = GF_NOTCON;
epFailure = true;
createNewConn = true;
}
} catch (...) {
failReason = "unexpected exception";
LOGERROR(
"Unexpected exception while sending request to "
"endpoint %s",
m_name.c_str());
if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
conn)) {
error = GF_MSG;
if (useEPPool) {
m_opConnections.put(conn, false);
} else {
// we are here its better to close the connection as
// "compareTransactionIds"
// will not close the connection
closeConnection(conn);
}
break;
} else {
error = GF_NOTCON;
epFailure = true;
createNewConn = true;
}
}
} else {
if (useEPPool) {
epFailure = true;
failReason = "server connection could not be obtained";
if (timeout <= std::chrono::microseconds::zero()) {
error = GF_TIMEOUT;
LOGWARN(
"No connection available for %ld seconds "
"for endpoint %s.",
requestedTimeout.count(), m_name.c_str());
} else {
error = GF_NOTCON;
LOGFINE(
"Returning without connection with %s seconds remaining "
"for endpoint %s.",
std::to_string(timeout.count()).c_str(), m_name.c_str());
}
} else {
LOGERROR("Unexpected failure while sending request to server.");
}
}
} while (++sendRetryCount <= maxSendRetries);
return error;
} // namespace client