migration/emr/emr_migration.py (59 lines of code) (raw):
import argparse
import boto3
import os
import shutil
from migration.utils.datazone_helper import get_project_repo
from migration.utils.emr_helper import get_emr_workspace_storage_location
from migration.utils.s3_helper import download_s3_directory_recursive
def _collect_put_files(local_folder, destination_prefix):
    """Build the CodeCommit ``putFiles`` entries for every file under *local_folder*.

    Args:
        local_folder: Directory that is walked recursively.
        destination_prefix: Repository path (using ``/`` separators) under which
            the files are placed, keeping their layout relative to *local_folder*.

    Returns:
        A list of ``{'filePath': ..., 'fileContent': ...}`` dicts. Anything that is,
        or lives inside, a ``.git`` entry is left out.
    """
    put_files = []
    for root, _folders, files in os.walk(local_folder):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            print("Local file: " + file_path)
            # Work with the path relative to local_folder: it is immune to a trailing
            # separator on local_folder and to the folder name re-appearing deeper in
            # the tree, both of which corrupt a plain str.replace().
            relative_path = os.path.relpath(file_path, local_folder)
            # Only real Git metadata (a path component named exactly ".git") is skipped,
            # because it will cause git pull to fail. Files such as ".gitignore" are kept.
            if ".git" in relative_path.split(os.sep):
                print("Ignoring file: " + file_path)
                continue
            # Repository paths always use forward slashes, whatever the local OS uses.
            destination_path = destination_prefix + "/" + relative_path.replace(os.sep, "/")
            print("Uploading to: " + destination_path)
            # Read-only binary mode: the file is never modified locally.
            with open(file_path, mode='rb') as file_obj:
                put_files.append({
                    'filePath': destination_path,
                    'fileContent': file_obj.read(),
                })
    return put_files


def upload_notebooks(local_folder, domain_id, project_id, emr_studio_id, emr_workspace_id, region):
    """Commit the files found under *local_folder* to the project's CodeCommit repo.

    Files are committed in a single commit on the ``main`` branch, under
    ``emr_notebooks/<emr_studio_id>/<emr_workspace_id>/`` and keeping their
    layout relative to *local_folder*.

    Args:
        local_folder: Local directory holding the downloaded workspace files.
            A falsy value skips the upload entirely.
        domain_id: Passed to ``get_project_repo`` to look up the repository.
        project_id: Passed to ``get_project_repo`` to look up the repository.
        emr_studio_id: EMR Studio ID, used in the destination path.
        emr_workspace_id: EMR Studio workspace ID, used in the destination path.
        region: AWS region for the repository lookup and the CodeCommit client.

    Raises:
        ValueError: If *emr_studio_id* or *emr_workspace_id* is missing, or if
            *local_folder* does not exist.
    """
    if not local_folder:
        print("No local folder provided. Skipping notebook upload.")
        return
    if not emr_studio_id or not emr_workspace_id:
        raise ValueError("EMR Studio ID and Workspace ID are required when uploading notebooks")
    if not os.path.exists(local_folder):
        raise ValueError(f"Local folder {local_folder} does not exist")

    # Scan the local files first so that an empty folder costs no AWS calls.
    put_files = _collect_put_files(local_folder, f'emr_notebooks/{emr_studio_id}/{emr_workspace_id}')
    if not put_files:
        # A commit without any file change cannot be created, so stop here.
        print(f"No files to upload found in local folder {local_folder}. Skipping notebook upload.")
        return

    repo = get_project_repo(domain_id, project_id, region)
    print(f"Uploading notebook from local folder {local_folder} to CodeCommit repo {repo}...")
    code_commit = boto3.client('codecommit', region_name=region)
    branch = "main"
    # The new commit is created on top of the current tip of the branch.
    parent_commit_id = code_commit.get_branch(repositoryName=repo, branchName=branch).get("branch").get("commitId")
    code_commit.create_commit(
        repositoryName=repo,
        branchName=branch,
        parentCommitId=parent_commit_id,
        putFiles=put_files
    )
    print(f"Uploaded notebook from local folder {local_folder} to CodeCommit repo {repo}.")
if __name__ == '__main__':
    # Command-line entry point: download the EMR workspace files from S3,
    # commit them to the project's CodeCommit repo, then remove the local copy.
    parser = argparse.ArgumentParser(description='Migrate EMR workspace notebooks to a SageMaker Unified Studio project')
    # Add arguments
    parser.add_argument('--domain-id', type=str, required=True, help='ID of the SageMaker Unified Studio Domain')
    parser.add_argument('--project-id', type=str, required=True, help='Project ID in the SageMaker Unified Studio Domain')
    parser.add_argument('--emr-studio-id', type=str, help='Id for EMR Studio. Format es-XXXX')
    parser.add_argument('--emr-workspace-id', type=str, help='Id for EMR studio workspace. Format is e-YYYY')
    parser.add_argument('--region', type=str, required=True, help='AWS region')
    # Parse the arguments
    args = parser.parse_args()

    # upload_notebooks() rejects a missing studio or workspace ID, so fail here,
    # before any download, instead of after the workspace has been copied locally.
    if not args.emr_studio_id or not args.emr_workspace_id:
        parser.error("--emr-studio-id and --emr-workspace-id are required to migrate notebooks")

    local_path = "DELEME_ME_downloaded_emr_workspace_files"
    try:
        workspace_s3_uri = get_emr_workspace_storage_location(args.emr_workspace_id, args.region)
        download_s3_directory_recursive(workspace_s3_uri, local_path)
        upload_notebooks(local_path, args.domain_id, args.project_id, args.emr_studio_id, args.emr_workspace_id, args.region)
    finally:
        # Clean up the downloaded files even when the download or upload failed.
        # The directory may not exist if the failure happened before it was created.
        if os.path.isdir(local_path):
            print("Cleaning up downloaded files...")
            shutil.rmtree(local_path)
    print("Done")