def _watch()

in src/sagemaker_training/intermediate_output.py


def _watch(inotify, watchers, watch_flags, s3_uploader):
    """As soon as a user is done with a file under `/opt/ml/output/intermediate`
    we will be notified by inotify. We will copy this file under
    `/opt/ml/output/intermediate/.tmp.sagemaker_s3_sync` folder preserving
    the same folder structure to prevent it from being further modified.
    As we copy the file we will add timestamp with microseconds precision
    to avoid modification during s3 upload.
    After that we copy the file to s3 in a separate Thread.
    We keep the queue of the files we need to move as FIFO.
    """
    # initialize a thread pool with 1 worker
    # to be used for uploading files to s3 in a separate thread
    executor = futures.ThreadPoolExecutor(max_workers=1)

    last_pass_done = False
    stop_file_exists = False

    # after we see the stop file, do one additional pass to make sure we didn't miss anything
    while not last_pass_done:  # pylint: disable=too-many-nested-blocks
        # wait for any events in the directory for 1 sec and then re-check exit conditions
        for event in inotify.read(timeout=1000):
            for flag in inotify_simple.flags.from_mask(event.mask):
                # If a new directory was created, traverse the directory tree to recursively add
                # all created folders to the watchers list.
                # Upload files to S3 if there are any.
                # There is a potential race condition if we upload a file and then see a
                # notification for it, which shouldn't cause any problems because when we copy
                # files to the temp dir we add a unique timestamp with microsecond precision.
                if flag is inotify_simple.flags.ISDIR and inotify_simple.flags.CREATE & event.mask:
                    path = os.path.join(intermediate_path, watchers[event.wd], event.name)
                    for folder, _, files in os.walk(path):
                        wd = inotify.add_watch(folder, watch_flags)
                        relative_path = os.path.relpath(folder, intermediate_path)
                        watchers[wd] = relative_path
                        tmp_sub_folder = os.path.join(tmp_dir_path, relative_path)
                        if not os.path.exists(tmp_sub_folder):
                            os.makedirs(tmp_sub_folder)
                        for file in files:
                            _copy_file(executor, s3_uploader, relative_path, file)
                elif flag is inotify_simple.flags.CLOSE_WRITE:
                    _copy_file(executor, s3_uploader, watchers[event.wd], event.name)

        last_pass_done = stop_file_exists
        stop_file_exists = os.path.exists(success_file_path) or os.path.exists(failure_file_path)

    # wait for all the s3 upload tasks to finish and shutdown the executor
    executor.shutdown(wait=True)
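
For readers unfamiliar with inotify_simple, the loop above uses three parts of its API: INotify.add_watch() to register a directory, INotify.read(timeout=...) to poll for events, and flags.from_mask() to decode an event's mask. Because inotify watches are not recursive, newly created sub-directories have to be added explicitly, which is what the ISDIR/CREATE branch does. Below is a minimal, self-contained sketch of that pattern; the watched path, the watch_flags value, and the print statements are illustrative only, not the module's actual configuration.

# Minimal sketch of the inotify_simple pattern used in _watch (illustrative only).
import os

import inotify_simple

root = "/opt/ml/output/intermediate"  # illustrative path
watch_flags = inotify_simple.flags.CREATE | inotify_simple.flags.CLOSE_WRITE  # assumed flags

inotify = inotify_simple.INotify()
watchers = {inotify.add_watch(root, watch_flags): "."}

# Poll for events for up to one second, then return control to the caller.
for event in inotify.read(timeout=1000):
    for flag in inotify_simple.flags.from_mask(event.mask):
        if flag is inotify_simple.flags.CLOSE_WRITE:
            # A file was closed after being written, so it is safe to copy it now.
            print("finished file:", os.path.join(root, watchers[event.wd], event.name))
        elif flag is inotify_simple.flags.ISDIR and inotify_simple.flags.CREATE & event.mask:
            # A new sub-directory appeared; watch it too, since inotify is not recursive.
            new_dir = os.path.join(root, watchers[event.wd], event.name)
            watchers[inotify.add_watch(new_dir, watch_flags)] = os.path.relpath(new_dir, root)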
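
The per-file work is delegated to _copy_file together with the single-worker executor. The helper's body is not shown in this section; the sketch below is only an assumption of how the copy-with-timestamp-then-upload step described in the docstring could be written. The name _copy_file_sketch, the timestamp format, and the s3_uploader.upload(...) call are assumptions, not the library's actual code.

# Assumed sketch of a _copy_file-style helper; the real implementation may differ.
import os
import shutil
import time


def _copy_file_sketch(executor, s3_uploader, relative_path, filename,
                      intermediate_path="/opt/ml/output/intermediate",
                      tmp_dir_path="/opt/ml/output/intermediate/.tmp.sagemaker_s3_sync"):
    """Copy a finished file into the temp sync dir, then upload it on the worker thread."""
    src = os.path.join(intermediate_path, relative_path, filename)
    if not os.path.exists(src):
        return

    # A microsecond timestamp makes the copied name unique, so a later rewrite of the
    # same file cannot clobber a copy that is still waiting to be uploaded.
    timestamp = "{:.6f}".format(time.time())
    dst_dir = os.path.join(tmp_dir_path, relative_path)
    os.makedirs(dst_dir, exist_ok=True)  # _watch creates this up front; recreate defensively
    dst = os.path.join(dst_dir, "{}.{}".format(timestamp, filename))
    shutil.copy2(src, dst)

    # Submitting to the single-worker executor keeps uploads in FIFO order and
    # off the inotify loop. The upload method name here is an assumption.
    executor.submit(s3_uploader.upload, dst, os.path.join(relative_path, filename))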
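
Finally, the choice of futures.ThreadPoolExecutor(max_workers=1) is what makes the upload queue FIFO: a single worker drains the executor's internal queue in submission order, and executor.shutdown(wait=True) blocks until every queued upload has finished. A tiny illustration of that guarantee:

# A single-worker executor runs submitted tasks strictly in submission order.
from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=1)
order = []
for i in range(5):
    executor.submit(order.append, i)
executor.shutdown(wait=True)  # blocks until all queued tasks have run
print(order)  # [0, 1, 2, 3, 4]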