in azure/datalake/store/multiprocessor.py [0:0]
def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None):
    """Apply an ACL operation to every entry under ``path`` using worker processes.

    The subtree rooted at ``path`` is enumerated concurrently by a thread pool
    (``walk``); discovered paths are batched onto ``file_path_queue``, which is
    drained by child processes running ``processor`` (defined elsewhere in this
    module). ``method_name`` and ``acl_spec`` are forwarded verbatim to the
    children — presumably they name the ACL call and its specification; confirm
    against ``processor``'s implementation.

    Parameters
    ----------
    adl : object
        Filesystem client; must expose ``ls(path=..., detail=True)`` returning
        dicts with at least ``'name'`` and ``'type'`` keys.
    path : str, optional
        Root of the subtree to process.
    method_name : str
        Operation name forwarded to the child processes.
    acl_spec : str
        ACL specification forwarded to the child processes.
    number_of_sub_process : int, optional
        Worker-process count; defaults to ``max(2, cpu_count() - 1)``.
    """
    logger = logging.getLogger("azure.datalake.store")

    def launch_processes(number_of_processes):
        # Fork the worker processes that drain file_path_queue. Each worker
        # shares the queues and the stop flag, plus the ACL operation to apply.
        if number_of_processes is None:
            number_of_processes = max(2, multiprocessing.cpu_count() - 1)
        process_list = []
        for i in range(number_of_processes):
            process_list.append(multiprocessing.Process(target=processor,
                                args=(adl, file_path_queue, finish_queue_processing_flag,
                                      method_name, acl_spec, log_queue, exception_queue)))
            process_list[-1].start()
        return process_list

    def walk(walk_path):
        # Enumerate one directory: queue its entries in buckets of
        # QUEUE_BUCKET_SIZE and submit each sub-directory back onto the
        # thread pool for recursive walking.
        try:
            paths = []
            all_files = adl.ls(path=walk_path, detail=True)
            for files in all_files:
                if files['type'] == 'DIRECTORY':
                    dir_processed_counter.increment()  # A new directory to process
                    walk_thread_pool.submit(walk, files['name'])
                paths.append((files['name'], files['type'] == 'FILE'))
                if len(paths) == QUEUE_BUCKET_SIZE:
                    file_path_queue.put(list(paths))
                    paths = []
            if paths != []:
                file_path_queue.put(list(paths))  # For leftover paths < bucket_size
        except FileNotFoundError:
            pass  # Continue in case the file was deleted in between
        except Exception:
            import traceback
            logger.exception("Failed to walk for path: " + str(walk_path) + ". Exiting!")
            exception_queue.put(traceback.format_exc())
        finally:
            dir_processed_counter.decrement()  # Processing complete for this directory

    # Initialize concurrency primitives
    log_queue = multiprocessing.JoinableQueue()             # child -> parent log records
    exception_queue = multiprocessing.Queue()               # tracebacks from walkers/children
    finish_queue_processing_flag = multiprocessing.Event()  # signals children to stop draining
    file_path_queue = multiprocessing.JoinableQueue()       # buckets of (name, is_file) tuples
    dir_processed_counter = CountUpDownLatch()              # directories still being walked

    # Start relevant threads and processes
    log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
    log_listener.start()
    child_processes = launch_processes(number_of_sub_process)
    exception_monitor_thread = threading.Thread(target=monitor_exception, args=(exception_queue, child_processes))
    exception_monitor_thread.start()
    walk_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)

    # Root directory needs to be explicitly passed (walk only queues children)
    file_path_queue.put([(path, False)])
    dir_processed_counter.increment()

    # Processing starts here
    walk(path)

    # NOTE(review): per the original comment, is_zero() is assumed to block
    # until the latch drains (all walk() calls finished) — confirm against
    # CountUpDownLatch's implementation.
    if dir_processed_counter.is_zero():  # Done processing all directories. Blocking call.
        walk_thread_pool.shutdown()

    file_path_queue.close()  # No new elements to add
    file_path_queue.join()   # Wait for operations to be done
    logger.log(logging.DEBUG, "file path queue closed")

    finish_queue_processing_flag.set()  # Set flag to break loop of child processes
    for child in child_processes:       # Wait for all child process to finish
        logger.log(logging.DEBUG, "Joining process: " + str(child.pid))
        child.join()

    # Cleanup: unblock and join the monitor/listener threads via sentinels.
    logger.log(logging.DEBUG, "Sending exception sentinel")
    exception_queue.put(END_QUEUE_SENTINEL)
    exception_monitor_thread.join()
    logger.log(logging.DEBUG, "Exception monitor thread finished")
    logger.log(logging.DEBUG, "Sending logger sentinel")
    log_queue.put(END_QUEUE_SENTINEL)
    log_queue.join()
    log_queue.close()
    logger.log(logging.DEBUG, "Log queue closed")
    log_listener.join()
    logger.log(logging.DEBUG, "Log thread finished")