in petastorm/spark/spark_dataset_converter.py [0:0]
def _wait_file_available(url_list):
    """Wait up to _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS seconds (default 30 seconds) to make sure
    all files are available for reading. This is useful on filesystems that provide only
    eventual consistency, such as S3.
    """
    # `time`, `ThreadPool` (from multiprocessing.pool), `logger`,
    # `_FILE_AVAILABILITY_WAIT_TIMEOUT_SECS` and `get_filesystem_and_path_or_paths`
    # are imported or defined at module level in spark_dataset_converter.py.
    fs, path_list = get_filesystem_and_path_or_paths(url_list)
    logger.debug('Waiting some seconds until all parquet-store files appear at urls %s', ','.join(url_list))

    def wait_for_file(path):
        # Poll for the file every 100 ms until it appears or the per-file timeout elapses.
        end_time = time.time() + _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS
        while time.time() < end_time:
            if fs.exists(path):
                return True
            time.sleep(0.1)
        return False

    # Check all paths concurrently so the total wait is bounded by the slowest
    # file rather than the sum of the per-file waits.
    pool = ThreadPool(64)
    try:
        results = pool.map(wait_for_file, path_list)
        failed_list = [url for url, result in zip(url_list, results) if not result]
        if failed_list:
            raise RuntimeError('Timeout while waiting for all parquet-store files to appear at urls {failed_list}. '
                               'Please check whether these files were saved successfully when materializing the '
                               'dataframe.'.format(failed_list=','.join(failed_list)))
    finally:
        pool.close()
        pool.join()
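
To illustrate the polling behavior, here is a minimal, self-contained sketch that exercises _wait_file_available against the local filesystem (a file:// URL), so no S3 credentials are needed. The helper name _demo_delayed_visibility and the /tmp path are assumptions made for this example, not part of petastorm; a background thread creates the file after a delay to simulate a store where a just-written file is not immediately visible.

# Sketch only: assumes petastorm is importable; _demo_delayed_visibility and
# the /tmp path are hypothetical, invented for illustration.
import os
import threading
import time

from petastorm.spark.spark_dataset_converter import _wait_file_available

def _demo_delayed_visibility():
    target = '/tmp/petastorm_demo/part-00000.parquet'
    os.makedirs(os.path.dirname(target), exist_ok=True)
    if os.path.exists(target):
        os.remove(target)

    def create_later():
        # Simulate delayed visibility: the file appears 2 seconds after "writing".
        time.sleep(2)
        open(target, 'wb').close()

    threading.Thread(target=create_later).start()
    # Blocks, polling every 100 ms, until fs.exists() turns true; raises
    # RuntimeError if the file has not appeared within the timeout.
    _wait_file_available(['file://' + target])
    print('file is now visible')

_demo_delayed_visibility()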