in azure/datalake/store/multiprocessor.py [0:0]
def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, acl_spec, log_queue, exception_queue):
    """Worker loop that applies one ACL operation to batches of queued paths.

    Batches are pulled from ``file_path_queue`` until
    ``finish_queue_processing_flag`` is set. Every entry of a batch is handed
    to a thread pool, which calls the selected ``adl`` method with
    ``path=`` and ``acl_spec=`` keyword arguments.

    Parameters
    ----------
    adl : object
        Must expose ``modify_acl_entries``, ``set_acl`` and
        ``remove_acl_entries``; all three are looked up regardless of which
        one ``method_name`` selects.
    file_path_queue : queue-like
        ``get(timeout=...)`` must return an iterable of entries, where
        ``entry[0]`` is the path and ``entry[1]`` is truthy when the path is
        a file. ``task_done()`` is called once per batch retrieved.
    finish_queue_processing_flag : event-like
        The polling loop stops once ``is_set()`` returns a truthy value.
    method_name : str
        One of ``"mod_acl"``, ``"set_acl"`` or ``"rem_acl"``.
    acl_spec : str
        Comma-separated ACL entries. Entries starting with ``default``
        (case-insensitive) are stripped from the spec used for files.
    log_queue : queue-like
        Target of the ``QueueHandler`` attached to the
        ``azure.datalake.store`` logger, when that handler class exists.
    exception_queue : queue-like
        Receives the formatted traceback (a string) of any exception that
        escapes the setup or the polling loop.

    Notes
    -----
    Failures of individual ACL calls are logged inside the pool threads and
    are not placed on ``exception_queue``. The function itself does not
    re-raise: setup/loop errors are reported through ``exception_queue``.
    """
    logger = logging.getLogger("azure.datalake.store")
    logger.setLevel(logging.DEBUG)

    # Spec applied to files: identical to acl_spec minus the "default..."
    # entries (presumably because default ACLs only apply to directories --
    # confirm against the service documentation).
    removed_default_acl_spec = ",".join(
        x for x in acl_spec.split(',') if not x.lower().startswith("default"))

    try:
        logger.addHandler(logging.handlers.QueueHandler(log_queue))
        logger.propagate = False  # Prevents double logging
    except AttributeError:
        # Python 2 doesn't have Queue Handler. Default to best effort logging.
        pass

    # Bound before the try block so the finally clause below can always tell
    # whether a pool was actually created, even when setup fails early.
    function_thread_pool = None
    try:
        func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries}
        # Resolve the method first: an unknown method_name then fails before
        # any worker threads are allocated.
        adl_function = func_table[method_name]
        function_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
        logger.debug("Started processor pid:%s", os.getpid())

        def func_wrapper(func, path, spec):
            """Run one ACL call in a pool thread; log failures instead of raising."""
            try:
                func(path=path, acl_spec=spec)
            except FileNotFoundError:
                logger.exception("File %s not found", path)
                # Complete Exception is being logged in the relevant acl method. Don't print exception here
            except Exception as e:
                logger.exception("File %s not set. Exception %s", path, e)
            # Logged on success and on failure alike.
            logger.debug("Completed running on path:%s", path)

        while not finish_queue_processing_flag.is_set():
            try:
                # Short timeout so the finish flag is re-checked regularly.
                file_paths = file_path_queue.get(timeout=0.1)
            except Empty:
                continue
            # Acknowledged at dequeue time, not when the ACL calls complete:
            # a join() on the queue only means every batch was picked up.
            file_path_queue.task_done()
            for file_path in file_paths:
                is_file = file_path[1]
                spec = removed_default_acl_spec if is_file else acl_spec
                logger.debug("Starting on path:%s", file_path)
                function_thread_pool.submit(func_wrapper, adl_function, file_path[0], spec)
    except Exception as e:
        import traceback
        logger.exception("Exception in pid %sException: %s", os.getpid(), e)
        exception_queue.put(traceback.format_exc())
    finally:
        if function_thread_pool is not None:
            # Blocking call. Will wait till all threads are done executing.
            function_thread_pool.shutdown()
        logger.debug("Finished processor pid: %s", os.getpid())