in cppcache/src/TcrEndpoint.cpp [745:837]
GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
TcrMessageReply& reply,
TcrConnection* conn,
std::string& failReason) {
int32_t type = request.getMessageType();
GfErrType error = GF_NOERR;
LOGFINER("Sending request type %d to endpoint [%s] via connection [%p]", type,
m_name.c_str(), conn);
// TcrMessage * req = const_cast<TcrMessage *>(&request);
LOGDEBUG("TcrEndpoint::sendRequestConn = %p", m_baseDM);
if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn);
if (type == TcrMessage::REGISTER_INTEREST_LIST ||
type == TcrMessage::REGISTER_INTEREST || type == TcrMessage::QUERY ||
type == TcrMessage::QUERY_WITH_PARAMETERS ||
type == TcrMessage::GET_ALL_70 ||
type == TcrMessage::GET_ALL_WITH_CALLBACK || type == TcrMessage::PUTALL ||
type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
type == TcrMessage::REMOVE_ALL ||
((type == TcrMessage::EXECUTE_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
(request.hasResult() & 2)) ||
type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
type == TcrMessage::EXECUTECQ_MSG_TYPE ||
type == TcrMessage::STOPCQ_MSG_TYPE ||
type == TcrMessage::CLOSECQ_MSG_TYPE || type == TcrMessage::KEY_SET ||
type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
type == TcrMessage::GETCQSTATS_MSG_TYPE ||
type == TcrMessage::MONITORCQ_MSG_TYPE ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
type == TcrMessage::GETDURABLECQS_MSG_TYPE) {
conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
request.getTimeout(),
reply.getTimeout());
LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE");
} else {
// Chk request type to request if so request.getCallBackArg flag & setCall
// back arg flag to true, and in response chk for this flag.
if (request.getMessageType() == TcrMessage::REQUEST) {
if (request.isCallBackArguement()) {
reply.setCallBackArguement(true);
}
}
size_t dataLen;
auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(),
&dataLen, request.getTimeout(),
reply.getTimeout(), request.getMessageType());
reply.setMessageTypeRequest(type);
reply.setData(
data, static_cast<int32_t>(dataLen), getDistributedMemberID(),
*(m_cacheImpl->getSerializationRegistry()),
*(m_cacheImpl
->getMemberListForVersionStamp())); // memory is released by
// TcrMessage setData().
}
// reset idle timeout of the connection for pool connection manager
if (type != TcrMessage::PING) {
conn->touch();
}
if (reply.getMessageType() == TcrMessage::INVALID) {
if (type == TcrMessage::EXECUTE_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
ChunkedFunctionExecutionResponse* resultCollector =
dynamic_cast<ChunkedFunctionExecutionResponse*>(
reply.getChunkedResultHandler());
if (resultCollector->getResult() == false) {
LOGDEBUG("TcrEndpoint::send: function execution, no response desired");
// m_opConnections.put( conn, false );
// return GF_NOERR;
error = GF_NOERR;
}
} else {
// Treat INVALID messages like IO exceptions
error = GF_IOERR;
}
}
// do we need to consider case where compareTransactionIds return true?
// I think we will not have issue here
else if (!compareTransactionIds(request.getTransId(), reply.getTransId(),
failReason, conn)) {
error = GF_NOTCON;
}
if (error == GF_NOERR) {
if (m_baseDM) {
m_baseDM->afterSendingRequest(request, reply, conn);
}
}
return error;
}