tools/gnmsg/client_messages.py (407 lines of code) (raw):

#!/usr/local/bin/python3 # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys import json from ds_codes import ds_codes from read_values import ( call_reader_function, parse_key_or_value, read_array_length, read_byte_value, read_cacheable, read_geode_jmutf8_string_value, read_int_value, read_long_value, read_short_value, read_string_value, read_unsigned_byte_value, ) from read_parts import read_object_header, read_int_part from numeric_conversion import decimal_string_to_hex_string, int_to_hex_string from interest_policy import interest_policy from interest_type import interest_type CHARS_IN_MESSAGE_HEADER = 34 def parse_region_part(message_bytes, offset): region_part = {} (region_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (region_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (region_part["Name"], offset) = read_string_value( message_bytes, region_part["Size"], offset ) return (region_part, offset) def parse_regex_part(message_bytes, offset): regex_part = {} (regex_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (regex_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (regex_part["Expression"], offset) = read_string_value( message_bytes, regex_part["Size"], offset ) return (regex_part, offset) def parse_object_part(message_bytes, offset): object_part = {} (object_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (object_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) offset += 2 * object_part["Size"] return (object_part, offset) def parse_event_id_part(message_bytes, offset): event_id_part = {} (event_id_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (event_id_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (event_id_part["LongCode1"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (event_id_part["EventIdThread"], offset) = call_reader_function( message_bytes, offset, read_long_value ) (event_id_part["LongCode2"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (event_id_part["EventIdSequence"], offset) = call_reader_function( message_bytes, offset, read_long_value ) return (event_id_part, offset) def parse_operation_part(message_bytes, offset): operation_part = {} (operation_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (operation_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) if operation_part["Size"] != 1: raise Exception( "'Operation' part of PUT message should always be size 1 for native client" ) operation_part["Data"], offset = call_reader_function( message_bytes, offset, read_byte_value ) return (operation_part, offset) def parse_flags_part(message_bytes, offset): flags_part = {} (flags_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (flags_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) if flags_part["Size"] != 4: raise Exception( "'Flags' part of PUT message should always be size 4 for native client" ) flags_part["Data"], offset = call_reader_function( message_bytes, offset, read_int_value ) return (flags_part, offset) def parse_credentials(message_bytes, offset): credentials = {} array_len = 0 (credentials["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (credentials["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (array_len, offset) = read_array_length(message_bytes, offset) for i in range(0, array_len): (credentials["Key" + str(i)], offset) = read_cacheable(message_bytes, offset) (credentials["Value" + str(i)], offset) = read_cacheable(message_bytes, offset) return (credentials, offset) def parse_raw_string_part(message_bytes, offset): string_part = {} (string_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (string_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (string_part["Value"], offset) = read_geode_jmutf8_string_value( message_bytes, offset ) return (string_part, offset) def parse_raw_int_part(message_bytes, offset): int_part = {} (int_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (int_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (int_part["Value"], offset) = call_reader_function( message_bytes, offset, read_int_value ) return (int_part, offset) def parse_raw_byte_part(message_bytes, offset): byte_part = {} (byte_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (byte_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (byte_part["Value"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) return (byte_part, offset) def parse_raw_boolean_part(message_bytes, offset): bool_part = {} (bool_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (bool_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) bool_val = 0 (bool_val, offset) = call_reader_function(message_bytes, offset, read_byte_value) bool_part["Value"] = "False" if bool_val == 0 else "True" return (bool_part, offset) def parse_byte_and_timeout_part(message_bytes, offset): byte_and_timeout_part = {} (byte_and_timeout_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (byte_and_timeout_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) bool_val = 0 (byte_and_timeout_part["Byte"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (byte_and_timeout_part["TimeoutMs"], offset) = call_reader_function( message_bytes, offset, read_int_value ) return (byte_and_timeout_part, offset) def parse_interest_result_policy_part(message_bytes, offset): interest_result_policy_part = {} (interest_result_policy_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (interest_result_policy_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) dscode, offset = call_reader_function(message_bytes, offset, read_byte_value) interest_result_policy_part["DSCode1"] = ds_codes[dscode] dscode, offset = call_reader_function(message_bytes, offset, read_byte_value) interest_result_policy_part["DSCode2"] = ds_codes[dscode] policy, offset = call_reader_function(message_bytes, offset, read_byte_value) interest_result_policy_part["Policy"] = interest_policy[policy] return (interest_result_policy_part, offset) def parse_interest_type_part(message_bytes, offset): value, offset = parse_raw_int_part(message_bytes, offset) value["InterestType"] = interest_type[value["Value"]] del value["Value"] return value, offset def read_put_message(properties, message_bytes, offset): (properties["Region"], offset) = parse_region_part(message_bytes, offset) (properties["Operation"], offset) = parse_operation_part(message_bytes, offset) (properties["Flags"], offset) = parse_flags_part(message_bytes, offset) (properties["Key"], offset) = parse_key_or_value(message_bytes, offset) (properties["IsDelta"], offset) = parse_key_or_value(message_bytes, offset) (properties["Value"], offset) = parse_key_or_value(message_bytes, offset) # This is a little weird, so here's the explanation: the geode-native logger has a buffer size limit of 8KB, so we # can't fully parse any message that's longer than that. At the same time, we would really like to extract as much # information as we can from such a message. Since the first 5 parts of the PUT message are trivial in size, the # actual value being PUT is the thing that we can't decode. Even then, we would like to know as much about the # "Value" part of the message as we can, so we will attempt to parse the value, and catch any exceptions above this # level. In the case of large values that we don't attempt to parse, such as PDX, we will still inform that the # value is of size (n), is or is not an object, and the type is (PDX or whatever). Great so far, but what about the # EventId part, that sits at the very end of the PUT message? Well, we shouldn't bother with it if the length of # the message is > the logger size limit, because we're guaranteed it's not in the log. So that's what we do here, # just skip the EventId. if properties["Length"] < 8192: (properties["EventId"], offset) = parse_event_id_part(message_bytes, offset) else: properties["EventId"] = {"Data": "Unavailable - message is too long"} def read_request_message(properties, message_bytes, offset): (properties["Region"], offset) = parse_region_part(message_bytes, offset) (properties["Key"], offset) = parse_key_or_value(message_bytes, offset) def read_close_connection_message(properties, message_bytes, offset): object_part = {} (object_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) if object_part["Size"] != 1: raise Exception("CloseConnection message should be only one byte long!") (object_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (object_part["KeepAlive"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) properties["ObjectPart"] = object_part def read_contains_key_message(properties, message_bytes, offset): (properties["RegionPart"], offset) = parse_region_part(message_bytes, offset) (properties["Key"], offset) = parse_key_or_value(message_bytes, offset) (request_type, offset) = parse_raw_int_part(message_bytes, offset) if request_type["Value"] == 1: properties["RequestType"] == "ContainsValueForKey" else: properties["RequestType"] = "ContainsKey" def read_destroy_message(properties, message_bytes, offset): if properties["Parts"] > 5: raise Exception( "Parser can't handle callback argument in DESTROY message. Send this log file to the dev team!" ) (properties["Region"], offset) = parse_region_part(message_bytes, offset) (properties["Key"], offset) = parse_key_or_value(message_bytes, offset) (properties["ExpectedOldValue"], offset) = parse_key_or_value(message_bytes, offset) (properties["Operation"], offset) = parse_operation_part(message_bytes, offset) (properties["EventId"], offset) = parse_event_id_part(message_bytes, offset) def read_get_client_partition_attributes_message(properties, message_bytes, offset): (properties["RegionPart"], offset) = parse_region_part(message_bytes, offset) for i in range(1, properties["Parts"]): (properties["ObjectPart" + str(i)], offset) = parse_object_part( message_bytes, offset ) def read_get_client_pr_metadata_message(properties, message_bytes, offset): (properties["RegionPart"], offset) = parse_region_part(message_bytes, offset) for i in range(1, properties["Parts"]): (properties["ObjectPart" + str(i)], offset) = parse_object_part( message_bytes, offset ) def read_query_message(properties, message_bytes, offset): (properties["Query"], offset) = parse_region_part(message_bytes, offset) (properties["EventId"], offset) = parse_event_id_part(message_bytes, offset) if properties["Parts"] == 3: timeout_part = {} (timeout_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (timeout_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (timeout_part["TimeoutMs"], offset) = call_reader_function( message_bytes, offset, read_int_value ) properties["Timeout"] = timeout_part def read_user_credential_message(properties, message_bytes, offset): (properties["Credentials"], offset) = parse_credentials(message_bytes, offset) def read_executecq_msg_type_message(properties, message_bytes, offset): (properties["CQName"], offset) = parse_raw_string_part(message_bytes, offset) (properties["QueryString"], offset) = parse_raw_string_part(message_bytes, offset) (properties["CqState"], offset) = parse_raw_int_part(message_bytes, offset) (properties["Durable"], offset) = parse_raw_boolean_part(message_bytes, offset) (properties["RegionDataPolicy"], offset) = parse_raw_byte_part( message_bytes, offset ) def read_executecq_with_ir_msg_type_message(properties, message_bytes, offset): if properties["Parts"] != 5: raise Exception( "Don't know how to parse a RESPONSE_CLIENT_PARTITION_ATTRIBUTES message with " + properties["Parts"] + " parts (should always have 5)." ) (properties["CQName"], offset) = parse_raw_string_part(message_bytes, offset) (properties["QueryString"], offset) = parse_raw_string_part(message_bytes, offset) (properties["CqState"], offset) = parse_raw_int_part(message_bytes, offset) (properties["Durable"], offset) = parse_raw_boolean_part(message_bytes, offset) (properties["RegionDataPolicy"], offset) = parse_raw_byte_part( message_bytes, offset ) def read_stopcq_or_closecq_msg_type_message(properties, message_bytes, offset): (properties["Region"], offset) = parse_region_part(message_bytes, offset) (properties["EventId"], offset) = parse_event_id_part(message_bytes, offset) if properties["Parts"] == 3: timeout_part = {} (timeout_part["Size"], offset) = call_reader_function( message_bytes, offset, read_int_value ) (timeout_part["IsObject"], offset) = call_reader_function( message_bytes, offset, read_byte_value ) (timeout_part["TimeoutMs"], offset) = call_reader_function( message_bytes, offset, read_int_value ) properties["Timeout"] = timeout_part def read_get_pdx_id_for_type_message(properties, message_bytes, offset): (properties["PdxType"], offset) = parse_object_part(message_bytes, offset) def read_get_function_attributes_message(properties, message_bytes, offset): (properties["FunctionName"], offset) = parse_region_part(message_bytes, offset) def read_execute_function_message(properties, message_bytes, offset): (properties["ByteAndTimeout"], offset) = parse_byte_and_timeout_part( message_bytes, offset ) (properties["FunctionName"], offset) = parse_region_part(message_bytes, offset) (properties["Arguments"], offset) = parse_object_part(message_bytes, offset) def parse_getall_optional_callback_arguments(message_bytes, offset): (local_object, local_offset) = parse_object_part(message_bytes, offset) if local_object["IsObject"] == 0: (local_object, local_offset) = parse_raw_int_part(message_bytes, offset) return (local_object, local_offset) def read_get_all_70_message(properties, message_bytes, offset): (properties["Region"], offset) = parse_region_part(message_bytes, offset) (properties["KeyList"], offset) = parse_key_or_value(message_bytes, offset) ( properties["CallbackArguments"], offset, ) = parse_getall_optional_callback_arguments(message_bytes, offset) def read_key_set(properties, message_bytes, offset): (properties["Region"], offset) = parse_region_part(message_bytes, offset) def read_object_as_raw_bytes(message_bytes, offset): raw_bytes_part, offset = read_object_header(message_bytes, offset) bytes_string = "" for i in range(raw_bytes_part["Size"]): if i: bytes_string += " " byte_val, offset = call_reader_function( message_bytes, offset, read_unsigned_byte_value ) bytes_string += decimal_string_to_hex_string(str(byte_val)) raw_bytes_part["Bytes"] = bytes_string return raw_bytes_part, offset def read_add_pdx_type_message(properties, message_bytes, offset): properties["PdxType"], offset = read_object_as_raw_bytes(message_bytes, offset) properties["TypeId"], offset = read_int_part(message_bytes, offset) def read_register_interest_message(properties, message_bytes, offset): properties["Region"], offset = parse_region_part(message_bytes, offset) properties["InterestType"], offset = parse_interest_type_part(message_bytes, offset) properties["InterestResultPolicy"], offset = parse_interest_result_policy_part( message_bytes, offset ) properties["IsDurable"], offset = parse_raw_byte_part(message_bytes, offset) properties["Regex"], offset = parse_regex_part(message_bytes, offset) properties["Param1"], offset = read_object_as_raw_bytes(message_bytes, offset) byte_values = properties["Param1"]["Bytes"] del properties["Param1"]["Bytes"] properties["Param1"]["ReceiveValues"] = int(byte_values) properties["Param2"], offset = read_object_as_raw_bytes(message_bytes, offset) byte_values = properties["Param2"]["Bytes"] del properties["Param2"]["Bytes"] caching_enabled, serialize_values = byte_values.split(" ") properties["Param2"]["CachingEnabled"] = int(caching_enabled) properties["Param2"]["SerializeValues"] = int(serialize_values) client_message_parsers = { "ADD_PDX_TYPE": read_add_pdx_type_message, "CLOSECQ_MSG_TYPE": read_stopcq_or_closecq_msg_type_message, "CLOSE_CONNECTION": read_close_connection_message, "CONTAINS_KEY": read_contains_key_message, "DESTROY": read_destroy_message, "EXECUTECQ_MSG_TYPE": read_executecq_msg_type_message, "EXECUTECQ_WITH_IR_MSG_TYPE": read_executecq_with_ir_msg_type_message, "EXECUTE_FUNCTION": read_execute_function_message, "GET_ALL_70": read_get_all_70_message, "GET_CLIENT_PARTITION_ATTRIBUTES": read_get_client_partition_attributes_message, "GET_CLIENT_PR_METADATA": read_get_client_pr_metadata_message, "GET_FUNCTION_ATTRIBUTES": read_get_function_attributes_message, "GET_PDX_ID_FOR_TYPE": read_get_pdx_id_for_type_message, "KEY_SET": read_key_set, "PUT": read_put_message, "QUERY": read_query_message, "REGISTER_INTEREST": read_register_interest_message, "REQUEST": read_request_message, "STOPCQ_MSG_TYPE": read_stopcq_or_closecq_msg_type_message, "USER_CREDENTIAL_MESSAGE": read_user_credential_message, } def parse_client_message(properties, message_bytes): offset = CHARS_IN_MESSAGE_HEADER if properties["Type"] in client_message_parsers.keys(): try: client_message_parsers[properties["Type"]]( properties, message_bytes, offset ) except: properties["ERROR"] = "Exception reading message - probably incomplete" return