in scripts/share_ad_job_state/export_model_snapshot.py [0:0]
def create_archive(job_id: str, files: List[Optional[str]]) -> None:
"""
Creates a tar.gz archive containing the specified files.
Args:
job_id (str): The ID of the job.
files (List[Optional[str]]): List of file paths to include in the archive.
"""
safe_job_id = sanitize_filename(job_id)
archive_name = f"{safe_job_id}_state.tar.gz"
try:
with tarfile.open(archive_name, "w:gz") as tar:
for file in files:
if file and os.path.exists(file):
tar.add(file, arcname=os.path.basename(file))
logger.info(f"Added {file} to archive {archive_name}")
else:
logger.warning(f"File {file} not found, skipping.")
logger.info(f"Archive {archive_name} created successfully.")
except IOError as e:
logger.error(f"Failed to create archive: {e}")
finally:
# Remove the archived files
logger.info("Removing temporary files")
for file in files:
if file and os.path.exists(file):
os.remove(file)