in cppcache/src/TcrMessage.cpp [1132:1605]
void TcrMessage::handleByteArrayResponse(
const char* bytearray, int32_t len, uint16_t endpointMemId,
const SerializationRegistry& serializationRegistry,
MemberListForVersionStamp& memberListForVersionStamp) {
auto input = m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
reinterpret_cast<uint8_t*>(const_cast<char*>(bytearray)), len, getPool());
// TODO:: this need to make sure that pool is there
// if(m_tcdm == nullptr)
// throw IllegalArgumentException("Pool is nullptr in TcrMessage");
m_msgType = input.readInt32();
int32_t msglen;
msglen = input.readInt32();
int32_t numparts;
numparts = input.readInt32();
m_txId = input.readInt32();
auto earlyack = input.read();
LOGDEBUG(
"handleByteArrayResponse m_msgType = %d m_isSecurityOn = %d "
"requesttype "
"=%d",
m_msgType, m_isSecurityOn, m_msgTypeRequest);
LOGDEBUG(
"Message type=%d, length=%d, parts=%d, txid=%d and eack %d with data "
"length=%d",
m_msgType, msglen, numparts, m_txId, earlyack, len);
// LOGFINE("Message type=%d, length=%d, parts=%d, txid=%d and eack %d with
// data length=%d",
// m_msgType, msglen, numparts, m_txId, earlyack, len);
switch (m_msgType) {
case TcrMessage::RESPONSE: {
if (m_msgTypeRequest == TcrMessage::CONTAINS_KEY) {
readBooleanPartAsObject(input, &m_boolValue);
} else if (m_msgTypeRequest == TcrMessage::USER_CREDENTIAL_MESSAGE) {
readUniqueIDObjectPart(input);
} else if (m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_TYPE ||
m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_ENUM) {
// int will come in response
uint32_t typeId;
readIntPart(input, &typeId);
m_value = CacheableInt32::create(typeId);
} else if (m_msgTypeRequest == TcrMessage::GET_PDX_TYPE_BY_ID) {
// PdxType will come in response
input.advanceCursor(sizeof(int32_t)); // ignore part size
if (input.readBoolean()) { // part type. If 1 is an object
m_value = serializationRegistry.deserialize(
input, static_cast<int8_t>(DSCode::PdxType));
}
} else if (m_msgTypeRequest == TcrMessage::GET_PDX_ENUM_BY_ID) {
// PdxType will come in response
input.advanceCursor(5); // part header
m_value = serializationRegistry.deserialize(input);
} else if (m_msgTypeRequest == TcrMessage::GET_FUNCTION_ATTRIBUTES) {
// read and ignore length
input.readInt32();
input.advanceCursor(1); // ignore byte
if (!m_functionAttributes) {
m_functionAttributes = std::make_shared<std::vector<int8_t>>();
}
m_functionAttributes->push_back(input.read());
m_functionAttributes->push_back(input.read());
m_functionAttributes->push_back(input.read());
} else if (m_msgTypeRequest == TcrMessage::REQUEST) {
int32_t receivednumparts = 2;
readObjectPart(input);
uint32_t flag = 0;
readIntPart(input, &flag);
if (flag & 0x01) {
readCallbackObjectPart(input);
receivednumparts++;
}
if ((m_value == nullptr) && (flag & 0x08 /*VALUE_IS_INVALID*/)) {
m_value = CacheableToken::invalid();
}
if (flag & 0x02) {
readVersionTag(input, endpointMemId, memberListForVersionStamp);
receivednumparts++;
}
if (flag & 0x04 /*KEY_NOT_PRESENT*/) {
m_value = CacheableToken::tombstone();
}
if (numparts > receivednumparts) readPrMetaData(input);
} else if (m_decodeAll) {
readObjectPart(input);
if (numparts == 2) {
if (m_isCallBackArguement) {
readCallbackObjectPart(input);
} else {
int32_t lenObj = input.readInt32();
input.readBoolean();
m_metaDataVersion = input.read();
if (lenObj == 2) {
m_serverGroupVersion = input.read();
LOGDEBUG(
"Single-hop m_serverGroupVersion in message response is %d",
m_serverGroupVersion);
}
}
} else if (numparts > 2) {
skipParts(input, 1);
int32_t lenObj = input.readInt32();
input.readBoolean();
m_metaDataVersion = input.read();
LOGFINE("Single-hop metadata version in message response is %d",
m_metaDataVersion);
if (lenObj == 2) {
m_serverGroupVersion = input.read();
LOGDEBUG(
"Single-hop m_serverGroupVersion in message response is %d",
m_serverGroupVersion);
}
}
}
break;
}
case TcrMessage::EXCEPTION: {
uint8_t lastChunk = static_cast<uint8_t>(numparts);
lastChunk = (lastChunk << 5);
readExceptionPart(input, lastChunk);
// if (m_isSecurityOn)
// readSecureObjectPart( input );
break;
}
case TcrMessage::INVALID: {
// Read the string in the reply
LOGWARN("Received invalid message type as reply from server");
readObjectPart(input, true);
break;
}
case TcrMessage::CLIENT_REGISTER_INTEREST:
case TcrMessage::CLIENT_UNREGISTER_INTEREST:
case TcrMessage::SERVER_TO_CLIENT_PING:
case TcrMessage::REGISTER_INSTANTIATORS: {
// ignore this
m_shouldIgnore = true;
break;
}
case TcrMessage::REGISTER_INTEREST_DATA_ERROR:
case TcrMessage::UNREGISTER_INTEREST_DATA_ERROR:
case TcrMessage::PUT_DATA_ERROR:
case TcrMessage::KEY_SET_DATA_ERROR:
case TcrMessage::DESTROY_REGION_DATA_ERROR:
case TcrMessage::CLEAR_REGION_DATA_ERROR:
case TcrMessage::CONTAINS_KEY_DATA_ERROR:
case TcrMessage::PUT_DELTA_ERROR:
case TcrMessage::REQUEST_DATA_ERROR: {
m_value = std::make_shared<CacheableString>(readStringPart(input));
break;
}
case TcrMessage::REPLY: {
switch (m_msgTypeRequest) {
case TcrMessage::PUT: {
readPrMetaData(input);
uint32_t flags = 0;
readIntPart(input, &flags);
if (flags & 0x01) { // has old value
readOldValue(input);
}
if (flags & 0x04) {
readVersionTag(input, endpointMemId, memberListForVersionStamp);
}
break;
}
case TcrMessage::INVALIDATE: {
uint32_t flags = 0;
readIntPart(input, &flags);
if (flags & 0x01) {
readVersionTag(input, endpointMemId, memberListForVersionStamp);
}
readPrMetaData(input);
break;
}
case TcrMessage::DESTROY: {
uint32_t flags = 0;
readIntPart(input, &flags);
if (flags & 0x01) {
readVersionTag(input, endpointMemId, memberListForVersionStamp);
}
readPrMetaData(input);
// skip the Destroy65.java response entryNotFound int part so
// that the readSecureObjectPart() call below gets the security part
// skipParts(input, 1);
readIntPart(input, &m_entryNotFound);
LOGDEBUG("Inside TcrMessage::REPLY::DESTROY m_entryNotFound = %d ",
m_entryNotFound);
break;
}
case TcrMessage::PING:
default: {
readPrMetaData(input);
break;
}
}
break;
}
case TcrMessage::LOCAL_INVALIDATE:
case TcrMessage::LOCAL_DESTROY: {
int32_t regionLen = input.readInt32();
input.advanceCursor(1); // ignore byte
char* regname = nullptr;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
readKeyPart(input);
// skipParts(input, 1); // skip callbackarg parts
readCallbackObjectPart(input);
readVersionTag(input, endpointMemId, memberListForVersionStamp);
readBooleanPartAsObject(input, &m_isInterestListPassed);
readBooleanPartAsObject(input, &m_hasCqsPart);
if (m_hasCqsPart) {
if (m_msgType == TcrMessage::LOCAL_INVALIDATE) {
readIntPart(input, &m_msgTypeForCq);
} else {
m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
}
// LOGINFO("got cq local local_invalidate/local_destroy read
// m_hasCqsPart");
readCqsPart(input);
}
// read eventid part
readEventIdPart(input, false);
break;
}
case TcrMessage::LOCAL_CREATE:
case TcrMessage::LOCAL_UPDATE: {
int32_t regionLen = input.readInt32();
input.advanceCursor(1); // ignore byte
char* regname = nullptr;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
readKeyPart(input);
// Read delta flag
bool isDelta = false;
readBooleanPartAsObject(input, &isDelta);
if (isDelta) {
m_deltaBytesLen = input.readInt32();
input.advanceCursor(1); // ignore byte
m_deltaBytes = new int8_t[m_deltaBytesLen];
input.readBytesOnly(m_deltaBytes, m_deltaBytesLen);
m_delta = std::unique_ptr<DataInput>(new DataInput(
m_tcdm->getConnectionManager().getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(m_deltaBytes),
m_deltaBytesLen)));
} else {
readObjectPart(input);
}
// skip callbackarg part
// skipParts(input, 1);
readCallbackObjectPart(input);
readVersionTag(input, endpointMemId, memberListForVersionStamp);
readBooleanPartAsObject(input, &m_isInterestListPassed);
readBooleanPartAsObject(input, &m_hasCqsPart);
if (m_hasCqsPart) {
// LOGINFO("got cq local_create/local_create");
readCqsPart(input);
m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
}
// read eventid part
readEventIdPart(input, false);
_GEODE_SAFE_DELETE_ARRAY(regname); // COVERITY ---> 30299 Resource leak
break;
}
case TcrMessage::CLIENT_MARKER: {
// dont skip (non-existent) callbackarg part, just read eventid part
readEventIdPart(input, false);
break;
}
case TcrMessage::LOCAL_DESTROY_REGION:
case TcrMessage::CLEAR_REGION: {
int32_t regionLen = input.readInt32();
input.advanceCursor(1); // ignore byte
char* regname = nullptr;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
// skip callbackarg part
// skipParts(input, 1);
readCallbackObjectPart(input);
readBooleanPartAsObject(input, &m_hasCqsPart);
if (m_hasCqsPart) {
// LOGINFO("got cq region_destroy read m_hasCqsPart");
readCqsPart(input);
}
// read eventid part
readEventIdPart(input, false);
break;
}
case TcrMessage::RESPONSE_CLIENT_PR_METADATA: {
if (len == 17) {
LOGDEBUG("RESPONSE_CLIENT_PR_METADATA len is 17");
return;
}
m_metadata =
new std::vector<std::vector<std::shared_ptr<BucketServerLocation>>>();
for (int32_t i = 0; i < numparts; i++) {
input.readInt32(); // ignore partlen
input.read(); // ignore isObj;
auto bits8 = input.read(); // cacheable vector typeid
LOGDEBUG("Expected typeID %d, got %d", DSCode::CacheableArrayList,
bits8);
auto arrayLength = input.readArrayLength(); // array length
LOGDEBUG("Array length = %d ", arrayLength);
if (arrayLength > 0) {
std::vector<std::shared_ptr<BucketServerLocation>>
bucketServerLocations;
for (int32_t index = 0; index < arrayLength; index++) {
// ignore DS typeid, CLASS typeid, and string typeid
input.advanceCursor(3);
uint16_t classLen = input.readInt16(); // Read classLen
input.advanceCursor(classLen);
auto location = std::make_shared<BucketServerLocation>();
location->fromData(input);
LOGFINE("location contains %d\t%s\t%d\t%d\t%s",
location->getBucketId(), location->getServerName().c_str(),
location->getPort(), location->getVersion(),
(location->isPrimary() ? "true" : "false"));
bucketServerLocations.push_back(location);
}
m_metadata->push_back(bucketServerLocations);
}
LOGFINER("Metadata size is %zu", m_metadata->size());
}
break;
}
case TcrMessage::GET_CLIENT_PR_METADATA_ERROR: {
LOGERROR("Failed to get single-hop meta data");
break;
}
case TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES: {
input.readInt32(); // ignore partlen;
input.read(); // ignore isObj;
// PART1 = bucketCount
m_bucketCount = input.readNativeInt32();
auto partLength = input.readInt32(); // partlen;
input.read(); // ignore isObj;
if (partLength > 0) {
// PART2 = colocatedwith
m_colocatedWith = input.readString();
}
if (numparts == 4) {
partLength = input.readInt32(); // partlen;
input.read(); // ignore isObj;
if (partLength > 0) {
// PART3 = partitionresolvername
input.readString(); // ignore
}
input.readInt32(); // ignore partlen;
input.read(); // ignore isObj;
input.read(); // ignore cacheable CacheableHashSet typeid
auto arrayLength = input.readArrayLength(); // array length
if (arrayLength > 0) {
m_fpaSet =
new std::vector<std::shared_ptr<FixedPartitionAttributesImpl>>();
for (int32_t index = 0; index < arrayLength; index++) {
input.advanceCursor(
3); // ignore DS typeid, CLASS typeid, string typeid
auto classLen = input.readInt16(); // Read classLen
input.advanceCursor(classLen);
auto fpa = std::make_shared<FixedPartitionAttributesImpl>();
fpa->fromData(input); // PART4 = set of FixedAttributes.
LOGDEBUG("fpa contains %d\t%s\t%d\t%d", fpa->getNumBuckets(),
fpa->getPartitionName().c_str(), fpa->isPrimary(),
fpa->getStartingBucketID());
m_fpaSet->push_back(fpa);
}
}
}
break;
}
case TcrMessage::TOMBSTONE_OPERATION: {
uint32_t tombstoneOpType;
int32_t regionLen = input.readInt32();
input.read();
char* regname = nullptr;
regname = new char[regionLen + 1];
DeleteArray<char> delRegName(regname);
input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
regname[regionLen] = '\0';
m_regionName = regname;
readIntPart(input, &tombstoneOpType); // partlen;
// read and ignore length
input.readInt32();
// read and ignore isObj
input.read();
if (tombstoneOpType == 0) {
if (m_tombstoneVersions == nullptr) {
m_tombstoneVersions = CacheableHashMap::create();
}
readHashMapForGCVersions(input, m_tombstoneVersions);
} else if (tombstoneOpType == 1) {
if (m_tombstoneKeys == nullptr) {
m_tombstoneKeys = CacheableHashSet::create();
}
// input.readObject(m_tombstoneKeys);
readHashSetForGCVersions(input, m_tombstoneKeys);
} else {
LOGERROR("Failed to read the tombstone versions");
break;
}
// readEventId Part
readEventIdPart(input, false);
break;
}
case TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES_ERROR: {
LOGERROR("Failed to get server partitioned region attributes");
break;
}
case TcrMessage::UNKNOWN_MESSAGE_TYPE_ERROR: {
// do nothing
break;
}
case TcrMessage::REQUEST_EVENT_VALUE_ERROR: {
LOGERROR("Error while requesting full value for delta");
break;
}
default:
LOGERROR(
"Unknown message type %d in response, possible serialization "
"mismatch",
m_msgType);
std::stringstream ss;
ss << boost::stacktrace::stacktrace();
LOGERROR(ss.str().c_str());
throw MessageException("handleByteArrayResponse: unknown message type");
}
LOGDEBUG("handleByteArrayResponse earlyack = %d ", earlyack);
if (earlyack & 0x2) readSecureObjectPart(input);
}