in azure_functions_worker/bindings/shared_memory_data_transfer/shared_memory_map.py [0:0]
def get_bytes(self, content_offset: int = 0, bytes_to_read: int = 0) \
        -> Optional[bytes]:
    """
    Read content from this SharedMemoryMap, starting at the given offset.

    The read never extends past the content length reported by
    `_get_content_length()`, so any bytes in the memory map that lie
    beyond the content are not returned.

    :param content_offset: Number of content bytes to skip before reading.
        Values <= 0 mean read from the beginning of the content.
    :param bytes_to_read: Maximum number of bytes to read.
        Values <= 0 mean read everything from the offset to the end of
        the content.
    :return: The requested content as bytes (empty if the offset is at or
        past the end of the content), or None if the content length could
        not be determined.
    """
    content_length = self._get_content_length()
    if content_length is None:
        return None

    # Non-positive offsets are treated as "start of content".
    offset = content_offset if content_offset > 0 else 0

    # Bytes of content available from the offset onwards.
    remaining = content_length - offset
    if remaining <= 0:
        # Nothing left to read; also avoids seeking beyond the end of the
        # memory map, which would raise a ValueError.
        return b''

    if bytes_to_read > 0:
        # Read up to the specified number of bytes, bounded by the content
        count = min(bytes_to_read, remaining)
    else:
        # Read the rest of the content
        count = remaining

    # Seek past the header and directly to the requested content position
    self.mem_map.seek(consts.CONTENT_HEADER_TOTAL_BYTES + offset)
    return self.mem_map.read(count)