kubernetes/storage/parallelstore-transfer-tool/main.py (131 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Transfer data between a Cloud Storage bucket and a Parallelstore instance.

Runs either an import (GCS -> Parallelstore) or an export
(Parallelstore -> GCS) long-running operation and blocks until it completes.
Configuration comes from environment variables (which take precedence) or
command-line flags.
"""

import argparse
import logging
import os
import sys
import time

from google.cloud import parallelstore_v1

# Configure logging to stdout so container platforms capture it.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)


def import_data_to_parallelstore(
    gcs_bucket, path, instance, location, request_id, project_id
):
    """Imports data from GCS to Parallelstore.

    Args:
        gcs_bucket: GCS bucket name (optionally with a path inside the
            bucket); the ``gs://`` scheme is prepended here.
        path: Root directory inside the Parallelstore file system to import
            into; "/" means the instance root and is the API default.
        instance: Parallelstore instance name.
        location: Zone of the Parallelstore instance.
        request_id: Optional idempotency UUID, or None to omit.
        project_id: GCP project ID.
    """
    client = parallelstore_v1.ParallelstoreClient()
    source_gcs_bucket = parallelstore_v1.SourceGcsBucket()
    source_gcs_bucket.uri = f"gs://{gcs_bucket}"
    request = parallelstore_v1.ImportDataRequest(
        source_gcs_bucket=source_gcs_bucket,
        name=f"projects/{project_id}/locations/{location}/instances/{instance}",
    )
    if request_id is not None:
        request.request_id = request_id
    if path != "/":
        # destination_parallelstore is a message field; set its `path`
        # sub-field rather than assigning a plain string to the message.
        request.destination_parallelstore.path = path
    operation = client.import_data(request=request)
    log_operation_start(
        "Import", gcs_bucket, instance, location, project_id, request_id
    )
    wait_for_operation(operation)
    if operation.done():
        response = operation.result()
        logger.info(f"Operation succeeded: {response}")


def export_data_to_gcs(gcs_bucket, path, instance, location, request_id, project_id):
    """Exports data from Parallelstore to GCS.

    Args:
        gcs_bucket: Destination GCS bucket name (optionally with a path
            inside the bucket); the ``gs://`` scheme is prepended here.
        path: Root directory inside the Parallelstore file system to export
            from; "/" means the instance root and is the API default.
        instance: Parallelstore instance name.
        location: Zone of the Parallelstore instance.
        request_id: Optional idempotency UUID, or None to omit.
        project_id: GCP project ID.
    """
    client = parallelstore_v1.ParallelstoreClient()
    destination_gcs_bucket = parallelstore_v1.DestinationGcsBucket()
    destination_gcs_bucket.uri = f"gs://{gcs_bucket}"
    request = parallelstore_v1.ExportDataRequest(
        name=f"projects/{project_id}/locations/{location}/instances/{instance}",
        destination_gcs_bucket=destination_gcs_bucket,
    )
    if request_id is not None:
        request.request_id = request_id
    if path != "/":
        # Fix: for an export the Parallelstore path is the SOURCE side.
        # ExportDataRequest has no `destination_parallelstore` field, so the
        # previous assignment raised for any non-root --path.
        request.source_parallelstore.path = path
    operation = client.export_data(request=request)
    log_operation_start(
        "Export", gcs_bucket, instance, location, project_id, request_id
    )
    wait_for_operation(operation)
    if operation.done():
        response = operation.result()
        logger.info(f"Operation succeeded: {response}")


def log_operation_start(
    operation_type, gcs_bucket, instance, location, project_id, request_id
):
    """Logs the start of an import/export operation."""
    logger.info(f"Starting {operation_type}")
    logger.info(f"GCS Bucket: {gcs_bucket}")
    logger.info(f"Parallelstore Instance: {instance}")
    logger.info(f"Location: {location}")
    logger.info(f"Project ID: {project_id}")
    logger.info(f"Request ID: {request_id}")


def wait_for_operation(operation):
    """Polls a long-running operation every 10 seconds until it completes."""
    while not operation.done():
        logger.info("Waiting for operation to complete...")
        time.sleep(10)


def main():
    """Parses configuration and dispatches the import or export."""
    parser = argparse.ArgumentParser(
        description="Import or Export data between Parallelstore and GCS",
        epilog="Example usage: python import_data.py --mode import --gcsbucket my-bucket-name --instance my-instance-name --location us-central1-a",
    )
    parser.add_argument(
        "--mode",
        required=True,
        help="Import or Export data to / from a Parallelstore Instance",
    )
    parser.add_argument(
        "--gcsbucket",
        required=False,
        help="specifies the URI to a Cloud Storage bucket, or a path within a bucket, using the format gs://<bucket_name>/<optional_path_inside_bucket>",
    )
    parser.add_argument(
        "--path",
        required=False,
        default="/",
        help="Root directory path to the Parallelstore file system",
    )
    parser.add_argument(
        "--instance", required=False, help="Parallelstore instance name"
    )
    parser.add_argument(
        "--location",
        required=False,
        help="Parallelstore location, must be a supported zone.",
    )
    parser.add_argument("--project-id", required=False, help="Project ID")
    parser.add_argument(
        "--request-id",
        required=False,
        default=None,
        help="allows you to assign a unique ID to this request. If you retry this request using the same request ID, the server will ignore the request if it has already been completed. Must be a valid UUID that is not all zeros.",
    )
    args = parser.parse_args()

    # Environment variables take precedence over command-line arguments.
    gcs_bucket = os.environ.get("GCS_BUCKET", args.gcsbucket)
    path = os.environ.get("PARALLELSTORE_PATH", args.path)
    instance = os.environ.get("PARALLELSTORE_INSTANCE", args.instance)
    location = os.environ.get("PARALLELSTORE_LOCATION", args.location)
    project_id = os.environ.get("PROJECT_ID", args.project_id)
    request_id = os.environ.get("REQUEST_ID", args.request_id)

    # Check for missing values; collect all of them so the user sees the
    # full list in one run instead of fixing one at a time.
    missing_args = [
        arg_name
        for arg_name, arg_value in [
            ("GCS_BUCKET", gcs_bucket),
            ("PARALLELSTORE_INSTANCE", instance),
            ("PARALLELSTORE_LOCATION", location),
            # Report the env-var name, consistent with the entries above
            # (was the lowercase "project_id").
            ("PROJECT_ID", project_id),
        ]
        if arg_value is None
    ]
    if missing_args:
        logger.error(f"Error: Missing required arguments: {', '.join(missing_args)}")
        logger.error(
            "Please provide them through command-line arguments or environment variables."
        )
        sys.exit(1)  # Exit with an error code

    if args.mode == "import":
        import_data_to_parallelstore(
            gcs_bucket, path, instance, location, request_id, project_id
        )
    elif args.mode == "export":
        export_data_to_gcs(gcs_bucket, path, instance, location, request_id, project_id)
    else:
        # An unrecognized mode previously logged "Missing operation mode"
        # and exited 0; report the actual problem and fail the process.
        logger.error(f"Invalid mode: {args.mode!r}. Must be 'import' or 'export'.")
        sys.exit(1)


if __name__ == "__main__":
    main()