transcoder/source/file/CmeBinaryPacketFileMessageSource.py (48 lines of code) (raw):

# # Copyright 2022 Google LLC # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from transcoder.source.file import LengthDelimitedFileMessageSource class CmeBinaryPacketFileMessageSource(LengthDelimitedFileMessageSource): """CME binary package file message source implementation. Derives from length delimited source and overrides the message slicing logic """ @staticmethod def source_type_identifier(): return 'cme_binary_packet' def __init__(self, file_path: str, endian: str, skip_bytes: int = 0, message_skip_bytes: int = 0, prefix_length: int = 2): super().__init__(file_path, skip_bytes=skip_bytes, endian=endian, message_skip_bytes=message_skip_bytes, prefix_length=prefix_length) def get_message_iterator(self): # pylint: disable=duplicate-code while True: if self.message_skip_bytes > 0: # Skip the channel id 2 bytes skipped_bytes = self.file_handle.read(self.message_skip_bytes) if not skipped_bytes: break # Read the parent message length 2 bytes parent_msg_bytes = self.file_handle.read(self.prefix_length) if not parent_msg_bytes: break message_length = int.from_bytes(parent_msg_bytes, self.endian) remaining_message_length = message_length # Skip binary packet header 12 bytes # Unable seek on a stream, # self.file_handle.seek(12, 1) packet_header_bytes = self.file_handle.read(12) if not packet_header_bytes: break # Read message header message size 2 bytes msg_len_bytes = self.file_handle.read(self.prefix_length) if not msg_len_bytes: break child_message_length = int.from_bytes(msg_len_bytes, self.endian) remainder = child_message_length - self.prefix_length self.increment_count() first_msg_bytes = self.file_handle.read(remainder) if not first_msg_bytes: break # print(''.join('{:02x}'.format(x) for x in result)) yield first_msg_bytes # Subtract out the binary packet header 12 bytes remaining_message_length = remaining_message_length - child_message_length - 12 while remaining_message_length > 0: child_msg_len_bytes = self.file_handle.read(self.prefix_length) if not child_msg_len_bytes: break child_message_length = int.from_bytes(child_msg_len_bytes, self.endian) remainder = child_message_length - self.prefix_length self.increment_count() child_msg_bytes = self.file_handle.read(remainder) if not child_msg_bytes: break yield child_msg_bytes remaining_message_length = remaining_message_length - child_message_length self._log_percentage_read()