in smdebug/core/state_store.py [0:0]
def is_checkpoint_updated(self):
    """
    Check whether new checkpoint files got added or existing checkpoint files that are
    stored got updated.

    The files currently returned by ``self._get_checkpoint_files_in_dir`` are compared
    with the snapshot kept in ``self._last_seen_checkpoint_files`` and
    ``self._last_seen_cp_files_size``. A change in the number of files, in any file
    size, or in any file path counts as an update. Whenever an update is detected the
    snapshot is replaced as a whole, so the two lists always stay index-aligned.

    Returns:
        bool: True if the set of checkpoint files or any of their sizes differs from
        the last snapshot; False if ``self._checkpoint_dir`` is None, if no checkpoint
        files exist yet, or if nothing changed.
    """
    if self._checkpoint_dir is None:
        return False

    checkpoint_files = self._get_checkpoint_files_in_dir(self._checkpoint_dir)
    if not checkpoint_files:
        logger.debug(
            "Checkpoints not updated. There are no checkpoint files created yet, to be updated"
        )
        return False

    def _stat_or_zero(stat_fn, path, log):
        # A file may disappear between being listed and being stat-ed; record 0
        # for it instead of failing the whole check.
        try:
            return stat_fn(path)
        except FileNotFoundError as e:
            log(e)
            return 0

    def _remember(sizes):
        # Replace (never append to) the snapshot: appending left stale sizes at the
        # front of the list, which made the next call compare against the wrong
        # entries and report an update that never happened.
        self._last_seen_checkpoint_files = checkpoint_files
        self._last_seen_cp_files_size = sizes
        logger.info(
            f"sizes of different checkpoint files {list(zip(checkpoint_files, sizes))}"
        )

    # Modification times are collected for logging only; they do not take part in
    # the update decision below.
    timestamps = [
        _stat_or_zero(os.path.getmtime, path, logger.debug) for path in checkpoint_files
    ]
    logger.info(
        f"Timestamps of different checkpoint files {list(zip(checkpoint_files, timestamps))}"
    )

    # A different number of files means checkpoints were added or removed.
    if len(self._last_seen_checkpoint_files) != len(checkpoint_files):
        _remember(
            [_stat_or_zero(os.path.getsize, path, logger.debug) for path in checkpoint_files]
        )
        return True

    # Same number of files: look for a size change or a path change.
    cp_file_sizes = [
        _stat_or_zero(os.path.getsize, path, logger.warning) for path in checkpoint_files
    ]
    # Whole-list comparison also copes with a remembered size list whose length does
    # not match the current listing, where indexing could raise IndexError.
    sizes_changed = cp_file_sizes != list(self._last_seen_cp_files_size)
    files_changed = list(checkpoint_files) != list(self._last_seen_checkpoint_files)
    if sizes_changed or files_changed:
        _remember(cp_file_sizes)
        return True
    return False