ManagedkdbInsights/update_cluster/update_cluster.py (88 lines of code) (raw):
import argparse
import pprint
import awswrangler as wr
from urllib.parse import urlparse
from pathlib import Path
from managed_kx import *
pp = pprint.PrettyPrinter(indent=2)
def divide_chunks(l, n):
# looping till length l
for i in range(0, len(l), n):
yield l[i:i + n]
class S3Url(object):
def __init__(self, url):
self._parsed = urlparse(url, allow_fragments=False)
@property
def bucket(self):
return self._parsed.netloc
@property
def key(self):
if self._parsed.query:
return self._parsed.path.lstrip('/') + '?' + self._parsed.query
else:
return self._parsed.path.lstrip('/')
@property
def url(self):
return self._parsed.geturl()
#
# This program assumes that you have credentials in $HOME/.aws/credentials
# those credentials must also be linked to a FinSpace user with ability to create datasets
#
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# arguments
parser.add_argument("-environmentId", "-e", help="Finspace with managed kdb Insights Environment ID", required=True)
parser.add_argument("-profile", "-p", help="profile to use for API access", default ="default")
parser.add_argument("-s3", "-s3", help="S3 Staging location", required=True)
parser.add_argument("-cluster", "-cl", help="Cluster to update", required=True)
parser.add_argument("-code", "-co", help="Code directory to send to cluster", required=True)
parser.add_argument("-wait", "-w", help="Wait for cluster status", default=True)
args = parser.parse_args()
ENV_ID = args.environmentId
S3_PATH = args.s3
CLUSTER_NAME = args.cluster
code_path = Path(args.code)
wait = args.wait
deploy_strategy = 'NO_RESTART'
# use the user's AWS credentials from environment
session = boto3.Session(profile_name = args.profile)
client = session.client(service_name='finspace')
# check arguments
# - code location exists
# - S3 bucket exists
# - cluster exists
# -------------------------------------------------------------
# does code location exist?
if code_path.is_dir() is False:
sys.exit(f"directory {code_path.absolute()} not found")
# cluster exists?
if has_cluster(client, environmentId=ENV_ID, clusterName=CLUSTER_NAME) is False:
sys.exit(f"cluster {CLUSTER_NAME} not found")
# zip the code, ignore known temp files
print(f"Zipping: {code_path.absolute()}")
os.system(f"cd {code_path.name}; zip -r -X ../{code_path.name}.zip . -x '*.ipynb_checkpoints*';")
# copy to S3
s3_code_path = f"{S3_PATH}/{code_path.name}.zip"
print(f"Copying to: {s3_code_path}")
wr.s3.upload(f"{code_path.name}.zip", s3_code_path)
# update cluster
# get current state and use its arguments in update
# -------------------------------------------------------------
print(f"Updating Cluster: {CLUSTER_NAME}")
# ensure the cluster is running
current_state = get_kx_cluster(client, environmentId=ENV_ID, clusterName=CLUSTER_NAME)
status = current_state.get('status', 'UNKNOWN')
if status != 'RUNNING':
sys.exit(f"Cluster: {CLUSTER_NAME} is not in RUNNING, state is {status}")
print(f"State of {CLUSTER_NAME}:")
pp.pprint(current_state)
print(80*'-')
# split s3 URL into bucket and key
s3 = S3Url(s3_code_path)
kwargs = {
'environmentId': ENV_ID,
'clusterName': CLUSTER_NAME,
'code': {
's3Bucket': s3.bucket,
's3Key': s3.key
},
'deploymentConfiguration': {
'deploymentStrategy': deploy_strategy # 'NO_RESTART'|'ROLLING'|'FORCE'
}
}
# copy arguments from current state to keep them unchanged
arg_list = [] #['commandLineArguments', 'initializationScript']
if deploy_strategy != 'NO_RESTART':
arg_list.append('initializationScript')
arg_list.append('commandLineArguments')
# add arguments not changing
for a in arg_list:
if a in current_state:
kwargs[a] = current_state[a]
resp = client.update_kx_cluster_code_configuration( **kwargs)
print("Requested State")
pp.pprint(resp)
print(80*'-')
# wait for the update
if wait:
wait_for_cluster_status(client, environmentId=ENV_ID, clusterName=CLUSTER_NAME, sleep_sec=10, show_wait=True)
# new (updated) state
resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=CLUSTER_NAME)
print(80*'-')
print(f"New Current State")
pp.pprint(resp)