infrastructure-provisioning/src/general/lib/gcp/actions_lib.py

# *****************************************************************************
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# ******************************************************************************

import ast
import backoff
import datalab.common_lib
import datalab.fab
import datalab.meta_lib
import google.auth
import json
import logging
import os
import random
import sys
import time
import traceback
import urllib3
import urllib.request
import subprocess
from Crypto.PublicKey import RSA
from datalab.fab import *
from fabric import *
from google.cloud import exceptions
from google.cloud import storage
from googleapiclient import errors
from googleapiclient.discovery import build


class GCPActions:
    def __init__(self, auth_type='service_account'):
        @backoff.on_exception(backoff.expo,
                              google.auth.exceptions.DefaultCredentialsError,
                              max_tries=15)
        def get_gcp_cred():
            credentials, project = google.auth.default()
            return credentials, project

        self.auth_type = auth_type
        self.project = os.environ['gcp_project_id']
        if os.environ['conf_resource'] == 'ssn':
            os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/root/service_account.json"
            credentials, project = google.auth.default()
            if credentials.requires_scopes:
                credentials = credentials.with_scopes(
                    ['https://www.googleapis.com/auth/compute',
                     'https://www.googleapis.com/auth/iam',
                     'https://www.googleapis.com/auth/cloud-platform'])
            self.service = build('compute', 'v1', credentials=credentials)
            self.service_iam = build('iam', 'v1', credentials=credentials)
            self.dataproc = build('dataproc', 'v1', credentials=credentials)
            self.service_storage = build('storage', 'v1', credentials=credentials)
            self.storage_client = storage.Client(project=project, credentials=credentials)
            self.service_resource = build('cloudresourcemanager', 'v1', credentials=credentials)
        else:
            credentials, project = get_gcp_cred()
            self.service = build('compute', 'v1', credentials=credentials)
            self.service_iam = build('iam', 'v1', credentials=credentials)
            self.dataproc = build('dataproc', 'v1', credentials=credentials)
            self.service_storage = build('storage', 'v1', credentials=credentials)
            self.storage_client = storage.Client(project=project, credentials=credentials)
            self.service_resource = build('cloudresourcemanager', 'v1', credentials=credentials)

    def create_vpc(self, vpc_name):
        network_params = {'name': vpc_name, 'autoCreateSubnetworks': False}
        request = self.service.networks().insert(project=self.project, body=network_params)
        try:
            print("Create VPC {}".format(vpc_name))
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            print("VPC {} has been created".format(vpc_name))
            return result
        except Exception as err:
            logging.info(
                "Unable to create VPC: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
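    # Illustrative usage sketch (not part of the library): the class expects the
    # DataLab provisioning environment variables, e.g. 'gcp_project_id' and
    # 'conf_resource', to be exported before instantiation; the resource names
    # below are hypothetical examples only.
    #
    #   actions = GCPActions()
    #   actions.create_vpc('datalab-vpc')
    #   actions.create_subnet('datalab-subnet', '10.10.0.0/24',
    #                         'global/networks/datalab-vpc', 'us-central1')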
            append_result(str({"error": "Unable to create VPC",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def remove_vpc(self, vpc_name):
        request = self.service.networks().delete(project=self.project, network=vpc_name)
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            print("VPC {} has been removed".format(vpc_name))
            return result
        except Exception as err:
            logging.info(
                "Unable to remove VPC: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to remove VPC",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def create_subnet(self, subnet_name, subnet_cidr, vpc_selflink, region):
        subnetwork_params = {
            'name': subnet_name,
            'ipCidrRange': subnet_cidr,
            'privateIpGoogleAccess': 'true',
            'network': vpc_selflink
        }
        request = self.service.subnetworks().insert(
            project=self.project, region=region, body=subnetwork_params)
        try:
            print("Create subnet {}".format(subnet_name))
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region)
            print("Subnet {} has been created".format(subnet_name))
            return result
        except Exception as err:
            logging.info(
                "Unable to create Subnet: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to create Subnet",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def remove_subnet(self, subnet_name, region):
        request = self.service.subnetworks().delete(project=self.project, region=region, subnetwork=subnet_name)
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region)
            print("Subnet {} has been removed".format(subnet_name))
            return result
        except Exception as err:
            logging.info(
                "Unable to remove Subnet: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to remove Subnet",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def create_firewall(self, firewall_params):
        request = self.service.firewalls().insert(project=self.project, body=firewall_params)
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            print('Firewall {} created.'.format(firewall_params['name']))
            return result
        except Exception as err:
            logging.info(
                "Unable to create Firewall: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to create Firewall",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def remove_firewall(self, firewall_name):
        request = self.service.firewalls().delete(project=self.project, firewall=firewall_name)
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            print('Firewall {} removed.'.format(firewall_name))
            return result
        except Exception as err:
            logging.info(
                "Unable to remove Firewall: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to remove Firewall",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)
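    # Example shape of the 'firewall_params' dict accepted by create_firewall()
    # above. The field names follow the GCP Compute Engine 'firewalls.insert'
    # request body; the concrete values are illustrative only.
    #
    #   firewall_params = {
    #       'name': 'datalab-allow-ssh',
    #       'network': 'global/networks/datalab-vpc',
    #       'sourceRanges': ['0.0.0.0/0'],
    #       'allowed': [{'IPProtocol': 'tcp', 'ports': ['22']}]
    #   }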
    def create_nat_route(self, nat_route_params):
        request = self.service.routes().insert(project=self.project, body=nat_route_params)
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            print('NAT route {} created.'.format(nat_route_params['name']))
            return result
        except Exception as err:
            logging.info(
                "Unable to create NAT route: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to create NAT route",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def delete_nat_route(self, nat_route_name):
        request = self.service.routes().delete(project=self.project, route=nat_route_name)
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            print('NAT route {} deleted.'.format(nat_route_name))
            return result
        except Exception as err:
            logging.info(
                "Unable to delete NAT route: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to delete NAT route",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def create_bucket(self, bucket_name):
        try:
            bucket = self.storage_client.create_bucket(bucket_name)
            print('Bucket {} created.'.format(bucket.name))
        except Exception as err:
            logging.info(
                "Unable to create Bucket: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to create Bucket",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def add_bucket_labels_vers_cmek(self, bucket_name, tags, versioning_enabled='false', cmek_resource_name='',
                                    lifecycle_rules=''):
        try:
            bucket = self.storage_client.get_bucket(bucket_name)
            labels = bucket.labels
            labels.update(tags)
            bucket.labels = labels
            bucket.versioning = {"enabled": versioning_enabled}
            if cmek_resource_name != '':
                bucket.encryption = {"defaultKmsKeyName": cmek_resource_name}
            if lifecycle_rules != '':
                bucket.lifecycle_rules = ast.literal_eval(lifecycle_rules)
            bucket.patch()
            print('Updated labels on {}.'.format(bucket_name))
        except Exception as err:
            logging.info(
                "Unable to create Bucket: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to create Bucket",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def remove_bucket(self, bucket_name):
        try:
            GCPActions().bucket_cleanup(bucket_name)
            storage_resource = storage.Bucket(self.storage_client, bucket_name)
            storage_resource.delete(force=True)
            print('Bucket {} removed.'.format(bucket_name))
        except Exception as err:
            logging.info(
                "Unable to remove Bucket: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to remove Bucket",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(
                                   file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)

    def bucket_cleanup(self, bucket_name, user_name='', cluster_name=''):
        try:
            prefix = ''
            bucket = self.storage_client.get_bucket(bucket_name)
            if user_name != '':
                prefix = '{0}/{1}'.format(user_name, cluster_name)
            list_files = bucket.list_blobs(prefix=prefix)
            for item in list_files:
                print("Deleting:{}".format(item.name))
                blob = bucket.blob(item.name)
                blob.delete()
        except Exception as err:
            logging.info(
                "Unable to remove files from bucket: " + str(err) + "\n Traceback: " +
traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to remove files from bucket", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def create_disk(self, instance_name, zone, size, secondary_image_name, rsa_encrypted_csek=''): try: if secondary_image_name == 'None': params = {"sizeGb": size, "name": instance_name + '-secondary', "type": "projects/{0}/zones/{1}/diskTypes/pd-ssd".format(self.project, zone)} if rsa_encrypted_csek: params['diskEncryptionKey'] = {"rsaEncryptedKey": rsa_encrypted_csek} else: params = {"sizeGb": size, "name": instance_name + '-secondary', "type": "projects/{0}/zones/{1}/diskTypes/pd-ssd".format(self.project, zone), "sourceImage": secondary_image_name} if rsa_encrypted_csek: params["sourceImageEncryptionKey"] = {"rsaEncryptedKey": rsa_encrypted_csek} params['diskEncryptionKey'] = {"rsaEncryptedKey": rsa_encrypted_csek} request = self.service.disks().insert(project=self.project, zone=zone, body=params) result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone) print('Disk {}-secondary created.'.format(instance_name)) return request except Exception as err: logging.info( "Unable to create disk: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to create disk", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def remove_disk(self, instance_name, zone): try: request = self.service.disks().delete(project=self.project, zone=zone, disk=instance_name + '-secondary') try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone) print('Disk {}-secondary removed.'.format(instance_name)) except errors.HttpError as err: if err.resp.status == 404: print('Disk {}-secondary was not found. 
Skipped'.format(instance_name)) return request else: raise err return request except Exception as err: logging.info( "Unable to remove disk: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to remove disk", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def create_instance(self, instance_name, service_base_name, cluster_name, region, zone, vpc_name, subnet_name, instance_size, ssh_key_path, initial_user, image_name, secondary_image_name, service_account_name, instance_class, network_tag, labels, static_ip='', primary_disk_size='12', secondary_disk_size='30', gpu_accelerator_type='None', gpu_accelerator_count='1', os_login_enabled='FALSE', block_project_ssh_keys='FALSE', rsa_encrypted_csek=''): key = RSA.importKey(open(ssh_key_path, 'rb').read()) ssh_key = key.publickey().exportKey("OpenSSH").decode('UTF-8') unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name) service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project) access_configs = '' if instance_class == 'edge': ip_forward = True else: ip_forward = False if instance_class == 'ssn' or instance_class == 'edge': access_configs = [{ "type": "ONE_TO_ONE_NAT", "name": "External NAT", "natIP": static_ip }] if instance_class == 'notebook': GCPActions().create_disk(instance_name, zone, secondary_disk_size, secondary_image_name, rsa_encrypted_csek) disks = [ { "name": instance_name, "tag_name": instance_name + '-volume-primary', "deviceName": instance_name + '-primary', "autoDelete": "true", "boot": "true", "mode": "READ_WRITE", "type": "PERSISTENT", "initializeParams": { "diskSizeGb": primary_disk_size, "sourceImage": image_name } }, { "name": instance_name + '-secondary', "tag_name": instance_name + '-volume-secondary', "deviceName": instance_name + '-secondary', "autoDelete": "true", "boot": "false", "mode": "READ_WRITE", "type": "PERSISTENT", "interface": "SCSI", "source": "projects/{0}/zones/{1}/disks/{2}-secondary".format(self.project, zone, instance_name) } ] elif instance_class == 'dataengine': GCPActions().create_disk(instance_name, zone, secondary_disk_size, secondary_image_name, rsa_encrypted_csek) disks = [{ "name": instance_name, "tag_name": cluster_name + '-volume-primary', "deviceName": cluster_name + '-primary', "autoDelete": 'true', "initializeParams": { "diskSizeGb": primary_disk_size, "sourceImage": image_name }, "boot": 'true', "mode": "READ_WRITE" }, { "name": instance_name + '-secondary', "tag_name": instance_name + '-volume-secondary', "deviceName": instance_name + '-secondary', "autoDelete": "true", "boot": "false", "mode": "READ_WRITE", "type": "PERSISTENT", "interface": "SCSI", "source": "projects/{0}/zones/{1}/disks/{2}-secondary".format(self.project, zone, instance_name) } ] else: disks = [{ "name": instance_name, "tag_name": instance_name + '-volume-primary', "deviceName": instance_name + '-primary', "autoDelete": 'true', "initializeParams": { "diskSizeGb": primary_disk_size, "sourceImage": image_name }, "boot": 'true', "mode": "READ_WRITE" }] if service_base_name in image_name and rsa_encrypted_csek: for disk in disks: if "initializeParams" in disk: disk["initializeParams"]["sourceImageEncryptionKey"] = {"rsaEncryptedKey": rsa_encrypted_csek} disk["diskEncryptionKey"] = {"rsaEncryptedKey": rsa_encrypted_csek} elif rsa_encrypted_csek: for disk in disks: disk["diskEncryptionKey"] = 
{"rsaEncryptedKey": rsa_encrypted_csek} instance_params = { "name": instance_name, "machineType": "zones/{}/machineTypes/{}".format(zone, instance_size), "labels": labels, "canIpForward": ip_forward, "networkInterfaces": [ { "network": "global/networks/{}".format(vpc_name), "subnetwork": "regions/{}/subnetworks/{}".format(region, subnet_name), "accessConfigs": access_configs }, ], "metadata": {"items": [ { "key": "ssh-keys", "value": "{}:{}".format(initial_user, ssh_key) }, { "key": "enable-oslogin", "value": "{}".format(os_login_enabled) }, { "key": "block-project-ssh-keys", "value": "{}".format(block_project_ssh_keys) } ] }, "disks": disks, "serviceAccounts": [ { "email": service_account_email, "scopes": ["https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/compute"] } ] } if instance_class == 'notebook' or instance_class == 'dataengine': del instance_params['networkInterfaces'][0]['accessConfigs'] if gpu_accelerator_type != 'None': #request = self.service.acceleratorTypes().list(project=self.project, zone = zone) #result = request.execute().get('items') #gpu_accelerator_type = result[0].get('name') instance_params['guestAccelerators'] = [ { "acceleratorCount": gpu_accelerator_count, "acceleratorType": "projects/{0}/zones/{1}/acceleratorTypes/{2}".format( self.project, zone, gpu_accelerator_type) } ] instance_params['scheduling'] = { "onHostMaintenance": "terminate", "automaticRestart": "true" } request = self.service.instances().insert(project=self.project, zone=zone, body=instance_params) try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone) print('Instance {} created.'.format(instance_name)) request = self.service.instances().get(instance=instance_name, project=self.project, zone=zone) res = request.execute() if 'ssn' in network_tag: instance_tag = {"items": [network_tag, "datalab", "ssn"], "fingerprint": res['tags']['fingerprint']} elif 'edge' in network_tag: instance_tag = {"items": [network_tag, "datalab", "edge"], "fingerprint": res['tags']['fingerprint']} else: instance_tag = {"items": [network_tag, "datalab"], "fingerprint": res['tags']['fingerprint']} request = self.service.instances().setTags(instance=instance_name, project=self.project, zone=zone, body=instance_tag) GCPActions().set_disks_tag(disks, zone, labels) request.execute() return result except Exception as err: logging.info( "Unable to create Instance: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to create Instance", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def set_disks_tag(self, disks, zone, labels): try: for disk in disks: labels['name'] = disk['tag_name'] request = self.service.disks().get(disk=disk['name'], project=self.project, zone=zone) finger_print = request.execute()['labelFingerprint'] label = { "labels": labels, "labelFingerprint": finger_print } request = self.service.disks().setLabels(resource=disk['name'], project=self.project, zone=zone, body=label) request.execute() except Exception as err: logging.info( "Unable to create add tags: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to add tags", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def remove_instance(self, instance_name, zone): request = 
self.service.instances().delete(project=self.project, zone=zone, instance=instance_name) try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone) print('Instance {} removed.'.format(instance_name)) return result except Exception as err: logging.info( "Unable to remove Instance: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to remove Instance", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def stop_instance(self, instance_name, zone): request = self.service.instances().stop(project=self.project, zone=zone, instance=instance_name) try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone) return True except Exception as err: logging.info( "Unable to stop Instance: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to stop Instance", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def start_instance(self, instance_name, zone, rsa_encrypted_csek=''): if rsa_encrypted_csek: params = dict() params['disks'] = list() instance_data = datalab.meta_lib.GCPMeta().get_instance(instance_name) for disk in instance_data['disks']: params["disks"].append( {"diskEncryptionKey": {"rsaEncryptedKey": rsa_encrypted_csek}, "source": disk['source']}) request = self.service.instances().startWithEncryptionKey(project=self.project, zone=zone, instance=instance_name, body=params) else: request = self.service.instances().start(project=self.project, zone=zone, instance=instance_name) try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone) return True except Exception as err: logging.info( "Unable to start Instance: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to start Instance", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def remove_service_account(self, service_account_name, service_base_name): unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name) service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project) request = self.service_iam.projects().serviceAccounts().delete( name='projects/{}/serviceAccounts/{}'.format(self.project, service_account_email)) try: result = request.execute() service_account_removed = datalab.meta_lib.GCPMeta().get_service_account(service_account_name, service_base_name) while service_account_removed: time.sleep(5) service_account_removed = datalab.meta_lib.GCPMeta().get_service_account(service_account_name, service_base_name) time.sleep(30) print('Service account {} removed.'.format(service_account_name)) return result except Exception as err: logging.info( "Unable to remove Service account: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to remove Service account", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def create_service_account(self, service_account_name, service_base_name, unique_index): service_account_id = service_base_name + '-' + unique_index print("Creating service account with accountID:" + 
service_account_id) params = {"accountId": service_account_id, "serviceAccount": {"displayName": service_account_name}} request = self.service_iam.projects().serviceAccounts().create(name='projects/{}'.format(self.project), body=params) try: result = request.execute() service_account_created = datalab.meta_lib.GCPMeta().get_service_account(service_account_name, service_base_name) while not service_account_created: time.sleep(5) service_account_created = datalab.meta_lib.GCPMeta().get_service_account(service_account_name, service_base_name) time.sleep(30) print('Service account {} created.'.format(service_account_name)) return result except Exception as err: logging.info( "Unable to create Service account: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to create Service account", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def set_role_to_service_account(self, service_account_name, role_name, service_base_name, role_type='custom', num=0): num += 1 request = GCPActions().service_resource.projects().getIamPolicy(resource=self.project, body={}) project_policy = request.execute() unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name) service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project) params = { "role": "projects/{}/roles/{}".format(self.project, role_name.replace('-', '_')), "members": [ "serviceAccount:{}".format(service_account_email) ] } if role_type == 'predefined': params['role'] = "roles/{}".format(role_name) project_policy['bindings'].append(params) params = { "policy": { "bindings": project_policy['bindings'] } } request = self.service_resource.projects().setIamPolicy(resource=self.project, body=params) try: return request.execute() except Exception as err: if "There were concurrent policy changes. " \ "Please retry the whole read-modify-write with exponential backoff." 
in str(err) and num <= 10: time.sleep(random.randint(5, 20)) self.set_role_to_service_account(service_base_name, role_name, service_base_name, role_type, num) logging.info( "Unable to set Service account policy: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to set Service account policy", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def create_role(self, role_name, permissions): request = self.service_iam.projects().roles().create(parent="projects/{}".format(self.project), body= { "roleId": role_name.replace('-', '_'), "role": { "title": role_name, "includedPermissions": permissions }}) try: result = request.execute() role_created = datalab.meta_lib.GCPMeta().get_role(role_name) while not role_created: time.sleep(5) role_created = datalab.meta_lib.GCPMeta().get_role(role_name) time.sleep(30) print('IAM role {} created.'.format(role_name)) return result except Exception as err: logging.info( "Unable to create IAM role: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to create IAM role", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def undelete_role(self, role_name): request = self.service_iam.projects().roles().undelete( name='projects/{}/roles/{}'.format(self.project, role_name.replace('-', '_')), body= { }) try: result = request.execute() role = datalab.meta_lib.GCPMeta().get_role(role_name) if 'deleted' in role: role_removed = True else: role_removed = False while role_removed: time.sleep(5) role = datalab.meta_lib.GCPMeta().get_role(role_name) if 'deleted' in role: role_removed = True time.sleep(30) print('IAM role {} restored.'.format(role_name)) return result except Exception as err: logging.info( "Unable to restore IAM role: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to restore IAM role", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def remove_role(self, role_name): request = self.service_iam.projects().roles().delete( name='projects/{}/roles/{}'.format(self.project, role_name.replace('-', '_'))) try: result = request.execute() role = datalab.meta_lib.GCPMeta().get_role(role_name) if 'deleted' in role: role_removed = True else: role_removed = False while not role_removed: time.sleep(5) role = datalab.meta_lib.GCPMeta().get_role(role_name) if 'deleted' in role: role_removed = True time.sleep(30) print('IAM role {} removed.'.format(role_name)) return result except Exception as err: logging.info( "Unable to remove IAM role: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to remove IAM role", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def set_label_for_instance(self, zone, instance_name, key, value): try: instance_params = self.service.instances().get(project=self.project, zone=zone, instance=instance_name).execute() label_fingerprint = instance_params.get('labelFingerprint') self.service.instances().setLabels(project=self.project, zone=zone, instance=instance_name, body={ "labels": { key: value }, "labelFingerprint": label_fingerprint }).execute() except Exception as err: logging.info( "Unable to set label to instance: " + 
str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to set label to instance", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def set_service_account_to_instance(self, service_account_name, instance_name, service_base_name): unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name) service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project) params = { "email": service_account_email } request = self.service.instances().setServiceAccount( project=self.project, zone=os.environ['gcp_zone'], instance=instance_name, body=params) try: return request.execute() except Exception as err: logging.info( "Unable to set Service account to instance: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to set Service account to instance", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def create_static_address(self, address_name, region): params = {"name": address_name} request = self.service.addresses().insert(project=self.project, region=region, body=params) try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region) print('Static address {} created.'.format(address_name)) return result except Exception as err: logging.info( "Unable to create Static IP address: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to create Static IP address", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def remove_static_address(self, address_name, region): request = self.service.addresses().delete(project=self.project, region=region, address=address_name) try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region) print('Static address {} removed.'.format(address_name)) return result except Exception as err: logging.info( "Unable to remove Static IP address: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to remove Static IP address", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def create_image_from_instance_disks(self, primary_image_name, secondary_image_name, instance_name, zone, labels, rsa_encrypted_csek=''): primary_disk_name = "projects/{0}/zones/{1}/disks/{2}".format(self.project, zone, instance_name) secondary_disk_name = "projects/{0}/zones/{1}/disks/{2}-secondary".format(self.project, zone, instance_name) labels.update({"name": primary_image_name}) primary_params = {"name": primary_image_name, "sourceDisk": primary_disk_name, "labels": labels} if rsa_encrypted_csek: primary_params["imageEncryptionKey"] = {"rsaEncryptedKey": rsa_encrypted_csek} primary_params["sourceDiskEncryptionKey"] = {"rsaEncryptedKey": rsa_encrypted_csek} primary_request = self.service.images().insert(project=self.project, body=primary_params) labels.update({"name": secondary_image_name}) secondary_params = {"name": secondary_image_name, "sourceDisk": secondary_disk_name, "labels": labels} if rsa_encrypted_csek: secondary_params["imageEncryptionKey"] = {"rsaEncryptedKey": rsa_encrypted_csek} 
secondary_params["sourceDiskEncryptionKey"] = {"rsaEncryptedKey": rsa_encrypted_csek} secondary_request = self.service.images().insert(project=self.project, body=secondary_params) id_list = [] try: GCPActions().stop_instance(instance_name, zone) primary_image_check = datalab.meta_lib.GCPMeta().get_image_by_name(primary_image_name) if primary_image_check != '': GCPActions().start_instance(instance_name, zone, rsa_encrypted_csek) return '' primary_result = primary_request.execute() secondary_result = secondary_request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(primary_result['name']) print('Image {} has been created.'.format(primary_image_name)) id_list.append(primary_result.get('id')) datalab.meta_lib.GCPMeta().wait_for_operation(secondary_result['name']) print('Image {} has been created.'.format(secondary_image_name)) id_list.append(secondary_result.get('id')) GCPActions().start_instance(instance_name, zone, rsa_encrypted_csek) return id_list except Exception as err: logging.info( "Unable to create image from disks: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to create images from disks", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) return id_list def remove_image(self, image_name): try: request = self.service.images().delete(project=self.project, image=image_name) try: result = request.execute() datalab.meta_lib.GCPMeta().wait_for_operation(result['name']) print('Image {} was removed.'.format(image_name)) except errors.HttpError as err: if err.resp.status == 404: print('Image {} was not found. Skipped'.format(image_name)) return request else: raise err return request except Exception as err: logging.info( "Unable to remove disk: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)) append_result(str({"error": "Unable to remove disk", "error_message": str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def put_to_bucket(self, bucket_name, local_file, dest_file): try: bucket = self.storage_client.get_bucket(bucket_name) blob = bucket.blob(dest_file) blob.upload_from_filename(local_file) return True except: return False def get_from_bucket(self, bucket_name, dest_file, local_file): try: bucket = self.storage_client.get_bucket(bucket_name) blob = bucket.blob(dest_file) if blob.exists(): blob.download_to_filename(local_file) return True else: return False except exceptions.NotFound: return False def set_bucket_owner(self, bucket_name, service_account_name, service_base_name): try: unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name) service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project) bucket = self.storage_client.get_bucket(bucket_name) # setting bucket owner acl = bucket.acl acl.user(service_account_email).grant_owner() acl.save() # setting default ACL for bucket self.service_storage.defaultObjectAccessControls().insert(bucket=bucket_name, body={ "entity": "user-{}".format(service_account_email), "role": "OWNER" }).execute() # setting new default ACL for all objects in bucket default_acl = self.service_storage.defaultObjectAccessControls().list( bucket=bucket_name).execute().get('items') objects = bucket.list_blobs() for bucket_object in objects: object_params = bucket.get_blob(bucket_object.name) for acl in default_acl: if acl.get('role') == 'OWNER' and 
acl.get('entity')[:5] == 'user-': object_params.acl.user(acl.get('entity')[5:]).grant_owner() object_params.acl.save() except Exception as err: logging.info( "Unable to modify bucket ACL: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to modify bucket ACL", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) def get_gitlab_cert(self, bucket_name, certfile): try: bucket = self.storage_client.get_bucket(bucket_name) blob = bucket.blob(certfile) if blob.exists(): blob.download_to_filename(certfile) return True else: return False except exceptions.NotFound: return False def create_dataproc_cluster(self, cluster_name, region, params): request = self.dataproc.projects().regions().clusters().create(projectId=self.project, region=region, body=params) try: result = request.execute() time.sleep(5) cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name]) while cluster_status[0]['status'] != 'running': time.sleep(5) print('The cluster is being created... Please wait') cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name]) if cluster_status[0]['status'] == 'terminated': raise Exception return result except Exception as err: logging.info( "Unable to create dataproc cluster: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to create dataproc cluster", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) return '' def set_cluster_volume_tag(self, clusteName, region, zone): try: print('Setting volume tags') print(clusteName + ':' + region + ':' + zone) result = self.dataproc.projects().regions().clusters().list( projectId=self.project, region=region).execute() clusters = result.get('clusters') dataproc_instances = [] labels = '' for cluster in clusters: if cluster['clusterName'] == clusteName: print(cluster) labels = cluster.get('labels') master_instances = cluster.get('config').get('masterConfig').get('instanceNames') slave_instances = cluster.get('config').get('workerConfig').get('instanceNames') for instance in master_instances: param = {} param['name'] = instance param['tag_name'] = clusteName + '-volume-primary' dataproc_instances.append(param) for instance in slave_instances: param = {} param['name'] = instance param['tag_name'] = clusteName + '-volume-primary' dataproc_instances.append(param) GCPActions().set_disks_tag(dataproc_instances, zone, labels) except Exception as err: logging.info( "Unable to tag volume dataproc cluster: " + str( err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to tag volume dataproc cluster", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) return '' def delete_dataproc_cluster(self, cluster_name, region): request = self.dataproc.projects().regions().clusters().delete(projectId=self.project, region=region, clusterName=cluster_name) try: result = request.execute() cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name]) while cluster_status[0]['status'] != 'terminated': time.sleep(5) print('The cluster is being terminated... 
Please wait') cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name]) GCPActions().delete_dataproc_jobs(cluster_name) return result except Exception as err: logging.info( "Unable to delete dataproc cluster: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to delete dataproc cluster", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) return '' def update_dataproc_cluster(self, cluster_name, cluster_labels): body = {"labels": cluster_labels} request = self.dataproc.projects().regions().clusters().patch(projectId=self.project, region=os.environ['gcp_region'], clusterName=cluster_name, updateMask='labels', body=body) try: result = request.execute() time.sleep(15) GCPActions().set_cluster_volume_tag(cluster_name, os.environ['gcp_region'], os.environ['gcp_zone']) return result except Exception as err: logging.info( "Unable to update dataproc cluster: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to update dataproc cluster", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) return '' def submit_dataproc_job(self, job_body): request = self.dataproc.projects().regions().jobs().submit(projectId=self.project, region=os.environ['gcp_region'], body=job_body) try: res = request.execute() print("Job ID: {}".format(res['reference']['jobId'])) job_status = datalab.meta_lib.GCPMeta().get_dataproc_job_status(res['reference']['jobId']) while job_status != 'done': time.sleep(5) job_status = datalab.meta_lib.GCPMeta().get_dataproc_job_status(res['reference']['jobId']) if job_status in ('failed', 'error'): raise Exception return job_status except Exception as err: logging.info( "Unable to submit dataproc job: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to submit dataproc job", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) sys.exit(1) def delete_dataproc_jobs(self, cluster_filter): try: jobs = datalab.meta_lib.GCPMeta().get_dataproc_jobs() cluster_jobs_ids = [job['reference']['jobId'] for job in jobs if cluster_filter in job['placement']['clusterName']] for job_id in list(set(cluster_jobs_ids)): print('The cluster jobs is being deleted... 
Please wait') try: req = self.dataproc.projects().regions().jobs().delete(projectId=self.project, region=os.environ['gcp_region'], jobId=job_id) req.execute() except errors.HttpError as err: if err.resp.status == 404: print('Job with ID: {} have not been found.'.format(job_id)) except Exception as err: logging.info( "Unable to delete dataproc jobs: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to delete dataproc jobs", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) sys.exit(1) def get_cluster_app_version(self, bucket, user_name, cluster_name, application): try: version_file = '{0}/{1}/{2}_version'.format(user_name, cluster_name, application) if GCPActions().get_from_bucket(bucket, version_file, '/tmp/{}_version'.format(application)): with open('/tmp/{}_version'.format(application)) as f: version = f.read() return version[0:5] else: raise Exception except Exception as err: logging.info( "Unable to get software version: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to get software version", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) return '' def jars(self, args, dataproc_dir): print("Downloading jars...") GCPActions().get_from_bucket(args.bucket, 'jars/{0}/jars.tar.gz'.format(args.dataproc_version), '/tmp/jars.tar.gz') GCPActions().get_from_bucket(args.bucket, 'jars/{0}/jars-checksum.chk'.format(args.dataproc_version), '/tmp/jars-checksum.chk') if 'WARNING' in subprocess.run('md5sum -c /tmp/jars-checksum.chk', capture_output=True, shell=True, check=True).stdout.decode('UTF-8'): subprocess.run('rm -f /tmp/jars.tar.gz', shell=True, check=True) GCPActions().get_from_bucket(args.bucket, 'jars/{0}/jars.tar.gz'.format(args.cluster_name), '/tmp/jars.tar.gz') if 'WARNING' in subprocess.run('md5sum -c /tmp/jars-checksum.chk', capture_output=True, shell=True, check=True).stdout.decode('UTF-8'): print("The checksum of jars.tar.gz is mismatched. 
It could be caused by gcp network issue.") sys.exit(1) subprocess.run('tar -zhxvf /tmp/jars.tar.gz -C {}'.format(dataproc_dir), shell=True, check=True) def yarn(self, args, yarn_dir): print("Downloading yarn configuration...") bucket = self.storage_client.get_bucket(args.bucket) list_files = bucket.list_blobs(prefix='{0}/{1}/config/'.format(args.user_name, args.cluster_name)) subprocess.run('mkdir -p /tmp/{0}/{1}/config/'.format(args.user_name, args.cluster_name), shell=True, check=True) for item in list_files: local_file = '/tmp/{0}/{1}/config/{2}'.format(args.user_name, args.cluster_name, item.name.split("/")[-1:][0]) GCPActions().get_from_bucket(args.bucket, item.name, local_file) subprocess.run('sudo mv /tmp/{0}/{1}/config/* {2}'.format(args.user_name, args.cluster_name, yarn_dir), shell=True, check=True) subprocess.run('sudo rm -rf /tmp/{}'.format(args.user_name), shell=True, check=True) def install_dataproc_spark(self, args): print("Installing spark...") GCPActions().get_from_bucket(args.bucket, '{0}/{1}/spark.tar.gz'.format(args.user_name, args.cluster_name), '/tmp/spark.tar.gz') GCPActions().get_from_bucket(args.bucket, '{0}/{1}/spark-checksum.chk'.format(args.user_name, args.cluster_name), '/tmp/spark-checksum.chk') if 'WARNING' in subprocess.run('md5sum -c /tmp/spark-checksum.chk', capture_output=True, shell=True, check=True).stdout.decode('UTF-8'): subprocess.run('rm -f /tmp/spark.tar.gz', shell=True, check=True) GCPActions().get_from_bucket(args.bucket, '{0}/{1}/spark.tar.gz'.format(args.user_name, args.cluster_name), '/tmp/spark.tar.gz') if 'WARNING' in subprocess.run('md5sum -c /tmp/spark-checksum.chk', capture_output=True, shell=True, check=True).stdout.decode('UTF-8'): print("The checksum of spark.tar.gz is mismatched. It could be caused by gcp network issue.") sys.exit(1) subprocess.run('sudo tar -zhxvf /tmp/spark.tar.gz -C /opt/{0}/{1}/'.format(args.dataproc_version, args.cluster_name), shell=True, check=True) def spark_defaults(self, args): spark_def_path = '/opt/{0}/{1}/spark/conf/spark-env.sh'.format(args.dataproc_version, args.cluster_name) subprocess.run(""" sudo bash -c " sed -i '/#/d' {}" """.format(spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -c " sed -i '/^\s*$/d' {}" """.format(spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -c " sed -i 's|/usr/lib/hadoop|/opt/{0}/jars/usr/lib/hadoop|g' {1}" """.format(args.dataproc_version, spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -c " sed -i 's|/etc/hadoop/conf|/opt/{0}/{1}/conf|g' {2}" """.format(args.dataproc_version, args.cluster_name, spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -l -c " sed -i '/\$HADOOP_HOME\/\*/a SPARK_DIST_CLASSPATH=\\"\$SPARK_DIST_CLASSPATH:\$HADOOP_HOME\/client\/*\\"' {}" """.format(spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -l -c " sed -i '/\$HADOOP_YARN_HOME\/\*/a SPARK_DIST_CLASSPATH=\\"\$SPARK_DIST_CLASSPATH:\/opt\/jars\/\*\\"' {}" """.format(spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -c " sed -i 's|/hadoop/spark/work|/tmp/hadoop/spark/work|g' {}" """.format(spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -c " sed -i 's|/hadoop/spark/tmp|/tmp/hadoop/spark/tmp|g' {}" """.format(spark_def_path), shell=True, check=True) subprocess.run(""" sudo bash -c " sed -i 's/STANDALONE_SPARK_MASTER_HOST.*/STANDALONE_SPARK_MASTER_HOST={0}-m/g' {1}" """.format(args.cluster_name, spark_def_path), shell=True, check=True) subprocess.run(""" 
sudo bash -c " sed -i 's|/hadoop_gcs_connector_metadata_cache|/tmp/hadoop_gcs_connector_metadata_cache|g' /opt/{0}/{1}/conf/core-site.xml" """.format(args.dataproc_version, args.cluster_name), shell=True, check=True) def remove_kernels(self, notebook_name, dataproc_name, dataproc_version, ssh_user, key_path, computational_name): try: notebook_ip = datalab.meta_lib.GCPMeta().get_private_ip_address(notebook_name) global con con = datalab.fab.init_datalab_connection(notebook_ip, ssh_user, key_path) con.sudo('rm -rf /home/{}/.local/share/jupyter/kernels/*_{}'.format(ssh_user, dataproc_name)) if exists(con, '/home/{}/.ensure_dir/dataengine-service_{}_interpreter_ensured'.format(ssh_user, dataproc_name)): if os.environ['notebook_multiple_clusters'] == 'true': try: livy_port = con.sudo("cat /opt/" + dataproc_version + "/" + dataproc_name + "/livy/conf/livy.conf | grep livy.server.port | tail -n 1 | awk '{printf $3}'").stdout.replace('\n','') process_number = con.sudo("netstat -natp 2>/dev/null | grep ':" + livy_port + "' | awk '{print $7}' | sed 's|/.*||g'").stdout.replace('\n','') con.sudo('kill -9 ' + process_number) con.sudo('systemctl disable livy-server-' + livy_port) except: print("Wasn't able to find Livy server for this EMR!") con.sudo('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh') con.sudo("rm -rf /home/{}/.ensure_dir/dataengine-service_interpreter_ensure".format(ssh_user)) zeppelin_url = 'http://' + notebook_ip + ':8080/api/interpreter/setting/' opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) req = opener.open(urllib.request.Request(zeppelin_url)) r_text = req.read() interpreter_json = json.loads(r_text) interpreter_prefix = dataproc_name for interpreter in interpreter_json['body']: if interpreter_prefix in interpreter['name']: print("Interpreter with ID: {} and name: {} will be removed from zeppelin!".format( interpreter['id'], interpreter['name'])) request = urllib.request.Request(zeppelin_url + interpreter['id'], data=''.encode()) request.get_method = lambda: 'DELETE' url = opener.open(request) print(url.read()) con.sudo('chown {0}:{0} -R /opt/zeppelin/'.format(ssh_user)) con.sudo('systemctl restart zeppelin-notebook.service') zeppelin_restarted = False while not zeppelin_restarted: con.sudo('sleep 5') result = con.sudo('nmap -p 8080 localhost | grep "closed" > /dev/null; echo $?').stdout.replace('\n','') result = result[:1] if result == '1': zeppelin_restarted = True con.sudo('sleep 5') con.sudo('rm -rf /home/{}/.ensure_dir/dataengine-service_{}_interpreter_ensured'.format(ssh_user, dataproc_name)) if exists(con, '/home/{}/.ensure_dir/rstudio_dataengine-service_ensured'.format(ssh_user)): datalab.fab.remove_rstudio_dataengines_kernel(computational_name, ssh_user) con.sudo('rm -rf /opt/{0}/{1}/'.format(dataproc_version, dataproc_name)) print("Notebook's {} kernels were removed".format(notebook_ip)) except Exception as err: logging.info( "Unable to delete dataproc kernels from notebook: " + str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)) append_result(str({"error": "Unable to delete dataproc kernels from notebook", "error_message": str(err) + "\n Traceback: " + traceback.print_exc( file=sys.stdout)})) traceback.print_exc(file=sys.stdout) return '' def configure_zeppelin_dataproc_interpreter(self, dataproc_version, cluster_name, spark_dir, os_user, yarn_dir, bucket, user_name, multiple_clusters): try: port_number_found = False zeppelin_restarted = False default_port = 8998 
GCPActions().get_cluster_app_version(bucket, user_name, cluster_name, 'python') with open('/tmp/python_version') as f: python_version = f.read() python_version = python_version[0:5] livy_port = '' livy_path = '/opt/{0}/{1}/livy/'.format(dataproc_version, cluster_name) subprocess.run('echo \"Configuring dataproc path for Zeppelin\"', shell=True, check=True) subprocess.run('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/{0}\/{1}\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh' .format(dataproc_version, cluster_name), shell=True, check=True) subprocess.run('sed -i \"s/^export HADOOP_CONF_DIR.*/export HADOOP_CONF_DIR=\/opt\/{0}\/{1}\/conf/\" /opt/{0}/{1}/spark/conf/spark-env.sh' .format(dataproc_version, cluster_name), shell=True, check=True) subprocess.run('sed -i "/spark.executorEnv.PYTHONPATH/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'.format(dataproc_version, cluster_name), shell=True, check=True) subprocess.run('sed -i "/spark.yarn.dist.files/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'.format(dataproc_version, cluster_name), shell=True, check=True) subprocess.run('sudo chown {0}:{0} -R /opt/zeppelin/'.format(os_user), shell=True, check=True) subprocess.run('sudo systemctl restart zeppelin-notebook.service', shell=True, check=True) while not zeppelin_restarted: subprocess.run('sleep 5', shell=True, check=True) result = subprocess.run('sudo bash -c "nmap -p 8080 localhost | grep closed > /dev/null" ; echo $?', capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r") result = result[:1] if result == '1': zeppelin_restarted = True subprocess.run('sleep 5', shell=True, check=True) subprocess.run('echo \"Configuring dataproc spark interpreter for Zeppelin\"', shell=True, check=True) if multiple_clusters == 'true': while not port_number_found: port_free = subprocess.run('sudo bash -c "nmap -p ' + str(default_port) + ' localhost | grep closed > /dev/null" ; echo $?', capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r") port_free = port_free[:1] if port_free == '0': livy_port = default_port port_number_found = True else: default_port += 1 subprocess.run('sudo echo "livy.server.port = {0}" >> {1}conf/livy.conf'.format(str(livy_port), livy_path), shell=True, check=True) subprocess.run('sudo echo "livy.spark.master = yarn" >> {}conf/livy.conf'.format(livy_path), shell=True, check=True) if os.path.exists('{}conf/spark-blacklist.conf'.format(livy_path)): subprocess.run('sudo sed -i "s/^/#/g" {}conf/spark-blacklist.conf'.format(livy_path), shell=True, check=True) subprocess.run('sudo echo "export SPARK_HOME={0}" >> {1}conf/livy-env.sh'.format(spark_dir, livy_path), shell=True, check=True) subprocess.run('sudo echo "export HADOOP_CONF_DIR={0}" >> {1}conf/livy-env.sh'.format(yarn_dir, livy_path), shell=True, check=True) subprocess.run('sudo echo "export PYSPARK3_PYTHON=python{0}" >> {1}conf/livy-env.sh'.format(python_version[0:3], livy_path), shell=True, check=True) template_file = "/tmp/dataengine-service_interpreter.json" fr = open(template_file, 'r+') text = fr.read() text = text.replace('CLUSTER_NAME', cluster_name) text = text.replace('SPARK_HOME', spark_dir) text = text.replace('LIVY_PORT', str(livy_port)) fw = open(template_file, 'w') fw.write(text) fw.close() for _ in range(5): try: subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " + "@/tmp/dataengine-service_interpreter.json http://localhost:8080/api/interpreter/setting", shell=True, check=True) break except: subprocess.run('sleep 5', 
shell=True, check=True) subprocess.run('sudo cp /opt/livy-server-cluster.service /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)), shell=True, check=True) subprocess.run("sudo sed -i 's|OS_USER|{0}|' /etc/systemd/system/livy-server-{1}.service".format(os_user, str(livy_port)), shell=True, check=True) subprocess.run("sudo sed -i 's|LIVY_PATH|{0}|' /etc/systemd/system/livy-server-{1}.service".format(livy_path, str(livy_port)), shell=True, check=True) subprocess.run('sudo chmod 644 /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)), shell=True, check=True) subprocess.run('sudo systemctl daemon-reload', shell=True, check=True) subprocess.run('sudo systemctl enable livy-server-{}'.format(str(livy_port)), shell=True, check=True) subprocess.run('sudo systemctl start livy-server-{}'.format(str(livy_port)), shell=True, check=True) else: template_file = "/tmp/dataengine-service_interpreter.json" p_versions = ["2", "{}-dp".format(python_version[:3])] for p_version in p_versions: fr = open(template_file, 'r+') text = fr.read() text = text.replace('CLUSTERNAME', cluster_name) text = text.replace('PYTHONVERSION', p_version) text = text.replace('SPARK_HOME', spark_dir) text = text.replace('PYTHONVER_SHORT', p_version[:1]) text = text.replace('DATAENGINE-SERVICE_VERSION', dataproc_version) tmp_file = '/tmp/dataproc_spark_py{}_interpreter.json'.format(p_version) fw = open(tmp_file, 'w') fw.write(text) fw.close() for _ in range(5): try: subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " + "@/tmp/dataproc_spark_py{}_interpreter.json http://localhost:8080/api/interpreter/setting".format(p_version), shell=True, check=True) break except: subprocess.run('sleep 5', shell=True, check=True) subprocess.run('touch /home/{0}/.ensure_dir/dataengine-service_{1}_interpreter_ensured'.format(os_user, cluster_name), shell=True, check=True) except: sys.exit(1) def install_python(self, bucket, user_name, cluster_name, application, numpy_version): try: GCPActions().get_cluster_app_version(bucket, user_name, cluster_name, 'python') with open('/tmp/python_version') as f: python_version = f.read() python_version = python_version[0:5] if not os.path.exists('/opt/python/python{}'.format(python_version)): subprocess.run('wget https://www.python.org/ftp/python/{0}/Python-{0}.tgz -O /tmp/Python-{0}.tgz'.format(python_version), shell=True, check=True) subprocess.run('tar zxvf /tmp/Python-{}.tgz -C /tmp/'.format(python_version), shell=True, check=True) subprocess.run('cd /tmp/Python-{0}; ./configure --prefix=/opt/python/python{0} --with-zlib-dir=/usr/local/lib/ --with-ensurepip=install'.format(python_version), shell=True, check=True) subprocess.run('cd /tmp/Python-{}; sudo make altinstall'.format(python_version), shell=True, check=True) subprocess.run('cd /tmp/; sudo rm -rf Python-{}/'.format(python_version), shell=True, check=True) subprocess.run('sudo -i virtualenv /opt/python/python{}'.format(python_version), shell=True, check=True) venv_command = 'source /opt/python/python{}/bin/activate'.format(python_version) pip_command = '/opt/python/python{0}/bin/pip{1}'.format(python_version, python_version[:3]) for lib in ['-U pip==9.0.3', 'pyzmq==17.0.0', 'ipython ipykernel boto boto3 pybind11 pythran cython NumPy=={} Matplotlib --no-cache-dir'.format(numpy_version), 'SciPy pandas Sympy Pillow --no-cache-dir', 'sklearn --no-cache-dir']: subprocess.run('bash -c "{0} && sudo -i {1} install {2}"' .format(venv_command, pip_command, lib), shell=True, check=True) if 
                    subprocess.run('bash -c "{0} && sudo -i {1} install mxnet-cu80 opencv-python keras Theano --no-cache-dir"'.format(venv_command, pip_command), shell=True, check=True)
                    python_without_dots = python_version.replace('.', '')
                    subprocess.run('bash -c "{0} && sudo -i {1} install https://cntk.ai/PythonWheel/GPU/cntk-2.0rc3-cp{2}-cp{2}m-linux_x86_64.whl --no-cache-dir"'
                                   .format(venv_command, pip_command, python_without_dots[:2]), shell=True, check=True)
            subprocess.run('sudo rm -rf /usr/bin/python{}-dp'.format(python_version[0:3]), shell=True, check=True)
            subprocess.run('sudo ln -fs /opt/python/python{0}/bin/python{1} /usr/bin/python{1}-dp'.format(python_version, python_version[0:3]), shell=True, check=True)
        except Exception as err:
            logging.info(
                "Unable to install python: " + str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout))
            append_result(str({"error": "Unable to install python",
                               "error_message": str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)}))
            traceback.print_exc(file=sys.stdout)
            return ''


def ensure_local_jars(os_user, jars_dir):
    if not exists(datalab.fab.conn, '/home/{}/.ensure_dir/gs_kernel_ensured'.format(os_user)):
        try:
            templates_dir = '/root/templates/'
            datalab.fab.conn.sudo('mkdir -p {}'.format(jars_dir))
            datalab.fab.conn.sudo('wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-{0}.jar -O {1}'
                                  'gcs-connector-hadoop2-{0}.jar'.format(os.environ['notebook_gcs_connector_version'], jars_dir))
            datalab.fab.conn.sudo('wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-yarn-server-web-proxy/2.7.4/{0} -O {1}{0}'
                                  .format('hadoop-yarn-server-web-proxy-2.7.4.jar', jars_dir))
            datalab.fab.conn.put(templates_dir + 'core-site.xml', '/tmp/core-site.xml')
            datalab.fab.conn.sudo('sed -i "s|GCP_PROJECT_ID|{}|g" /tmp/core-site.xml'.format(os.environ['gcp_project_id']))
            datalab.fab.conn.sudo('mv /tmp/core-site.xml /opt/spark/conf/core-site.xml')
            datalab.fab.conn.put(templates_dir + 'notebook_spark-defaults_local.conf', '/tmp/notebook_spark-defaults_local.conf')
            if os.environ['application'] == 'zeppelin':
                datalab.fab.conn.run('echo \"spark.jars $(ls -1 ' + jars_dir + '* | tr \'\\n\' \',\')\" >> /tmp/notebook_spark-defaults_local.conf')
            datalab.fab.conn.sudo('\cp /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
            datalab.fab.conn.sudo('touch /home/{}/.ensure_dir/gs_kernel_ensured'.format(os_user))
        except Exception as err:
            print('Error:', str(err))
            sys.exit(1)


def get_cluster_python_version(region, bucket, user_name, cluster_name):
    try:
        GCPActions().get_cluster_app_version(bucket, user_name, cluster_name, 'python')
    except:
        sys.exit(1)


def installing_python(region, bucket, user_name, cluster_name, application='', pip_mirror='', numpy_version='1.14.3'):
    try:
        GCPActions().install_python(bucket, user_name, cluster_name, application, numpy_version)
    except:
        sys.exit(1)
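

# Illustrative usage only (not part of the original module): the helpers above are normally
# invoked from dataengine-service configuration scripts, with values taken from os.environ.
# The region, bucket, user and cluster names below are hypothetical placeholders.
#
#   get_cluster_python_version('us-central1', 'project-bucket', 'user1', 'cluster1')
#   installing_python('us-central1', 'project-bucket', 'user1', 'cluster1',
#                     application='jupyter', numpy_version='1.14.3')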


def prepare_disk(os_user):
    if not exists(datalab.fab.conn, '/home/' + os_user + '/.ensure_dir/disk_ensured'):
        try:
            disk_name = datalab.fab.conn.sudo("lsblk | grep disk | awk '{print $1}' | sort | tail -n 1").stdout.replace('\n', '')
            datalab.fab.conn.sudo('''bash -c 'echo -e "o\nn\np\n1\n\n\nw" | fdisk /dev/{}' '''.format(disk_name))
            datalab.fab.conn.sudo('mkfs.ext4 -F /dev/{}1'.format(disk_name))
            datalab.fab.conn.sudo('mount /dev/{}1 /opt/'.format(disk_name))
            datalab.fab.conn.sudo(''' bash -c "echo '/dev/{}1 /opt/ ext4 errors=remount-ro 0 1' >> /etc/fstab" '''.format(disk_name))
            datalab.fab.conn.sudo('touch /home/' + os_user + '/.ensure_dir/disk_ensured')
        except:
            sys.exit(1)


def ensure_local_spark(os_user, spark_link, spark_version, hadoop_version, local_spark_path):
    if not exists(datalab.fab.conn, '/home/' + os_user + '/.ensure_dir/local_spark_ensured'):
        try:
            datalab.fab.conn.sudo('wget ' + spark_link + ' -O /tmp/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz')
            datalab.fab.conn.sudo('tar -zxvf /tmp/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz -C /opt/')
            datalab.fab.conn.sudo('mv /opt/spark-' + spark_version + '-bin-hadoop' + hadoop_version + ' ' + local_spark_path)
            datalab.fab.conn.sudo('chown -R ' + os_user + ':' + os_user + ' ' + local_spark_path)
            datalab.fab.conn.sudo('touch /home/' + os_user + '/.ensure_dir/local_spark_ensured')
        except Exception as err:
            print('Error:', str(err))
            sys.exit(1)


def configure_local_spark(jars_dir, templates_dir, memory_type='driver'):
    try:
        # Checking if spark.jars parameter was generated previously
        spark_jars_paths = None
        if exists(datalab.fab.conn, '/opt/spark/conf/spark-defaults.conf'):
            try:
                spark_jars_paths = datalab.fab.conn.sudo('cat /opt/spark/conf/spark-defaults.conf | grep -e "^spark.jars " ').stdout.replace('\n', '')
            except:
                spark_jars_paths = None
        datalab.fab.conn.put(templates_dir + 'notebook_spark-defaults_local.conf', '/tmp/notebook_spark-defaults_local.conf')
        if os.environ['application'] == 'zeppelin':
            datalab.fab.conn.run('echo \"spark.jars $(ls -1 ' + jars_dir + '* | tr \'\\n\' \',\')\" >> /tmp/notebook_spark-defaults_local.conf')
        datalab.fab.conn.sudo('\cp -f /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
        if memory_type == 'driver':
            spark_memory = datalab.fab.get_spark_memory()
            datalab.fab.conn.sudo('sed -i "/spark.*.memory/d" /opt/spark/conf/spark-defaults.conf')
            datalab.fab.conn.sudo('''bash -c 'echo "spark.{0}.memory {1}m" >> /opt/spark/conf/spark-defaults.conf' '''
                                  .format(memory_type, spark_memory))
        if not exists(datalab.fab.conn, '/opt/spark/conf/spark-env.sh'):
            datalab.fab.conn.sudo('mv /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh')
        if os.environ['conf_deeplearning_cloud_ami'] == 'true' and os.environ['conf_cloud_provider'] == 'gcp' and os.environ['application'] == 'deeplearning':
            java_home = '/usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre'
        else:
            java_home = datalab.fab.conn.run(
                "update-alternatives --query java | grep -o --color=never \'/.*/java-8.*/jre\'").stdout.splitlines()[0]
        datalab.fab.conn.sudo(
            '''bash -l -c 'echo "export JAVA_HOME={}" >> /opt/spark/conf/spark-env.sh' '''.format(java_home))
        if 'spark_configurations' in os.environ:
            datalab_header = datalab.fab.conn.sudo('cat /tmp/notebook_spark-defaults_local.conf | grep "^#"').stdout
            spark_configurations = ast.literal_eval(os.environ['spark_configurations'])
            new_spark_defaults = list()
            spark_defaults = datalab.fab.conn.sudo('cat /opt/spark/conf/spark-defaults.conf').stdout
            current_spark_properties = spark_defaults.split('\n')
            for param in current_spark_properties:
                if param.split(' ')[0] != '#':
                    for config in spark_configurations:
                        if config['Classification'] == 'spark-defaults':
                            for property in config['Properties']:
                                if property == param.split(' ')[0]:
                                    param = property + ' ' + config['Properties'][property]
                                else:
                                    new_spark_defaults.append(property + ' ' + config['Properties'][property])
                    new_spark_defaults.append(param)
            new_spark_defaults = set(new_spark_defaults)
            datalab.fab.conn.sudo('''bash -c 'echo "{}" > /opt/spark/conf/spark-defaults.conf' '''.format(datalab_header))
            for prop in new_spark_defaults:
                datalab.fab.conn.sudo('''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(prop))
            datalab.fab.conn.sudo('sed -i "/^\s*$/d" /opt/spark/conf/spark-defaults.conf')
            if spark_jars_paths:
                datalab.fab.conn.sudo('''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(spark_jars_paths))
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)


def remove_dataengine_kernels(notebook_name, os_user, key_path, cluster_name):
    try:
        computational_name = os.environ['computational_name'].replace('_', '-').lower()
        private = datalab.meta_lib.get_instance_private_ip_address(cluster_name, notebook_name)
        global con
        con = datalab.fab.init_datalab_connection(private, os_user, key_path)
        con.sudo('rm -rf /home/{}/.local/share/jupyter/kernels/*_{}'.format(os_user, cluster_name))
        if exists(con, '/home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name)):
            if os.environ['notebook_multiple_clusters'] == 'true':
                try:
                    livy_port = con.sudo("cat /opt/" + cluster_name +
                                         "/livy/conf/livy.conf | grep livy.server.port | tail -n 1 | awk '{printf $3}'").stdout.replace('\n', '')
                    process_number = con.sudo("netstat -natp 2>/dev/null | grep ':" + livy_port +
                                              "' | awk '{print $7}' | sed 's|/.*||g'").stdout.replace('\n', '')
                    con.sudo('kill -9 ' + process_number)
                    con.sudo('systemctl disable livy-server-' + livy_port)
                except:
                    print("Wasn't able to find Livy server for this EMR!")
            con.sudo(
                'sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh')
            con.sudo("rm -rf /home/{}/.ensure_dir/dataengine_interpreter_ensure".format(os_user))
            zeppelin_url = 'http://' + private + ':8080/api/interpreter/setting/'
            opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
            req = opener.open(urllib.request.Request(zeppelin_url))
            r_text = req.read()
            interpreter_json = json.loads(r_text)
            interpreter_prefix = cluster_name
            for interpreter in interpreter_json['body']:
                if interpreter_prefix in interpreter['name']:
                    print("Interpreter with ID: {} and name: {} will be removed from zeppelin!".format(
                        interpreter['id'], interpreter['name']))
                    request = urllib.request.Request(zeppelin_url + interpreter['id'], data=''.encode())
                    request.get_method = lambda: 'DELETE'
                    url = opener.open(request)
                    print(url.read())
            con.sudo('chown ' + os_user + ':' + os_user + ' -R /opt/zeppelin/')
            con.sudo('systemctl daemon-reload')
            con.sudo("service zeppelin-notebook stop")
            con.sudo("service zeppelin-notebook start")
            zeppelin_restarted = False
            while not zeppelin_restarted:
                con.sudo('sleep 5')
                result = con.sudo('nmap -p 8080 localhost | grep "closed" > /dev/null; echo $?').stdout
                result = result[:1]
                if result == '1':
                    zeppelin_restarted = True
            con.sudo('sleep 5')
            con.sudo('rm -rf /home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name))
        if exists(con, '/home/{}/.ensure_dir/rstudio_dataengine_ensured'.format(os_user)):
            datalab.fab.remove_rstudio_dataengines_kernel(computational_name, os_user)
        con.sudo('rm -rf /opt/' + cluster_name + '/')
        print("Notebook's {} kernels were removed".format(private))
    except Exception as err:
        logging.info("Unable to remove kernels on Notebook: " + str(err) + "\n Traceback: " + traceback.print_exc(
            file=sys.stdout))
        append_result(str({"error": "Unable to remove kernels on Notebook",
                           "error_message": str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)}))
        traceback.print_exc(file=sys.stdout)
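

# Note (illustrative, not part of the original module): configure_local_spark() above reads
# custom Spark properties from os.environ['spark_configurations'], and configure_dataengine_spark()
# below takes them via its spark_configs argument. In both cases ast.literal_eval() is expected
# to yield a list of classification blocks, e.g. (hypothetical values):
#
#   spark_configs = "[{'Classification': 'spark-defaults', " \
#                   "'Properties': {'spark.executor.memory': '4g'}}]"
#
# Only entries whose 'Classification' is 'spark-defaults' are merged into spark-defaults.conf.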


def install_dataengine_spark(cluster_name, spark_link, spark_version, hadoop_version, cluster_dir, os_user,
                             datalake_enabled):
    subprocess.run('wget ' + spark_link + ' -O /tmp/' + cluster_name + '/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz', shell=True, check=True)
    subprocess.run('tar -zxvf /tmp/' + cluster_name + '/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz -C /opt/', shell=True, check=True)
    subprocess.run('mv /opt/spark-' + spark_version + '-bin-hadoop' + hadoop_version + ' ' + cluster_dir + 'spark/', shell=True, check=True)
    subprocess.run('chown -R ' + os_user + ':' + os_user + ' ' + cluster_dir + 'spark/', shell=True, check=True)


def configure_dataengine_spark(cluster_name, jars_dir, cluster_dir, datalake_enabled, spark_configs=''):
    subprocess.run("jar_list=`find {0} -name '*.jar' | tr '\\n' ',' | sed 's/,$//'` ; echo \"spark.jars $jar_list\" >> \
                  /tmp/{1}/notebook_spark-defaults_local.conf".format(jars_dir, cluster_name), shell=True, check=True)
    if os.path.exists('{0}spark/conf/spark-defaults.conf'.format(cluster_dir)):
        additional_spark_properties = subprocess.run('diff --changed-group-format="%>" --unchanged-group-format="" '
                                                     '/tmp/{0}/notebook_spark-defaults_local.conf '
                                                     '{1}spark/conf/spark-defaults.conf | grep -v "^#"'.format(
                                                         cluster_name, cluster_dir),
                                                     capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
        for property in additional_spark_properties.split('\n'):
            subprocess.run('echo "{0}" >> /tmp/{1}/notebook_spark-defaults_local.conf'.format(property, cluster_name), shell=True, check=True)
    if os.path.exists('{0}'.format(cluster_dir)):
        subprocess.run('cp -f /tmp/{0}/notebook_spark-defaults_local.conf {1}spark/conf/spark-defaults.conf'.format(cluster_name, cluster_dir), shell=True, check=True)
    if os.path.exists('{0}'.format(cluster_dir)):
        subprocess.run('cp -f /opt/spark/conf/core-site.xml {}spark/conf/'.format(cluster_dir), shell=True, check=True)
    if spark_configs and os.path.exists('{0}'.format(cluster_dir)):
        datalab_header = subprocess.run('cat /tmp/{0}/notebook_spark-defaults_local.conf | grep "^#"'.format(cluster_name),
                                        capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
        spark_configurations = ast.literal_eval(spark_configs)
        new_spark_defaults = list()
        spark_defaults = subprocess.run('cat {0}spark/conf/spark-defaults.conf'.format(cluster_dir),
                                        capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
        current_spark_properties = spark_defaults.split('\n')
        for param in current_spark_properties:
            if param.split(' ')[0] != '#':
                for config in spark_configurations:
                    if config['Classification'] == 'spark-defaults':
                        for property in config['Properties']:
                            if property == param.split(' ')[0]:
                                param = property + ' ' + config['Properties'][property]
                            else:
                                new_spark_defaults.append(property + ' ' + config['Properties'][property])
                new_spark_defaults.append(param)
        new_spark_defaults = set(new_spark_defaults)
        subprocess.run("echo '{0}' > {1}/spark/conf/spark-defaults.conf".format(datalab_header, cluster_dir), shell=True, check=True)
        for prop in new_spark_defaults:
            prop = prop.rstrip()
            subprocess.run('echo "{0}" >> {1}/spark/conf/spark-defaults.conf'.format(prop, cluster_dir), shell=True, check=True)
        subprocess.run('sed -i "/^\s*$/d" {0}/spark/conf/spark-defaults.conf'.format(cluster_dir), shell=True, check=True)


def find_des_jars(all_jars, des_path):
    try:
        # Use this method to filter cloud jars (see an example in aws method)
        return all_jars
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)
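

# Illustrative sketch only (not part of the original module), assuming /tmp/<cluster_name>/
# already holds notebook_spark-defaults_local.conf and the cluster directory exists; the
# version numbers, paths and user below are hypothetical placeholders.
#
#   install_dataengine_spark('cluster1', spark_link, '3.0.1', '3.2', '/opt/cluster1/',
#                            'datalab-user', False)
#   configure_dataengine_spark('cluster1', '/opt/jars/', '/opt/cluster1/', False,
#                              spark_configs=os.environ.get('spark_configurations', ''))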