in src/sagemaker_xgboost_container/checkpointing.py [0:0]
def start(self):
    """Start a background thread that deletes old checkpoints.

    To delete stale checkpoints, we use a producer-consumer pattern: we
    start a daemon thread in the background and maintain a queue of files
    to delete.

    There may be a lock on the file if SageMaker is uploading the file; in
    that case, the file is put back on the queue and we try again later.

    When training is complete, we put SENTINEL on the queue, and when we
    see the SENTINEL, we clean up and exit the thread.

    Reads ``self.delete_queue``, ``self.SENTINEL``, ``self.format_path`` and
    ``self.previous_checkpoints``; stores the started thread in
    ``self.thread``.

    Every item taken off ``self.delete_queue`` is matched by exactly one
    ``task_done()`` call, so the queue's unfinished-task count returns to
    zero once the thread has exited.
    """
    import time

    # Pause after re-queueing a file that is still being uploaded. Without
    # it, a queue holding only locked files makes this thread spin on
    # get()/put() at full speed until the upload finishes.
    retry_delay_seconds = 0.1

    def _is_uploading(path):
        # A lock marker without the matching "safe" marker means the upload
        # of this file has started but not finished yet.
        uploading = os.path.isfile(path + FILE_LOCK_SUFFIX)
        uploaded = os.path.isfile(path + FILE_SAFE_SUFFIX)
        return uploading and not uploaded

    def _should_skip(path):
        # Nothing to do for files that are already gone, and checkpoints
        # listed in previous_checkpoints must never be deleted here.
        return not os.path.isfile(path) or path in self.previous_checkpoints

    def _remove(path):
        # Best-effort delete: a failure is only logged so that the cleanup
        # thread keeps running. The queue item is marked done either way.
        try:
            os.remove(path)
        except Exception:
            logger.debug("Failed to delete %s", path)
        finally:
            self.delete_queue.task_done()

    def _delete_uploaded_files():
        for iteration in iter(self.delete_queue.get, self.SENTINEL):
            path = self.format_path(iteration)
            if _should_skip(path):
                self.delete_queue.task_done()
                continue
            # If SageMaker is still uploading the file, we put the file back on the
            # queue and try again later. In order to avoid file corruption, we make
            # best attempt to not delete any files that are still being uploaded.
            if _is_uploading(path):
                self.delete_queue.put(iteration)
                # The re-queued copy is a new task; the one we just took off
                # the queue is finished and has to be marked as such.
                self.delete_queue.task_done()
                time.sleep(retry_delay_seconds)
                continue
            _remove(path)
        # Account for the SENTINEL that terminated the loop above.
        self.delete_queue.task_done()

    def _cleanup():
        # Here, we've reached the end of training because we place sentinel in the
        # queue at the end of training. We put another sentinel, go through everything
        # in the queue once again, and try to remove it anyway whether there is a lock on
        # the file or not, because the training is done. On sagemaker, this should send
        # a delete signal to the agent so that the upload can be canceled and removed
        # from S3, if there are anything remaining in the queue. In normal cases,
        # _cleanup() exits almost immediately and does not do anything.
        self.delete_queue.put(self.SENTINEL)
        for iteration in iter(self.delete_queue.get, self.SENTINEL):
            path = self.format_path(iteration)
            _remove(path)
        # Account for the SENTINEL that terminated the loop above.
        self.delete_queue.task_done()

    def _delete_uploaded_files_and_cleanup():
        _delete_uploaded_files()
        _cleanup()

    self.thread = threading.Thread(
        target=_delete_uploaded_files_and_cleanup,
        daemon=True)
    self.thread.start()