in src/data_load/load.py [0:0]
def _resolve_worker_count(n_cores):
    """Return how many upload workers to run.

    A ``WORKERS`` environment variable takes precedence; without it the count
    is twice ``n_cores``.

    Raises:
        ValueError: if ``WORKERS`` is set but is not a positive integer.
    """
    raw_workers = os.getenv("WORKERS")
    if raw_workers is None:
        n_jobs = 2 * n_cores
        logger.info(f"Calculated number of jobs: {n_jobs}")
        return n_jobs

    try:
        n_jobs = int(raw_workers)
    except ValueError as err:
        raise ValueError(
            f"WORKERS must be a positive integer, got {raw_workers!r}"
        ) from err
    if n_jobs < 1:
        # Fail early with a clear message: a non-positive count would otherwise
        # surface later as a ThreadPoolExecutor ValueError.
        raise ValueError(f"WORKERS must be a positive integer, got {raw_workers!r}")
    logger.info(f"Using WORKERS environment variable for number of jobs: {n_jobs}")
    return n_jobs


def load_files(dir_name):
    """Upload every matching file under ``dir_name`` using a thread pool.

    The tree is walked with ``os.walk``; in each directory the file names that
    match the module-level ``includes`` pattern are submitted to
    ``load_single_file`` together with one of the pre-created sessions.

    Args:
        dir_name: Root directory whose files are to be uploaded.

    Returns:
        A ``(success, failed)`` tuple. ``success`` maps each uploaded file (as
        reported by ``load_single_file``) to its metadata; ``failed`` lists the
        files whose upload failed, including those whose upload task raised.

    Raises:
        ValueError: if the ``WORKERS`` environment variable is set but is not
            a positive integer.
    """
    logger.info("Starting load_files function")
    n_cores = multiprocessing.cpu_count()
    logger.info(f"Number of CPU cores: {n_cores}")
    n_jobs = _resolve_worker_count(n_cores)

    dir_size = get_size_format(get_directory_size(dir_name))
    logger.info(f"Amount of data to transfer: {dir_size}")
    logger.info(f"Request worker count: {n_jobs}")
    logger.info(f"Max Chunk size: {MAX_CHUNK_SIZE} MB")

    results = []
    # NOTE(review): these sessions are never closed here; if
    # requests_retry_session() returns requests.Session objects, consider
    # closing them once all uploads are done - confirm.
    sessions = [requests_retry_session() for _ in range(n_jobs)]
    logger.info(f"Created {n_jobs} sessions")

    # One pool serves the whole walk instead of building a new one per directory.
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor:
        for root, _, files in os.walk(dir_name):
            logger.info(f"Processing directory: {root}")
            files = [f for f in files if re.match(includes, f)]
            logger.info(f"Total number of files to upload: {len(files)}")

            logger.info(f"Submitting {len(files)} files to the executor")
            # Sessions are handed out round-robin by file index, so two
            # in-flight uploads may share one session.
            # NOTE(review): confirm the session type tolerates that.
            future_result = {
                executor.submit(load_single_file, sessions[i % n_jobs], root, name): i
                for i, name in enumerate(files)
            }
            logger.info(f"Submitted {len(files)} files to the executor")

            # Drain this directory's uploads before moving on to the next one.
            for future in concurrent.futures.as_completed(future_result):
                file_index = future_result[future]
                try:
                    result = future.result()
                    if result[0] == FILE_UPLOAD_FAILED:
                        file_path = result[1]
                        logger.error(f"File upload failed for {file_path}")
                    else:
                        file_path, metadata = result[1]
                        logger.info(f"File upload succeeded for {file_path}")
                        logger.debug(f"Metadata for {file_path}: {metadata}")
                    results.append(result)
                except Exception as e:
                    # logger.exception keeps the traceback of the failed task.
                    logger.exception(
                        f"Exception occurred for file index {file_index}, Exception: {e}"
                    )
                    # Record the file as failed so it is not silently missing
                    # from both return values.
                    # NOTE(review): assumes load_single_file reports failures
                    # as the joined root/file path - confirm.
                    crashed_path = os.path.join(root, files[file_index])
                    results.append((FILE_UPLOAD_FAILED, crashed_path))

    failed = []
    success = {}
    for result in results:
        if result[0] == FILE_UPLOAD_FAILED:
            failed.append(result[1])
            logger.error(f"File upload failed: {result[1]}")
        else:
            uploaded_file, metadata = result[1]
            success[uploaded_file] = metadata
            logger.info(f"File upload succeeded: {uploaded_file}")

    logger.info("Completed load_files function")
    return success, failed