in azure_functions_worker/bindings/shared_memory_data_transfer/file_accessor_unix.py [0:0]
def create_mem_map(self, mem_map_name: str, mem_map_size: int) \
-> Optional[mmap.mmap]:
if mem_map_name is None or mem_map_name == '':
raise SharedMemoryException(
f'Cannot create memory map. Invalid name {mem_map_name}')
if mem_map_size <= 0:
raise SharedMemoryException(
f'Cannot create memory map. Invalid size {mem_map_size}')
file = self._create_mem_map_file(mem_map_name, mem_map_size)
if file is None:
logger.warning('Cannot create file: %s', mem_map_name)
return None
mem_map = mmap.mmap(file.fileno(), mem_map_size, mmap.MAP_SHARED,
mmap.PROT_WRITE)
if self._is_mem_map_initialized(mem_map):
raise SharedMemoryException(f'Memory map {mem_map_name} '
'already exists')
self._set_mem_map_initialized(mem_map)
return mem_map