in cppcache/src/TcrMessage.cpp [915:1110]
void TcrMessage::processChunk(const std::vector<uint8_t>& chunk, int32_t len,
uint16_t endpointmemId,
const uint8_t isLastChunkAndisSecurityHeader) {
// TODO: see if security header is there
LOGDEBUG(
"TcrMessage::processChunk isLastChunkAndisSecurityHeader = %d chunklen "
"= "
"%d m_msgType = %d",
isLastChunkAndisSecurityHeader, len, m_msgType);
this->m_isLastChunkAndisSecurityHeader = isLastChunkAndisSecurityHeader;
handleSpecialFECase();
if (m_tcdm == nullptr) {
throw FatalInternalException("TcrMessage::processChunk: null DM!");
}
switch (m_msgType) {
case TcrMessage::REPLY: {
LOGDEBUG("processChunk - got reply for request %d", m_msgTypeRequest);
chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
break;
}
case TcrMessage::RESPONSE: {
if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::GETCQSTATS_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::MONITORCQ_MSG_TYPE) {
LOGDEBUG("processChunk - got CQ response for request %d",
m_msgTypeRequest);
// TODO: do we need to do anything here
break;
} else if (m_msgTypeRequest == TcrMessage::PUTALL ||
m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) {
TcrChunkedContext* chunkedContext = new TcrChunkedContext(
chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
m_tcdm->getConnectionManager().getCacheImpl());
m_chunkedResult->setEndpointMemId(endpointmemId);
m_tcdm->queueChunk(chunkedContext);
if (chunk.empty()) {
// last chunk -- wait for processing of all the chunks to complete
m_chunkedResult->waitFinalize();
auto ex = m_chunkedResult->getException();
if (ex != nullptr) {
throw *ex;
}
}
break;
}
// fall-through for other cases
if (m_chunkedResult != nullptr) {
LOGDEBUG("tcrmessage in case22 ");
TcrChunkedContext* chunkedContext = new TcrChunkedContext(
chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
m_tcdm->getConnectionManager().getCacheImpl());
m_chunkedResult->setEndpointMemId(endpointmemId);
m_tcdm->queueChunk(chunkedContext);
if (chunk.empty()) {
// last chunk -- wait for processing of all the chunks to complete
m_chunkedResult->waitFinalize();
// Throw any exception during processing here.
// Do not throw it immediately since we want to read the
// full data from socket in any case.
// Notice that TcrChunkedContext::handleChunk stops any
// further processing as soon as an exception is encountered.
// This can cause behaviour like partially filled cache in case
// of populating cache with registerAllKeys(), so that should be
// documented since rolling that back may not be a good idea either.
if (const auto& ex = m_chunkedResult->getException()) {
throw Exception(*ex);
}
}
} else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType ||
TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType ||
TcrMessage::GET_ALL_DATA_ERROR == m_msgType) {
if (!chunk.empty()) {
chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
}
}
break;
}
case TcrMessage::EXECUTE_REGION_FUNCTION_RESULT:
case TcrMessage::EXECUTE_FUNCTION_RESULT:
case TcrMessage::CQDATAERROR_MSG_TYPE: // one part
case TcrMessage::CQ_EXCEPTION_TYPE: // one part
case TcrMessage::RESPONSE_FROM_PRIMARY: {
if (m_chunkedResult != nullptr) {
LOGDEBUG("tcrmessage in case22 ");
TcrChunkedContext* chunkedContext = new TcrChunkedContext(
chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
m_tcdm->getConnectionManager().getCacheImpl());
m_chunkedResult->setEndpointMemId(endpointmemId);
m_tcdm->queueChunk(chunkedContext);
if (chunk.empty()) {
// last chunk -- wait for processing of all the chunks to complete
m_chunkedResult->waitFinalize();
// Throw any exception during processing here.
// Do not throw it immediately since we want to read the
// full data from socket in any case.
// Notice that TcrChunkedContext::handleChunk stops any
// further processing as soon as an exception is encountered.
// This can cause behaviour like partially filled cache in case
// of populating cache with registerAllKeys(), so that should be
// documented since rolling that back may not be a good idea either.
if (const auto& ex = m_chunkedResult->getException()) {
throw Exception(*ex);
}
}
} else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType ||
TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType ||
TcrMessage::GET_ALL_DATA_ERROR == m_msgType) {
if (!chunk.empty()) {
chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
}
}
break;
}
case TcrMessage::REGISTER_INTEREST_DATA_ERROR: // for register interest
// error
case EXECUTE_FUNCTION_ERROR:
case EXECUTE_REGION_FUNCTION_ERROR: {
if (!chunk.empty()) {
// DeleteArray<const uint8_t> delChunk(bytes);
// DataInput input(bytes, len);
// TODO: this not send two part...
// looks like this is our exception so only one part will come
// readExceptionPart(input, false);
// readSecureObjectPart(input, false, true,
// isLastChunkAndisSecurityHeader );
chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
}
break;
}
case TcrMessage::EXCEPTION: {
if (!chunk.empty()) {
auto input =
m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
chunk.data(), len);
readExceptionPart(input, isLastChunkAndisSecurityHeader);
readSecureObjectPart(input, false, true,
isLastChunkAndisSecurityHeader);
}
break;
}
case TcrMessage::RESPONSE_FROM_SECONDARY: {
// TODO: how many parts
chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
if (chunk.size()) {
LOGFINEST("processChunk - got response from secondary, ignoring.");
}
break;
}
case TcrMessage::PUT_DATA_ERROR: {
chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
if (!chunk.empty()) {
auto input =
m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
chunk.data(), len);
auto errorString = readStringPart(input);
if (!errorString.empty()) {
errorString.erase(
errorString.begin(),
std::find_if(errorString.begin(), errorString.end(),
std::not1(std::ptr_fun<int, int>(std::isspace))));
LOGDEBUG(
"TcrMessage::%s: setting thread-local ex msg to \"%s\", %s, %d",
__FUNCTION__, errorString.c_str(), __FILE__, __LINE__);
setThreadLocalExceptionMessage(errorString.c_str());
}
}
break;
}
case TcrMessage::GET_ALL_DATA_ERROR: {
chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
break;
}
default: {
// TODO: how many parts what should we do here
if (chunk.empty()) {
LOGWARN(
"Got unhandled message type %d while processing response, "
"possible "
"serialization mismatch",
m_msgType);
throw MessageException(
"TcrMessage::processChunk: "
"got unhandled message type");
}
break;
}
}
}