def multi_processor_change_acl()

in azure/datalake/store/multiprocessor.py


def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None):
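    """
    Recursively apply an ACL change to every file and directory under `path`
    using a pool of worker processes.

    adl                   -- filesystem client the ACL operation is invoked on
    path                  -- root path whose tree is walked
    method_name           -- name of the ACL operation the worker processes perform
    acl_spec              -- ACL specification string handed to that operation
    number_of_sub_process -- number of worker processes; defaults to cpu_count() - 1 (minimum 2)
    """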
    logger = logging.getLogger("azure.datalake.store")

    def launch_processes(number_of_processes):
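        # Default to one worker process per CPU core, leaving one core free (never fewer than 2).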
        if number_of_processes is None:
            number_of_processes = max(2, multiprocessing.cpu_count() - 1)
        process_list = []
        for i in range(number_of_processes):
            process_list.append(multiprocessing.Process(target=processor,
                                    args=(adl, file_path_queue, finish_queue_processing_flag,
                                          method_name, acl_spec, log_queue, exception_queue)))
            process_list[-1].start()
        return process_list

    def walk(walk_path):
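        # List walk_path, hand each sub-directory to the thread pool, and push
        # batches of (name, is_file) tuples onto the queue for the worker processes.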
        try:
            paths = []
            all_entries = adl.ls(path=walk_path, detail=True)

            for entry in all_entries:
                if entry['type'] == 'DIRECTORY':
                    dir_processed_counter.increment()               # A new directory to process
                    walk_thread_pool.submit(walk, entry['name'])

                paths.append((entry['name'], entry['type'] == 'FILE'))

                if len(paths) == QUEUE_BUCKET_SIZE:
                    file_path_queue.put(list(paths))
                    paths = []

            if paths:
                file_path_queue.put(list(paths))  # Flush leftover paths (fewer than QUEUE_BUCKET_SIZE)
        except FileNotFoundError:
            pass                    # Ignore: the path may have been deleted while the walk was running
        except Exception:
            import traceback
            logger.exception("Failed to walk for path: " + str(walk_path) + ". Exiting!")
            exception_queue.put(traceback.format_exc())
        finally:
            dir_processed_counter.decrement()           # Processing complete for this directory

    # Initialize concurrency primitives
    log_queue = multiprocessing.JoinableQueue()              # Log records forwarded by the worker processes
    exception_queue = multiprocessing.Queue()                # Tracebacks from walk threads and workers
    finish_queue_processing_flag = multiprocessing.Event()   # Tells workers that no more paths will arrive
    file_path_queue = multiprocessing.JoinableQueue()        # Batches of (name, is_file) tuples to process
    dir_processed_counter = CountUpDownLatch()               # Directories still being walked

    # Start relevant threads and processes
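    # The log listener thread handles records arriving on log_queue; the exception monitor watches
    # exception_queue for tracebacks and is given the child process handles so it can react to
    # failures; the thread pool runs the directory walk.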
    log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
    log_listener.start()
    child_processes = launch_processes(number_of_sub_process)
    exception_monitor_thread = threading.Thread(target=monitor_exception, args=(exception_queue, child_processes))
    exception_monitor_thread.start()
    walk_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)

    # The walk only enumerates children, so the root directory itself is queued here explicitly
    file_path_queue.put([(path, False)])
    dir_processed_counter.increment()

    # Processing starts here
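    # The main thread walks the root; sub-directories discovered there recurse via the thread pool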
    walk(path)

    if dir_processed_counter.is_zero():  # is_zero() blocks until every directory has been walked
        walk_thread_pool.shutdown()
        file_path_queue.close()          # No new elements to add
        file_path_queue.join()           # Wait for operations to be done
        logger.log(logging.DEBUG, "file path queue closed")
        finish_queue_processing_flag.set()  # Set flag to break loop of child processes
        for child in child_processes:  # Wait for all child processes to finish
            logger.log(logging.DEBUG, "Joining process: " + str(child.pid))
            child.join()

    # Cleanup
    logger.log(logging.DEBUG, "Sending exception sentinel")
    exception_queue.put(END_QUEUE_SENTINEL)
    exception_monitor_thread.join()
    logger.log(logging.DEBUG, "Exception monitor thread finished")
    logger.log(logging.DEBUG, "Sending logger sentinel")
    log_queue.put(END_QUEUE_SENTINEL)
    log_queue.join()
    log_queue.close()
    logger.log(logging.DEBUG, "Log queue closed")
    log_listener.join()
    logger.log(logging.DEBUG, "Log thread finished")