in src/sagemaker_training/intermediate_output.py [0:0]
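# --- Assumed module-level context (illustrative) ---
# The function below relies on imports and constants defined elsewhere in this
# module. The imports match the calls it makes; the concrete path values are a
# sketch based on the docstring and the standard SageMaker directory layout, and
# the stop-file locations in particular are assumptions, not the module's exact
# definitions. `_copy_file` is likewise defined elsewhere in the module.
import os
from concurrent import futures

import inotify_simple

intermediate_path = "/opt/ml/output/intermediate"  # directory watched for user output
tmp_dir_path = os.path.join(intermediate_path, ".tmp.sagemaker_s3_sync")  # staging area for copies
success_file_path = "/opt/ml/output/success"  # assumed: written when training succeeds
failure_file_path = "/opt/ml/output/failure"  # assumed: written when training fails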
def _watch(inotify, watchers, watch_flags, s3_uploader):
"""As soon as a user is done with a file under `/opt/ml/output/intermediate`
we will be notified by inotify. We will copy this file under
`/opt/ml/output/intermediate/.tmp.sagemaker_s3_sync` folder preserving
the same folder structure to prevent it from being further modified.
As we copy the file we will add timestamp with microseconds precision
to avoid modification during s3 upload.
After that we copy the file to s3 in a separate Thread.
We keep the queue of the files we need to move as FIFO.
"""
    # initialize a thread pool with 1 worker
    # to be used for uploading files to s3 in a separate thread
    executor = futures.ThreadPoolExecutor(max_workers=1)

    last_pass_done = False
    stop_file_exists = False
    # after we see the stop file, do one additional pass to make sure we didn't miss anything
    while not last_pass_done:  # pylint: disable=too-many-nested-blocks
        # wait for any events in the directory for 1 sec and then re-check exit conditions
        for event in inotify.read(timeout=1000):
            for flag in inotify_simple.flags.from_mask(event.mask):
                # If a new directory was created, traverse the directory tree to recursively
                # add all created folders to the watchers list, and upload any files they
                # already contain to s3.
                # There is a potential race condition if we upload a file and then see a
                # notification for it, but it should not cause any problems because every
                # copy we place in the temp dir gets a unique timestamp with microsecond
                # precision.
                if flag is inotify_simple.flags.ISDIR and inotify_simple.flags.CREATE & event.mask:
                    path = os.path.join(intermediate_path, watchers[event.wd], event.name)
                    for folder, _, files in os.walk(path):
                        wd = inotify.add_watch(folder, watch_flags)
                        relative_path = os.path.relpath(folder, intermediate_path)
                        watchers[wd] = relative_path
                        tmp_sub_folder = os.path.join(tmp_dir_path, relative_path)
                        if not os.path.exists(tmp_sub_folder):
                            os.makedirs(tmp_sub_folder)
                        for file in files:
                            _copy_file(executor, s3_uploader, relative_path, file)
                elif flag is inotify_simple.flags.CLOSE_WRITE:
                    _copy_file(executor, s3_uploader, watchers[event.wd], event.name)
        last_pass_done = stop_file_exists
        stop_file_exists = os.path.exists(success_file_path) or os.path.exists(failure_file_path)
    # wait for all the s3 upload tasks to finish and shut down the executor
    executor.shutdown(wait=True)
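

# --- Illustrative sketch (not part of the module) ---
# A minimal version of the copy-and-upload step the docstring describes: stage
# the file under the temp folder with a microsecond-precision timestamp in its
# name, then submit the s3 upload to the single-worker executor so uploads run
# one at a time in FIFO order. The real _copy_file differs in detail, and the
# uploader's `upload_file(local_path, key)` method used here is a hypothetical
# interface, not the module's actual S3 uploader API.
import shutil
from datetime import datetime


def _copy_file_sketch(executor, s3_uploader, relative_path, filename):
    src = os.path.join(intermediate_path, relative_path, filename)
    # a timestamp with microsecond precision keeps every staged copy unique
    stamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    dst = os.path.join(tmp_dir_path, relative_path, "{}.{}".format(stamp, filename))
    # make sure the staging sub-folder exists before copying
    if not os.path.exists(os.path.dirname(dst)):
        os.makedirs(os.path.dirname(dst))
    shutil.copy2(src, dst)
    # the executor has exactly one worker, so submitted uploads happen FIFO
    executor.submit(s3_uploader.upload_file, dst, os.path.join(relative_path, filename))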
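

# --- Illustrative wiring (not part of the module) ---
# Roughly how a caller could drive _watch: create the inotify handle, watch the
# root intermediate directory (sub-directories are added by _watch itself as
# CREATE events arrive), and pass an uploader object. `DummyUploader` is a
# hypothetical stand-in for the module's real s3 uploader.
class DummyUploader(object):
    def upload_file(self, local_path, key):
        print("would upload {} as {}".format(local_path, key))


def _example_wiring():
    inotify = inotify_simple.INotify()
    watch_flags = inotify_simple.flags.CLOSE_WRITE | inotify_simple.flags.CREATE
    # watch the root directory and map its watch descriptor to the relative path "."
    root_wd = inotify.add_watch(intermediate_path, watch_flags)
    watchers = {root_wd: "."}
    _watch(inotify, watchers, watch_flags, DummyUploader())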