void TcrMessage::processChunk()

in cppcache/src/TcrMessage.cpp [915:1110]


void TcrMessage::processChunk(const std::vector<uint8_t>& chunk, int32_t len,
                              uint16_t endpointmemId,
                              const uint8_t isLastChunkAndisSecurityHeader) {
  // TODO: see if security header is there
  LOGDEBUG(
      "TcrMessage::processChunk isLastChunkAndisSecurityHeader = %d chunklen "
      "= "
      "%d m_msgType = %d",
      isLastChunkAndisSecurityHeader, len, m_msgType);

  this->m_isLastChunkAndisSecurityHeader = isLastChunkAndisSecurityHeader;
  handleSpecialFECase();

  if (m_tcdm == nullptr) {
    throw FatalInternalException("TcrMessage::processChunk: null DM!");
  }

  switch (m_msgType) {
    case TcrMessage::REPLY: {
      LOGDEBUG("processChunk - got reply for request %d", m_msgTypeRequest);
      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
      break;
    }
    case TcrMessage::RESPONSE: {
      if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE ||
          m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE ||
          m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE ||
          m_msgTypeRequest == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
          m_msgTypeRequest == TcrMessage::GETCQSTATS_MSG_TYPE ||
          m_msgTypeRequest == TcrMessage::MONITORCQ_MSG_TYPE) {
        LOGDEBUG("processChunk - got CQ response for request %d",
                 m_msgTypeRequest);
        // TODO: do we need to do anything here
        break;
      } else if (m_msgTypeRequest == TcrMessage::PUTALL ||
                 m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) {
        TcrChunkedContext* chunkedContext = new TcrChunkedContext(
            chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
            m_tcdm->getConnectionManager().getCacheImpl());
        m_chunkedResult->setEndpointMemId(endpointmemId);
        m_tcdm->queueChunk(chunkedContext);
        if (chunk.empty()) {
          // last chunk -- wait for processing of all the chunks to complete
          m_chunkedResult->waitFinalize();
          auto ex = m_chunkedResult->getException();
          if (ex != nullptr) {
            throw *ex;
          }
        }
        break;
      }
      // fall-through for other cases
      if (m_chunkedResult != nullptr) {
        LOGDEBUG("tcrmessage in case22 ");
        TcrChunkedContext* chunkedContext = new TcrChunkedContext(
            chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
            m_tcdm->getConnectionManager().getCacheImpl());
        m_chunkedResult->setEndpointMemId(endpointmemId);
        m_tcdm->queueChunk(chunkedContext);
        if (chunk.empty()) {
          // last chunk -- wait for processing of all the chunks to complete
          m_chunkedResult->waitFinalize();
          //  Throw any exception during processing here.
          // Do not throw it immediately since we want to read the
          // full data from socket in any case.
          // Notice that TcrChunkedContext::handleChunk stops any
          // further processing as soon as an exception is encountered.
          // This can cause behaviour like partially filled cache in case
          // of populating cache with registerAllKeys(), so that should be
          // documented since rolling that back may not be a good idea either.
          if (const auto& ex = m_chunkedResult->getException()) {
            throw Exception(*ex);
          }
        }
      } else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType ||
                 TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType ||
                 TcrMessage::GET_ALL_DATA_ERROR == m_msgType) {
        if (!chunk.empty()) {
          chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
        }
      }
      break;
    }
    case TcrMessage::EXECUTE_REGION_FUNCTION_RESULT:
    case TcrMessage::EXECUTE_FUNCTION_RESULT:
    case TcrMessage::CQDATAERROR_MSG_TYPE:  // one part
    case TcrMessage::CQ_EXCEPTION_TYPE:     // one part
    case TcrMessage::RESPONSE_FROM_PRIMARY: {
      if (m_chunkedResult != nullptr) {
        LOGDEBUG("tcrmessage in case22 ");
        TcrChunkedContext* chunkedContext = new TcrChunkedContext(
            chunk, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
            m_tcdm->getConnectionManager().getCacheImpl());
        m_chunkedResult->setEndpointMemId(endpointmemId);
        m_tcdm->queueChunk(chunkedContext);
        if (chunk.empty()) {
          // last chunk -- wait for processing of all the chunks to complete
          m_chunkedResult->waitFinalize();
          //  Throw any exception during processing here.
          // Do not throw it immediately since we want to read the
          // full data from socket in any case.
          // Notice that TcrChunkedContext::handleChunk stops any
          // further processing as soon as an exception is encountered.
          // This can cause behaviour like partially filled cache in case
          // of populating cache with registerAllKeys(), so that should be
          // documented since rolling that back may not be a good idea either.
          if (const auto& ex = m_chunkedResult->getException()) {
            throw Exception(*ex);
          }
        }
      } else if (TcrMessage::CQ_EXCEPTION_TYPE == m_msgType ||
                 TcrMessage::CQDATAERROR_MSG_TYPE == m_msgType ||
                 TcrMessage::GET_ALL_DATA_ERROR == m_msgType) {
        if (!chunk.empty()) {
          chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
        }
      }
      break;
    }
    case TcrMessage::REGISTER_INTEREST_DATA_ERROR:  // for  register interest
                                                    // error
    case EXECUTE_FUNCTION_ERROR:
    case EXECUTE_REGION_FUNCTION_ERROR: {
      if (!chunk.empty()) {
        // DeleteArray<const uint8_t> delChunk(bytes);
        //  DataInput input(bytes, len);
        // TODO: this not send two part...
        // looks like this is our exception so only one part will come
        // readExceptionPart(input, false);
        // readSecureObjectPart(input, false, true,
        // isLastChunkAndisSecurityHeader );
        chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
      }
      break;
    }
    case TcrMessage::EXCEPTION: {
      if (!chunk.empty()) {
        auto input =
            m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
                chunk.data(), len);
        readExceptionPart(input, isLastChunkAndisSecurityHeader);
        readSecureObjectPart(input, false, true,
                             isLastChunkAndisSecurityHeader);
      }
      break;
    }
    case TcrMessage::RESPONSE_FROM_SECONDARY: {
      // TODO: how many parts
      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
      if (chunk.size()) {
        LOGFINEST("processChunk - got response from secondary, ignoring.");
      }
      break;
    }
    case TcrMessage::PUT_DATA_ERROR: {
      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);
      if (!chunk.empty()) {
        auto input =
            m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
                chunk.data(), len);
        auto errorString = readStringPart(input);

        if (!errorString.empty()) {
          errorString.erase(
              errorString.begin(),
              std::find_if(errorString.begin(), errorString.end(),
                           std::not1(std::ptr_fun<int, int>(std::isspace))));
          LOGDEBUG(
              "TcrMessage::%s: setting thread-local ex msg to \"%s\", %s, %d",
              __FUNCTION__, errorString.c_str(), __FILE__, __LINE__);
          setThreadLocalExceptionMessage(errorString.c_str());
        }
      }
      break;
    }
    case TcrMessage::GET_ALL_DATA_ERROR: {
      chunkSecurityHeader(1, chunk, len, isLastChunkAndisSecurityHeader);

      break;
    }
    default: {
      // TODO: how many parts what should we do here
      if (chunk.empty()) {
        LOGWARN(
            "Got unhandled message type %d while processing response, "
            "possible "
            "serialization mismatch",
            m_msgType);
        throw MessageException(
            "TcrMessage::processChunk: "
            "got unhandled message type");
      }
      break;
    }
  }
}