community-content/vertex_model_garden/model_oss/util/gcs_syncer.py (98 lines of code) (raw):

"""Sync local directory to GCS directory using rsync."""

import multiprocessing
import os
import subprocess
import time
from typing import Optional, Sequence, Tuple

from absl import logging

from util import constants
from util import fileutils

_GCS_COMMAND_RETRIES = 3
_RSYNC_RETRY_INTERVAL_SECS = 30
# How often the idle sync loop re-checks the queue for the finish signal, so
# that shutdown does not have to wait for a full rsync interval.
_FINISH_POLL_INTERVAL_SECS = 1


def is_gcs_or_gcsfuse_path(path: str) -> bool:
    """Returns if the path is a GCS or gcsfuse path.

    Args:
      path: The path to check.

    Returns:
      True if the path starts with `constants.GCS_URI_PREFIX` or
      `constants.GCSFUSE_URI_PREFIX`.
    """
    return path.startswith(
        (constants.GCS_URI_PREFIX, constants.GCSFUSE_URI_PREFIX)
    )


def manage_sync_path(
    path: str, node_rank: Optional[int] = None
) -> Tuple[str, str]:
    """Returns the local staging dir and the GCS location for the given path.

    If the path is a GCS or gcsfuse path, the local dir is placed under
    `constants.LOCAL_OUTPUT_DIR` and created if it does not exist. Otherwise
    the given path is used unchanged for both the local and the GCS side.

    Args:
      path: The local or GCS path to manage.
      node_rank: The node rank to be appended to the GCS path as
        `node-<rank>`. Nothing is appended when it is None.

    Returns:
      The local and GCS paths.
    """
    local_dir = path
    gcs_dir = path
    if is_gcs_or_gcsfuse_path(path):
        # Drop the first character of the gcsfuse path before joining;
        # presumably it is a leading "/" (verify against
        # fileutils.force_gcs_fuse_path), which would otherwise make
        # os.path.join discard LOCAL_OUTPUT_DIR.
        local_dir = os.path.join(
            constants.LOCAL_OUTPUT_DIR,
            fileutils.force_gcs_fuse_path(path)[1:],
        )
        gcs_dir = fileutils.force_gcs_path(path)
        # exist_ok=True already tolerates an existing directory, so no
        # separate existence check is needed.
        os.makedirs(local_dir, exist_ok=True)
    if node_rank is None:
        return local_dir, gcs_dir
    return local_dir, os.path.join(gcs_dir, f"node-{node_rank}")


def setup_gcs_rsync(
    dirs_to_sync: Sequence[Tuple[str, str]],
    mp_queue: multiprocessing.Queue,
    gcs_rsync_interval_secs: int,
) -> multiprocessing.Process:
    """Sets up the GCS rsync process.

    Args:
      dirs_to_sync: The (local dir, GCS dir) pairs which will be synced to
        GCS.
      mp_queue: The multiprocessing queue to check if the training is
        finished.
      gcs_rsync_interval_secs: Integer, interval in seconds to run gcs rsync.

    Returns:
      The already started GCS rsync process.
    """
    rsync_process = multiprocessing.Process(
        target=start_gcs_rsync,
        args=(dirs_to_sync, mp_queue, gcs_rsync_interval_secs),
    )
    rsync_process.start()
    return rsync_process


def cleanup_gcs_rsync(
    rsync_process: multiprocessing.Process, mp_queue: multiprocessing.Queue
) -> None:
    """Cleans up the GCS rsync process.

    Puts a message on the queue to signal the rsync process to finish, waits
    for it to exit and logs the outcome based on its exit code.

    Args:
      rsync_process: The GCS rsync process.
      mp_queue: The multiprocessing queue.
    """
    # Any item on the queue is treated as the finish signal by the sync loop.
    mp_queue.put("finish rsync process")
    rsync_process.join()
    if rsync_process.exitcode == 0:
        logging.info("Artifacts have been uploaded to GCS.")
    else:
        logging.error(
            "GCS rsync process failed with exit code %d.",
            rsync_process.exitcode,
        )


def _rsync_local_to_gcs(local_dir: str, gcs_dir: str) -> None:
    """Syncs the local directory to GCS.

    Runs `gcloud storage rsync` up to `_GCS_COMMAND_RETRIES` times. Failures
    are logged but not raised, so a failing sync does not stop the caller.

    Args:
      local_dir: The local directory to sync.
      gcs_dir: The GCS directory to sync to.
    """
    if not os.listdir(local_dir):
        logging.info("Not rsyncing to GCS since %s is empty.", local_dir)
        return
    logging.info("Rsyncing %s <--> %s...", local_dir, gcs_dir)
    cmd = [
        "gcloud",
        "storage",
        "rsync",
        "-r",
        "--delete-unmatched-destination-objects",
        local_dir,
        gcs_dir,
    ]
    for attempt in range(1, _GCS_COMMAND_RETRIES + 1):
        try:
            subprocess.check_output(cmd)
        except subprocess.CalledProcessError as e:
            if attempt < _GCS_COMMAND_RETRIES:
                logging.exception(
                    "Attempt %d: Command failed: %s. Retrying in %d"
                    " seconds...",
                    attempt,
                    e,
                    _RSYNC_RETRY_INTERVAL_SECS,
                )
                time.sleep(_RSYNC_RETRY_INTERVAL_SECS)
            else:
                # Argument order must match the placeholders: the retry count
                # fills %d and the exception fills %s.
                logging.exception(
                    "Command failed after %d attempts: %s.",
                    _GCS_COMMAND_RETRIES,
                    e,
                )
        else:
            # Report success only when the command actually succeeded.
            logging.info("%s rsynced to %s.", local_dir, gcs_dir)
            return


def _wait_for_finish_signal(
    mp_queue: multiprocessing.Queue, timeout_secs: float
) -> bool:
    """Waits up to `timeout_secs` for an item to appear on the queue.

    The queue is only inspected, never drained, so the signal stays visible
    to anything else checking the same queue.

    Args:
      mp_queue: The multiprocessing queue carrying the finish signal.
      timeout_secs: Maximum time in seconds to wait.

    Returns:
      True if the queue is non-empty, False if the timeout elapsed first.
    """
    deadline = time.monotonic() + timeout_secs
    while True:
        if not mp_queue.empty():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        time.sleep(min(_FINISH_POLL_INTERVAL_SECS, remaining))


def start_gcs_rsync(
    dirs_to_sync: Sequence[Tuple[str, str]],
    mp_queue: multiprocessing.Queue,
    gcs_rsync_interval_secs: int,
) -> None:
    """Starts a rsync loop to sync local directories to GCS directories.

    Syncs every directory pair, then waits `gcs_rsync_interval_secs` before
    the next round. The loop ends as soon as an item is seen on `mp_queue`,
    after which one final sync is performed.

    Args:
      dirs_to_sync: A list of tuples, where each tuple contains local
        directory which will be synced to GCS. For example:
        [('/tmp/local_dir_1', 'gs://bucket/gcs_dir_1'),
        ('/tmp/local_dir_2', 'gs://bucket/gcs_dir_2')]
      mp_queue: The multiprocessing queue to check if the training is
        finished.
      gcs_rsync_interval_secs: Integer, interval in seconds to run gcs rsync.
    """
    while True:
        for local_dir, gcs_dir in dirs_to_sync:
            _rsync_local_to_gcs(local_dir, gcs_dir)
        # Wait in short slices instead of one long sleep so that the process
        # waiting on this one is not blocked for a full interval.
        if _wait_for_finish_signal(mp_queue, gcs_rsync_interval_secs):
            break
    # Sync up the directory one more time to avoid a race condition.
    # There can be a case when we are doing an rsync and receive a signal that
    # the training has been done. The final checkpoint will be skipped in such
    # case. So we do a final sync to make sure that all directories are
    # synced.
    for local_dir, gcs_dir in dirs_to_sync:
        _rsync_local_to_gcs(local_dir, gcs_dir)