python-cli/mft_cli/airavata_mft_cli/storage/gcs.py

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from rich import print as rprint
from rich.panel import Panel
from rich.text import Text
from pick import pick
import typer

from airavata_mft_sdk import mft_client
from airavata_mft_sdk.gcs import GCSCredential_pb2
from airavata_mft_sdk.gcs import GCSStorage_pb2
from airavata_mft_sdk import MFTTransferApi_pb2
from airavata_mft_sdk import MFTAgentStubs_pb2
from airavata_mft_sdk.common import StorageCommon_pb2

import json
import configparser
import os
import base64
import google.auth
import googleapiclient.discovery
import sys

sys.path.append('../airavata_mft_cli')
from airavata_mft_cli import config as configcli

gcs_key_path = '.mft/keys/gcs/service_account_key.json'


def handle_add_storage():
    options = ["Through Google Cloud SDK config file", "Enter manually"]
    option, index = pick(options, "How do you want to load credentials", indicator="=>")

    if index == 1:  # Manual configuration
        text_msg = ("MFT uses service accounts to gain access to Google Cloud."
                    "\n 1) You can create a service account by going to"
                    " https://console.cloud.google.com/iam-admin/serviceaccounts."
                    "\n 2) Download the JSON format of the service account key."
                    "\n 3) Use that downloaded Service Account Credential JSON file to access Google storage."
"\nMore information about service accounts can be found here https://cloud.google.com/iam/docs/service-accounts") panel = Panel.fit(Text(text_msg, justify="left")) rprint(panel) credential_json_path = typer.prompt("Service Account Credential JSON path") with open(credential_json_path) as json_file: sa_entries = json.load(json_file) client_id = sa_entries['client_id'] client_email = sa_entries['client_email'] client_secret = sa_entries['private_key'] project_id = sa_entries['project_id'] else: # Loading credentials from the gcloud cli config file service_account = None client_email = None default_service_account_name = 'mft-gcs-serviceaccount' # Find the active config for Google cloud ADC active_config_path = os.path.join(os.path.expanduser('~'), '.config/gcloud/active_config') with open(active_config_path) as f: active_config = f.readline() print('Active config : ' + active_config) # Load default project config = configparser.RawConfigParser() default_project_path = os.path.join(os.path.expanduser('~'), '.config/gcloud/configurations/config_' + active_config) config.read(default_project_path) project_id = config['core']['project'] account_email = config['core']['account'] print('Project ID : ' + project_id) print(account_email) default_service_account_email = (default_service_account_name + '@' + project_id + '.iam.gserviceaccount.com') path = os.path.join(os.path.expanduser('~'), gcs_key_path) if os.path.exists(path): with open(path, 'r') as config_file: config_data = json.load(config_file) client_id = config_data['client_id'] client_email = config_data['client_email'] project_id = config_data['project_id'] client_secret = config_data['private_key'] print('Client email : ', client_email) if client_email is None or client_email != default_service_account_email: inferred_cred, inferred_project = google.auth.default(active_config) service_acc_list = list_service_accounts(project_id=project_id, credentials=inferred_cred) service_account_available = any(x['email'] == default_service_account_email for x in service_acc_list['accounts']) print('Service account availability : ', service_account_available) if service_account_available: # Already have a service account for mft, But no key get_service_account_key(credentials=inferred_cred, service_account_email=default_service_account_email) else: # No service account for mft # Read OAuth credentials and create the service account service_account = create_service_account(project_id, default_service_account_name, 'Airavata MFT Service Account', credentials=inferred_cred) # Load service account details again if os.path.exists(path): with open(path, 'r') as config_file: config_data = json.load(config_file) client_id = config_data['client_id'] client_email = config_data['client_email'] project_id = config_data['project_id'] client_secret = config_data['private_key'] else: print("No credential found in ~/" + gcs_key_path + " file") exit() client = mft_client.MFTClient(transfer_api_port = configcli.transfer_api_port, transfer_api_secured = configcli.transfer_api_secured, resource_service_host = configcli.resource_service_host, resource_service_port = configcli.resource_service_port, resource_service_secured = configcli.resource_service_secured, secret_service_host = configcli.secret_service_host, secret_service_port = configcli.secret_service_port) gcs_secret = GCSCredential_pb2.GCSSecret(clientEmail=client_email, privateKey=client_secret, projectId=project_id) secret_wrapper = MFTAgentStubs_pb2.SecretWrapper(gcs=gcs_secret) gcs_storage = 
    storage_wrapper = MFTAgentStubs_pb2.StorageWrapper(gcs=gcs_storage)

    direct_req = MFTAgentStubs_pb2.GetResourceMetadataRequest(resourcePath="",
                                                              secret=secret_wrapper,
                                                              storage=storage_wrapper)
    resource_metadata_req = MFTTransferApi_pb2.FetchResourceMetadataRequest(directRequest=direct_req)
    metadata_resp = client.transfer_api.resourceMetadata(resource_metadata_req)

    bucket_options = ["Manually Enter"]
    bucket_list = metadata_resp.directory.directories
    if len(bucket_list) > 0:
        for b in bucket_list:
            bucket_options.append(b.friendlyName)

    title = "Select the Bucket: "
    selected_bucket, index = pick(bucket_options, title, indicator="=>")
    if index == 0:
        selected_bucket = typer.prompt("Enter bucket name ")

    storage_name = typer.prompt("Name of the storage ", selected_bucket)

    gcs_storage_create_req = GCSStorage_pb2.GCSStorageCreateRequest(storageId=storage_name,
                                                                    bucketName=selected_bucket,
                                                                    name=storage_name)
    created_storage = client.gcs_storage_api.createGCSStorage(gcs_storage_create_req)

    secret_create_req = GCSCredential_pb2.GCSSecretCreateRequest(clientEmail=client_email,
                                                                 privateKey=client_secret,
                                                                 projectId=project_id)
    created_secret = client.gcs_secret_api.createGCSSecret(secret_create_req)

    secret_for_storage_req = StorageCommon_pb2.SecretForStorage(storageId=created_storage.storageId,
                                                                secretId=created_secret.secretId,
                                                                storageType=StorageCommon_pb2.StorageType.GCS)
    client.common_api.registerSecretForStorage(secret_for_storage_req)

    print("Successfully added the GCS Bucket...")


def list_service_accounts(project_id, credentials):
    """Lists all service accounts for the current project."""
    service = googleapiclient.discovery.build('iam', 'v1', credentials=credentials)
    service_accounts = service.projects().serviceAccounts().list(
        name='projects/' + project_id).execute()
    # for account in service_accounts['accounts']:
    #     print('Name: ' + account['name'])
    #     print('Email: ' + account['email'])
    #     print(' ')
    return service_accounts


def create_service_account(project_id, name, display_name, credentials):
    """Creates a service account and grants it access to GCS."""
    service = googleapiclient.discovery.build('iam', 'v1', credentials=credentials)
    my_service_account = service.projects().serviceAccounts().create(
        name='projects/' + project_id,
        body={
            'accountId': name,
            'serviceAccount': {
                'displayName': display_name
            }
        }).execute()
    print('Created service account: ' + my_service_account['email'])

    resource_service = googleapiclient.discovery.build('cloudresourcemanager', 'v1',
                                                       credentials=credentials)
    policy = resource_service.projects().getIamPolicy(resource=project_id).execute()
    account_handle = f"serviceAccount:{my_service_account['email']}"

    # Add policy
    modified = False
    roles = [role["role"] for role in policy["bindings"]]
    target_role = "roles/storage.admin"  # Service account role which can transfer GCS files
    if target_role in roles:
        # Role binding already exists; add the service account if it is not a member yet
        for role in policy["bindings"]:
            if role["role"] == target_role:
                if account_handle not in role["members"]:
                    role["members"].append(account_handle)
                    modified = True
    else:
        # Role binding does not exist; create it with the service account as the only member
        policy["bindings"].append({"role": target_role, "members": [account_handle]})
        modified = True

    if modified:
        # Execute policy change
        resource_service.projects().setIamPolicy(resource=project_id,
                                                 body={"policy": policy}).execute()

    # Generate a service account key
    get_service_account_key(credentials, my_service_account['email'])

    return my_service_account


def get_service_account_key(credentials, service_account_email):
    """Creates a key for a service account."""
    key_path = os.path.join(os.path.expanduser('~'), gcs_key_path)
    service = googleapiclient.discovery.build('iam', 'v1', credentials=credentials)

    # Write the key file only if one is not already cached locally
    if not os.path.exists(key_path):
        # List existing keys
        keys = service.projects().serviceAccounts().keys().list(
            name="projects/-/serviceAccounts/" + service_account_email).execute()

        # Cannot have more than 10 keys per service account
        if len(keys.get("keys", [])) >= 10:
            raise ValueError(f"Service account {service_account_email} has too many keys. "
                             f"Make sure to copy keys to {key_path} or create a new service account.")

        # Create a new key
        key = (service.projects().serviceAccounts().keys()
               .create(name="projects/-/serviceAccounts/" + service_account_email, body={})
               .execute())
        print("New Key generated ...")

        # Persist the decoded JSON key to the local key path
        os.makedirs(os.path.dirname(key_path), exist_ok=True)
        json_key_file = base64.b64decode(key["privateKeyData"]).decode("utf-8")
        with open(key_path, "w") as key_file:
            key_file.write(json_key_file)
    else:
        print("Please back up the existing service account key file ~/" + gcs_key_path
              + " before creating a new key file")
        exit()

    return key_path
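
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): one way this
# handler could be wired into a Typer CLI. The command name "add" and the app
# wiring below are assumptions for illustration, not the project's actual
# CLI layout.
#
#   import typer
#   from airavata_mft_cli.storage import gcs
#
#   app = typer.Typer()
#
#   @app.command("add")
#   def add_gcs_storage():
#       """Interactively register a GCS bucket as an MFT storage."""
#       gcs.handle_add_storage()
#
#   if __name__ == "__main__":
#       app()
# ---------------------------------------------------------------------------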