in azure_functions_worker/bindings/shared_memory_data_transfer/file_accessor_unix.py [0:0]
def _create_mem_map_file(self, mem_map_name: str, mem_map_size: int) \
-> Optional[BufferedRandom]:
"""
Create the file descriptor for a new memory map.
Returns the BufferedRandom stream to the file.
"""
# Ensure that the file does not already exist
for temp_dir in self.valid_dirs:
file_path = os.path.join(temp_dir, mem_map_name)
if os.path.exists(file_path):
raise SharedMemoryException(
f'File {file_path} for memory map {mem_map_name} '
f'already exists')
# Create the file
for temp_dir in self.valid_dirs:
file_path = os.path.join(temp_dir, mem_map_name)
try:
file = open(file_path, 'wb+')
file.truncate(mem_map_size)
return file
except Exception as e:
# If the memory map could not be created in this directory, we
# keep trying in other applicable directories.
logger.warning('Cannot create memory map in %s - %s.'
' Trying other directories.', file_path, e,
exc_info=True)
# Could not create the memory map in any of the applicable directory
# paths so we fail.
logger.error(
'Cannot create memory map %s with size %s in any of the '
'following directories: %s',
mem_map_name, mem_map_size, self.valid_dirs)
return None