def processor()

in azure/datalake/store/multiprocessor.py
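
The excerpt below depends on module-level imports and a worker-thread constant that are not shown here; a minimal sketch of what the enclosing module would need (the value of WORKER_THREAD_PER_PROCESS is an assumption):

import logging
import logging.handlers
import os
from concurrent.futures import ThreadPoolExecutor
from queue import Empty                  # multiprocessing queues raise queue.Empty on timeout

WORKER_THREAD_PER_PROCESS = 50           # assumed number of threads per worker process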


def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, acl_spec, log_queue, exception_queue):
    logger = logging.getLogger("azure.datalake.store")
    logger.setLevel(logging.DEBUG)
    # Files cannot carry default ACL entries, so strip any "default:" entries
    # from the spec before applying it to files.
    removed_default_acl_spec = ",".join([x for x in acl_spec.split(',') if not x.lower().startswith("default")])

    try:
        logger.addHandler(logging.handlers.QueueHandler(log_queue))
        logger.propagate = False                                                        # Prevents double logging
    except AttributeError:
        # Python 2 doesn't have QueueHandler; fall back to best-effort logging.
        pass

    try:
        func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries}
        function_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
        adl_function = func_table[method_name]
        logger.log(logging.DEBUG, "Started processor pid:"+str(os.getpid()))

        def func_wrapper(func, path, spec):
            try:
                func(path=path, acl_spec=spec)
            except FileNotFoundError:
                logger.exception("File "+str(path)+" not found")
                # The full exception is already logged by the relevant ACL method, so don't re-log it here.
            except Exception as e:
                logger.exception("File " + str(path) + " not set. Exception "+str(e))

            logger.log(logging.DEBUG, "Completed running on path:" + str(path))

        while not finish_queue_processing_flag.is_set():
            try:
                file_paths = file_path_queue.get(timeout=0.1)
                file_path_queue.task_done()                 # Not reached if the queue was empty (get() raised Empty)
                for file_path in file_paths:
                    is_file = file_path[1]
                    if is_file:
                        spec = removed_default_acl_spec
                    else:
                        spec = acl_spec

                    logger.log(logging.DEBUG, "Starting on path:" + str(file_path))
                    function_thread_pool.submit(func_wrapper, adl_function, file_path[0], spec)
            except Empty:
                pass

    except Exception as e:
        import traceback
        logger.exception("Exception in pid "+str(os.getpid())+"Exception: " + str(e))
        exception_queue.put(traceback.format_exc())
    finally:
        function_thread_pool.shutdown()  # Blocking call. Will wait till all threads are done executing.
        logger.log(logging.DEBUG, "Finished processor pid: " + str(os.getpid()))