void TcrMessage::handleByteArrayResponse()

in cppcache/src/TcrMessage.cpp [1132:1605]


// Decodes a server message held in `bytearray` and stores the decoded fields
// on this TcrMessage.
//
// The buffer starts with a 17-byte header: message type (int32), message
// length (int32), number of parts (int32), transaction id (int32) and a
// one-byte `earlyack` field. The parts that follow are decoded according to
// the received message type and, for RESPONSE and REPLY messages, according
// to the type of the request that was sent (m_msgTypeRequest). When bit 0x2
// of `earlyack` is set, a trailing secure part is read last via
// readSecureObjectPart().
//
// Parameters:
//   bytearray                 raw message bytes, header included.
//   len                       number of bytes available in `bytearray`.
//   endpointMemId             forwarded to readVersionTag() for messages that
//                             carry a version tag.
//   serializationRegistry     used to deserialize the objects returned by the
//                             GET_PDX_TYPE_BY_ID / GET_PDX_ENUM_BY_ID
//                             requests.
//   memberListForVersionStamp forwarded to readVersionTag().
//
// Throws MessageException when the received message type is not handled by
// any case below. The read helpers and DataInput calls may throw as well
// (not visible from this function).
void TcrMessage::handleByteArrayResponse(
    const char* bytearray, int32_t len, uint16_t endpointMemId,
    const SerializationRegistry& serializationRegistry,
    MemberListForVersionStamp& memberListForVersionStamp) {
  auto input = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
      reinterpret_cast<uint8_t*>(const_cast<char*>(bytearray)), len, getPool());
  // TODO: m_tcdm is dereferenced above without a null check; decide whether
  // a missing pool should raise IllegalArgumentException instead.

  // Fixed-size message header (4 + 4 + 4 + 4 + 1 = 17 bytes).
  m_msgType = input.readInt32();
  int32_t msglen = input.readInt32();
  int32_t numparts = input.readInt32();
  m_txId = input.readInt32();
  auto earlyack = input.read();
  LOGDEBUG(
      "handleByteArrayResponse m_msgType = %d m_isSecurityOn = %d "
      "requesttype "
      "=%d",
      m_msgType, m_isSecurityOn, m_msgTypeRequest);
  LOGDEBUG(
      "Message type=%d, length=%d, parts=%d, txid=%d and eack %d with data "
      "length=%d",
      m_msgType, msglen, numparts, m_txId, earlyack, len);

  switch (m_msgType) {
    case TcrMessage::RESPONSE: {
      // The layout of a RESPONSE depends on the request that triggered it.
      if (m_msgTypeRequest == TcrMessage::CONTAINS_KEY) {
        readBooleanPartAsObject(input, &m_boolValue);
      } else if (m_msgTypeRequest == TcrMessage::USER_CREDENTIAL_MESSAGE) {
        readUniqueIDObjectPart(input);
      } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_TYPE ||
                 m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_ENUM) {
        // The id comes back as a single int part.
        uint32_t typeId = 0;
        readIntPart(input, &typeId);
        m_value = CacheableInt32::create(typeId);
      } else if (m_msgTypeRequest == TcrMessage::GET_PDX_TYPE_BY_ID) {
        // PdxType will come in response
        input.advanceCursor(sizeof(int32_t));  // ignore part size
        if (input.readBoolean()) {             // part type. If 1 is an object
          m_value = serializationRegistry.deserialize(
              input, static_cast<int8_t>(DSCode::PdxType));
        }
      } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ENUM_BY_ID) {
        // A single serialized object comes back; skip its 5-byte part header
        // (int32 part size + one-byte part type) and deserialize it.
        input.advanceCursor(5);  // part header
        m_value = serializationRegistry.deserialize(input);
      } else if (m_msgTypeRequest == TcrMessage::GET_FUNCTION_ATTRIBUTES) {
        // read and ignore length
        input.readInt32();
        input.advanceCursor(1);  // ignore byte

        if (!m_functionAttributes) {
          m_functionAttributes = std::make_shared<std::vector<int8_t>>();
        }
        // Three attribute bytes are appended in wire order.
        m_functionAttributes->push_back(input.read());
        m_functionAttributes->push_back(input.read());
        m_functionAttributes->push_back(input.read());
      } else if (m_msgTypeRequest == TcrMessage::REQUEST) {
        // Value part and flags part are always present; optional parts are
        // counted so that any part left over is treated as PR metadata.
        int32_t receivednumparts = 2;
        readObjectPart(input);
        uint32_t flag = 0;
        readIntPart(input, &flag);
        if (flag & 0x01) {  // callback argument part follows
          readCallbackObjectPart(input);
          receivednumparts++;
        }

        if ((m_value == nullptr) && (flag & 0x08 /*VALUE_IS_INVALID*/)) {
          m_value = CacheableToken::invalid();
        }

        if (flag & 0x02) {  // version tag part follows
          readVersionTag(input, endpointMemId, memberListForVersionStamp);
          receivednumparts++;
        }

        if (flag & 0x04 /*KEY_NOT_PRESENT*/) {
          m_value = CacheableToken::tombstone();
        }

        if (numparts > receivednumparts) readPrMetaData(input);

      } else if (m_decodeAll) {
        readObjectPart(input);
        if (numparts == 2) {
          if (m_isCallBackArguement) {
            readCallbackObjectPart(input);
          } else {
            // Second part holds the metadata version byte and, when the part
            // is two bytes long, the server group version byte as well.
            int32_t lenObj = input.readInt32();
            input.readBoolean();
            m_metaDataVersion = input.read();
            if (lenObj == 2) {
              m_serverGroupVersion = input.read();
              LOGDEBUG(
                  "Single-hop m_serverGroupVersion in message response is %d",
                  m_serverGroupVersion);
            }
          }
        } else if (numparts > 2) {
          // Skip one part, then read the same version part as above.
          skipParts(input, 1);
          int32_t lenObj = input.readInt32();
          input.readBoolean();
          m_metaDataVersion = input.read();
          LOGFINE("Single-hop metadata version in message response is %d",
                  m_metaDataVersion);
          if (lenObj == 2) {
            m_serverGroupVersion = input.read();
            LOGDEBUG(
                "Single-hop m_serverGroupVersion in message response is %d",
                m_serverGroupVersion);
          }
        }
      }
      break;
    }

    case TcrMessage::EXCEPTION: {
      // The low byte of the part count, shifted left by 5, is what
      // readExceptionPart() receives as its second argument.
      uint8_t lastChunk = static_cast<uint8_t>(numparts);
      lastChunk = (lastChunk << 5);
      readExceptionPart(input, lastChunk);
      break;
    }

    case TcrMessage::INVALID: {
      // Read the string in the reply
      LOGWARN("Received invalid message type as reply from server");
      readObjectPart(input, true);
      break;
    }

    case TcrMessage::CLIENT_REGISTER_INTEREST:
    case TcrMessage::CLIENT_UNREGISTER_INTEREST:
    case TcrMessage::SERVER_TO_CLIENT_PING:
    case TcrMessage::REGISTER_INSTANTIATORS: {
      // Nothing to decode; flag the message so that it is ignored.
      m_shouldIgnore = true;
      break;
    }

    case TcrMessage::REGISTER_INTEREST_DATA_ERROR:
    case TcrMessage::UNREGISTER_INTEREST_DATA_ERROR:
    case TcrMessage::PUT_DATA_ERROR:
    case TcrMessage::KEY_SET_DATA_ERROR:
    case TcrMessage::DESTROY_REGION_DATA_ERROR:
    case TcrMessage::CLEAR_REGION_DATA_ERROR:
    case TcrMessage::CONTAINS_KEY_DATA_ERROR:
    case TcrMessage::PUT_DELTA_ERROR:
    case TcrMessage::REQUEST_DATA_ERROR: {
      // The single string part of these messages is kept as the value.
      m_value = std::make_shared<CacheableString>(readStringPart(input));
      break;
    }

    case TcrMessage::REPLY: {
      // As with RESPONSE, the layout depends on the originating request.
      switch (m_msgTypeRequest) {
        case TcrMessage::PUT: {
          readPrMetaData(input);
          uint32_t flags = 0;
          readIntPart(input, &flags);
          if (flags & 0x01) {  //  has old value
            readOldValue(input);
          }
          if (flags & 0x04) {  // has version tag
            readVersionTag(input, endpointMemId, memberListForVersionStamp);
          }
          break;
        }
        case TcrMessage::INVALIDATE: {
          uint32_t flags = 0;
          readIntPart(input, &flags);
          if (flags & 0x01) {  // has version tag
            readVersionTag(input, endpointMemId, memberListForVersionStamp);
          }
          readPrMetaData(input);

          break;
        }
        case TcrMessage::DESTROY: {
          uint32_t flags = 0;
          readIntPart(input, &flags);
          if (flags & 0x01) {  // has version tag
            readVersionTag(input, endpointMemId, memberListForVersionStamp);
          }
          readPrMetaData(input);
          // The entryNotFound int part (Destroy65.java response) is read
          // rather than skipped, so that the readSecureObjectPart() call at
          // the end of this function gets the security part.
          readIntPart(input, &m_entryNotFound);
          LOGDEBUG("Inside TcrMessage::REPLY::DESTROY m_entryNotFound = %d ",
                   m_entryNotFound);
          break;
        }
        case TcrMessage::PING:
        default: {
          readPrMetaData(input);
          break;
        }
      }
      break;
    }
    case TcrMessage::LOCAL_INVALIDATE:
    case TcrMessage::LOCAL_DESTROY: {
      // Region name part: int32 length, one ignored byte, then raw bytes
      // without a terminator.
      int32_t regionLen = input.readInt32();
      input.advanceCursor(1);  // ignore byte
      char* regname = new char[regionLen + 1];
      DeleteArray<char> delRegName(regname);  // releases the buffer on exit
      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
      regname[regionLen] = '\0';
      m_regionName = regname;

      readKeyPart(input);

      // The callback argument part is read, not skipped.
      readCallbackObjectPart(input);
      readVersionTag(input, endpointMemId, memberListForVersionStamp);
      readBooleanPartAsObject(input, &m_isInterestListPassed);
      readBooleanPartAsObject(input, &m_hasCqsPart);
      if (m_hasCqsPart) {
        // Only LOCAL_INVALIDATE carries an explicit CQ message type part;
        // for LOCAL_DESTROY the message type itself is used.
        if (m_msgType == TcrMessage::LOCAL_INVALIDATE) {
          readIntPart(input, &m_msgTypeForCq);
        } else {
          m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
        }
        readCqsPart(input);
      }

      // read eventid part
      readEventIdPart(input, false);

      break;
    }

    case TcrMessage::LOCAL_CREATE:
    case TcrMessage::LOCAL_UPDATE: {
      // Region name part: int32 length, one ignored byte, then raw bytes
      // without a terminator.
      int32_t regionLen = input.readInt32();
      input.advanceCursor(1);  // ignore byte
      char* regname = new char[regionLen + 1];
      // The guard is the only owner of the buffer; no manual delete[] is
      // needed (or wanted) later in this case.
      DeleteArray<char> delRegName(regname);
      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
      regname[regionLen] = '\0';
      m_regionName = regname;

      readKeyPart(input);
      //  Read delta flag
      bool isDelta = false;
      readBooleanPartAsObject(input, &isDelta);
      if (isDelta) {
        // Delta payload: keep a private copy of the bytes and expose them
        // through a DataInput created over that copy.
        m_deltaBytesLen = input.readInt32();

        input.advanceCursor(1);  // ignore byte
        // NOTE(review): a previously assigned m_deltaBytes is not released
        // here; presumably this runs once per message - confirm.
        m_deltaBytes = new int8_t[m_deltaBytesLen];
        input.readBytesOnly(m_deltaBytes, m_deltaBytesLen);
        m_delta = std::unique_ptr<DataInput>(new DataInput(
            m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
                reinterpret_cast<const uint8_t*>(m_deltaBytes),
                m_deltaBytesLen)));
      } else {
        readObjectPart(input);
      }

      // The callback argument part is read, not skipped.
      readCallbackObjectPart(input);
      readVersionTag(input, endpointMemId, memberListForVersionStamp);
      readBooleanPartAsObject(input, &m_isInterestListPassed);
      readBooleanPartAsObject(input, &m_hasCqsPart);

      if (m_hasCqsPart) {
        readCqsPart(input);
        m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
      }

      // read eventid part
      readEventIdPart(input, false);

      break;
    }
    case TcrMessage::CLIENT_MARKER: {
      // dont skip (non-existent) callbackarg part, just read eventid part
      readEventIdPart(input, false);
      break;
    }

    case TcrMessage::LOCAL_DESTROY_REGION:
    case TcrMessage::CLEAR_REGION: {
      // Region name part: int32 length, one ignored byte, then raw bytes
      // without a terminator.
      int32_t regionLen = input.readInt32();
      input.advanceCursor(1);  // ignore byte
      char* regname = new char[regionLen + 1];
      DeleteArray<char> delRegName(regname);  // releases the buffer on exit
      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
      regname[regionLen] = '\0';
      m_regionName = regname;
      // The callback argument part is read, not skipped.
      readCallbackObjectPart(input);
      readBooleanPartAsObject(input, &m_hasCqsPart);
      if (m_hasCqsPart) {
        readCqsPart(input);
      }
      // read eventid part
      readEventIdPart(input, false);
      break;
    }

    case TcrMessage::RESPONSE_CLIENT_PR_METADATA: {
      // 17 bytes is exactly the header consumed above, i.e. there are no
      // parts to decode. Note that this returns without reading the optional
      // secure part at the end of the function.
      if (len == 17) {
        LOGDEBUG("RESPONSE_CLIENT_PR_METADATA len is 17");
        return;
      }
      // NOTE(review): a previously assigned m_metadata is not released here;
      // presumably this runs once per message - confirm.
      m_metadata =
          new std::vector<std::vector<std::shared_ptr<BucketServerLocation>>>();
      // Each part is a list of BucketServerLocation entries.
      for (int32_t i = 0; i < numparts; i++) {
        input.readInt32();          // ignore partlen
        input.read();               // ignore  isObj;
        auto bits8 = input.read();  // cacheable vector typeid
        // The scoped enum is cast explicitly because it is passed through
        // varargs for a %d conversion.
        LOGDEBUG("Expected typeID %d, got %d",
                 static_cast<int>(DSCode::CacheableArrayList), bits8);

        auto arrayLength = input.readArrayLength();  // array length
        LOGDEBUG("Array length = %d ", arrayLength);
        if (arrayLength > 0) {
          std::vector<std::shared_ptr<BucketServerLocation>>
              bucketServerLocations;
          for (int32_t index = 0; index < arrayLength; index++) {
            // ignore DS typeid, CLASS typeid, and string typeid
            input.advanceCursor(3);
            uint16_t classLen = input.readInt16();  // Read classLen
            input.advanceCursor(classLen);          // skip the class name
            auto location = std::make_shared<BucketServerLocation>();
            location->fromData(input);
            LOGFINE("location contains %d\t%s\t%d\t%d\t%s",
                    location->getBucketId(), location->getServerName().c_str(),
                    location->getPort(), location->getVersion(),
                    (location->isPrimary() ? "true" : "false"));
            bucketServerLocations.push_back(location);
          }
          m_metadata->push_back(bucketServerLocations);
        }
        LOGFINER("Metadata size is %zu", m_metadata->size());
      }
      break;
    }

    case TcrMessage::GET_CLIENT_PR_METADATA_ERROR: {
      LOGERROR("Failed to get single-hop meta data");
      break;
    }

    case TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES: {
      input.readInt32();  // ignore partlen;
      input.read();       // ignore isObj;

      // PART1 = bucketCount
      m_bucketCount = input.readNativeInt32();

      auto partLength = input.readInt32();  // partlen;
      input.read();                         // ignore isObj;
      if (partLength > 0) {
        // PART2 = colocatedwith
        m_colocatedWith = input.readString();
      }

      // Two further parts are decoded only when the message has four parts.
      if (numparts == 4) {
        partLength = input.readInt32();  // partlen;
        input.read();                    // ignore isObj;
        if (partLength > 0) {
          // PART3 = partitionresolvername
          input.readString();  // ignore
        }

        input.readInt32();  // ignore partlen;
        input.read();       // ignore isObj;
        input.read();       // ignore cacheable CacheableHashSet typeid

        auto arrayLength = input.readArrayLength();  // array length
        if (arrayLength > 0) {
          // NOTE(review): a previously assigned m_fpaSet is not released
          // here; presumably this runs once per message - confirm.
          m_fpaSet =
              new std::vector<std::shared_ptr<FixedPartitionAttributesImpl>>();
          for (int32_t index = 0; index < arrayLength; index++) {
            input.advanceCursor(
                3);  // ignore DS typeid, CLASS typeid, string typeid
            // Unsigned, as in the RESPONSE_CLIENT_PR_METADATA case: a signed
            // length of 0x8000 or more would turn into a huge cursor skip.
            uint16_t classLen = input.readInt16();  // Read classLen
            input.advanceCursor(classLen);          // skip the class name
            auto fpa = std::make_shared<FixedPartitionAttributesImpl>();
            fpa->fromData(input);  // PART4 = set of FixedAttributes.
            LOGDEBUG("fpa contains %d\t%s\t%d\t%d", fpa->getNumBuckets(),
                     fpa->getPartitionName().c_str(), fpa->isPrimary(),
                     fpa->getStartingBucketID());
            m_fpaSet->push_back(fpa);
          }
        }
      }
      break;
    }
    case TcrMessage::TOMBSTONE_OPERATION: {
      uint32_t tombstoneOpType = 0;
      // Region name part: int32 length, one ignored byte, then raw bytes
      // without a terminator.
      int32_t regionLen = input.readInt32();
      input.read();
      char* regname = new char[regionLen + 1];
      DeleteArray<char> delRegName(regname);  // releases the buffer on exit
      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
      regname[regionLen] = '\0';
      m_regionName = regname;
      // Operation type: 0 selects the version map, 1 the key set (see below).
      readIntPart(input, &tombstoneOpType);
      // read and ignore length
      input.readInt32();
      // read and ignore isObj
      input.read();

      if (tombstoneOpType == 0) {
        if (m_tombstoneVersions == nullptr) {
          m_tombstoneVersions = CacheableHashMap::create();
        }
        readHashMapForGCVersions(input, m_tombstoneVersions);
      } else if (tombstoneOpType == 1) {
        if (m_tombstoneKeys == nullptr) {
          m_tombstoneKeys = CacheableHashSet::create();
        }
        readHashSetForGCVersions(input, m_tombstoneKeys);
      } else {
        // Unknown operation type: leave the switch without reading the
        // event id part.
        LOGERROR("Failed to read the tombstone versions");
        break;
      }
      // readEventId Part
      readEventIdPart(input, false);
      break;
    }
    case TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES_ERROR: {
      LOGERROR("Failed to get server partitioned region attributes");
      break;
    }

    case TcrMessage::UNKNOWN_MESSAGE_TYPE_ERROR: {
      // do nothing
      break;
    }

    case TcrMessage::REQUEST_EVENT_VALUE_ERROR: {
      LOGERROR("Error while requesting full value for delta");
      break;
    }

    default: {
      LOGERROR(
          "Unknown message type %d in response, possible serialization "
          "mismatch",
          m_msgType);
      std::stringstream ss;
      ss << boost::stacktrace::stacktrace();
      // Log the trace as an argument, never as the format string itself: a
      // '%' inside a symbol or path must not be read as a conversion.
      LOGERROR("%s", ss.str().c_str());
      throw MessageException("handleByteArrayResponse: unknown message type");
    }
  }
  LOGDEBUG("handleByteArrayResponse earlyack = %d ", earlyack);
  // Bit 0x2 of the header flag byte announces a trailing secure part.
  if (earlyack & 0x2) readSecureObjectPart(input);
}