in src/ClusterBootstrap/deploy.py [0:0]
def run_command(args, command, nargs, parser):
    """Top-level dispatcher for the deploy.py command-line interface.

    Loads cluster/config YAML files, normalizes global option flags, then
    routes ``command`` (with its positional arguments ``nargs``) to the
    matching deployment helper.

    Args:
        args: parsed argparse namespace (verbose, yes, force, sudo, ...).
        command: primary command verb, e.g. "deploy", "kubectl", "build".
        nargs: remaining positional arguments for the command.
        parser: the argparse parser, used to print usage on errors.

    Side effects: mutates module globals (config, verbose, nocache, ...),
    may create (and finally remove) a temporary copy of the SSH key, and
    calls exit() for terminal commands.
    """
    # If necessary, show parsed arguments.
    global discoverserver
    global homeinserver
    global verbose
    global config
    global ipAddrMetaname
    global nocache
    sshtempfile = ""
    nocache = args.nocache
    discoverserver = args.discoverserver
    homeinserver = args.homeinserver
    if args.verbose:
        verbose = True
        utils.verbose = True
        print("Args = {0}".format(args))
    # Commands that must run before any configuration is loaded.
    if command == "restore":
        utils.restore_keys(nargs)
        # Stop parsing additional command
        exit()
    elif command == "restorefromdir":
        utils.restore_keys_from_dir(nargs)
        exit()
    elif command == "mapold2cld":
        map_old_config_to_cloud(nargs)
        exit()
    # Cluster Config
    config_cluster = os.path.join(dirpath, "cluster.yaml")
    if os.path.exists(config_cluster):
        merge_config(config, yaml.full_load(open(config_cluster)))
    config_file = os.path.join(dirpath, "config.yaml")
    if not os.path.exists(config_file):
        parser.print_help()
        print("ERROR: config.yaml does not exist!")
        exit()
    with open(config_file) as f:
        merge_config(config, yaml.full_load(f))
    if os.path.exists("./deploy/clusterID.yml"):
        with open("./deploy/clusterID.yml") as f:
            tmp = yaml.full_load(f)
            if "clusterId" in tmp:
                config["clusterId"] = tmp["clusterId"]
    if "copy_sshtemp" in config and config["copy_sshtemp"]:
        if "ssh_origfile" not in config:
            config["ssh_origfile"] = config["ssh_cert"]
        sshfile = os.path.join(dirpath, config["ssh_origfile"])
        if os.path.exists(sshfile):
            _, sshtempfile = tempfile.mkstemp(dir='/tmp')
            if verbose:
                print("SSH file is now {0}".format(sshtempfile))
            # Work from a temporary copy so the original key is never modified.
            with open(sshtempfile, 'wb') as output:
                # Renamed from "input" to avoid shadowing the builtin.
                with open(sshfile, 'rb') as keysrc:
                    output.write(keysrc.read())
            config["ssh_cert"] = sshtempfile
        else:
            print("SSH Key {0} not found using original".format(sshfile))
    if os.path.exists("./deploy/clusterID.yml"):
        update_config()
    else:
        apply_config_mapping(config, default_config_mapping)
        update_docker_image_config()
    get_ssh_config()
    configuration(config, verbose)
    if args.yes:
        global defanswer
        print("Use yes for default answer")
        defanswer = "yes"
    if args.public:
        ipAddrMetaname = "clientIP"
    if verbose:
        print("deploy " + command + " " + (" ".join(nargs)))
        print("PlatformScripts = {0}".format(config["platform-scripts"]))
    if command == "restore":
        # Second part of restore, after config has been read.
        # BUGFIX: pass the normalized boolean (bForce) rather than the raw
        # flag, which may be None; the original computed bForce and ignored it.
        bForce = args.force if args.force is not None else False
        get_kubectl_binary(force=bForce)
        exit()
    if command == "clean":
        clean_deployment()
        exit()
    elif command == "sleep":
        sleeptime = 10 if len(nargs) < 1 else int(nargs[0])
        print("Sleep for %s sec ... " % sleeptime)
        for si in range(sleeptime):
            sys.stdout.write(".")
            sys.stdout.flush()
            time.sleep(1)
    elif command == "connect":
        check_master_ETCD_status()
        # BUGFIX: default to "master" when no role is given instead of
        # indexing into an empty nargs (IndexError); the original read
        # nargs[0] before its len(nargs) < 1 guard.
        role2connect = nargs[0] if len(nargs) >= 1 else "master"
        if role2connect == "master":
            nodes = config["kubernetes_master_node"]
        elif role2connect in ["etcd", "worker", "nfs", "samba", "mysqlserver", "elasticsearch"]:
            nodes = config["{}_node".format(role2connect)]
        else:
            parser.print_help()
            print("ERROR: must connect to either master, etcd, nfs or worker nodes")
            exit()
        if len(nodes) == 0:
            parser.print_help()
            print("ERROR: cannot find any node of the type to connect to")
            exit()
        num = 0
        nodename = None
        if len(nargs) >= 2:
            try:
                # Second argument is either a node index or a node name.
                num = int(nargs[1])
                if num < 0 or num >= len(nodes):
                    num = 0
            except ValueError:
                nodename = get_node_full_name(nargs[1])
        if nodename is None:
            nodename = nodes[num]
        utils.SSH_connect(config["ssh_cert"],
                          config["admin_username"], nodename)
        exit()
    elif command == "deploy" and "clusterId" in config:
        deploy_ETCD_master(force=args.force)
        utils.render_template("./template/kubeconfig/kubeconfig.yaml.template",
                              "deploy/kubeconfig/kubeconfig.yaml", config)
    elif command == "nfs-server":
        if len(nargs) > 0:
            if nargs[0] == "create":
                set_nfs_disk()
            else:
                print(
                    "Error: subcommand %s is not recognized for nfs-server. " % nargs[0])
                exit()
        else:
            # BUGFIX: the original applied "%" to a string with no format
            # specifier and indexed an empty nargs, raising instead of
            # printing the error.
            print("Error: nfs-server need a subcommand (create) ")
            exit()
    elif command == "build":
        configuration(config, verbose)
        if len(nargs) <= 0:
            init_deployment()
        elif nargs[0] == "iso-coreos":
            create_ISO()
        elif nargs[0] == "pxe-coreos":
            create_PXE()
        elif nargs[0] == "pxe-ubuntu":
            create_PXE_ubuntu()
        else:
            parser.print_help()
            print("Error: build target %s is not recognized. " % nargs[0])
            exit()
    elif command == "dnssetup":
        os.system("./gene_loc_dns.sh")
        nodes = get_nodes(config["clusterId"])
        run_script_on_all(nodes, "./scripts/dns.sh", sudo=args.sudo)
    elif command == "sshkey":
        if len(nargs) >= 1 and nargs[0] == "install":
            install_ssh_key(nargs[1:])
        else:
            parser.print_help()
            # BUGFIX: original printed a copy-pasted "build target" message
            # and crashed with IndexError when no subcommand was given.
            print("Error: sshkey subcommand %s is not recognized. "
                  % (nargs[0] if len(nargs) >= 1 else "(none)"))
            exit()
    elif command == "scan":
        if len(nargs) == 1:
            utils.scan_nodes(config["ssh_cert"],
                             config["admin_username"], nargs[0])
        else:
            parser.print_help()
            print("Error: scan need one parameter with format x.x.x.x/n. ")
            exit()
    elif command == "admin":
        # Administrative REST calls against the cluster's restful API server.
        if len(nargs) >= 1:
            if nargs[0] == "vc":
                if len(nargs) >= 2:
                    if nargs[1] == "add":
                        # BUGFIX: the query string was garbled ('"a=') which is
                        # a syntax error; restored the intended '&quota=' field.
                        url = "http://%s:%s/AddVC?vcName=%s&quota=%s&metadata=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4])
                        response = requests.get(url)
                        print(response)
                    elif nargs[1] == "update":
                        # BUGFIX: same garbled '&quota=' field as in "add".
                        url = "http://%s:%s/UpdateVC?vcName=%s&quota=%s&metadata=%s&userName=Administrator" \
                            % (config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4])
                        response = requests.get(url)
                        print(response)
                    elif nargs[1] == "delete":
                        url = "http://%s:%s/DeleteVC?vcName=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2])
                        response = requests.get(url)
                        print(response)
                    elif nargs[1] == "list":
                        url = "http://%s:%s/ListVCs?userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"])
                        response = requests.get(url)
                        print(response.text)
            elif nargs[0] == "storage":
                if len(nargs) >= 2:
                    if nargs[1] == "add":
                        url = "http://%s:%s/AddStorage?vcName=%s&url=%s&storageType=%s&metadata=%s&defaultMountPath=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4], nargs[5], nargs[6])
                        response = requests.get(url)
                        print(response)
                    elif nargs[1] == "update":
                        url = "http://%s:%s/UpdateStorage?vcName=%s&url=%s&storageType=%s&metadata=%s&defaultMountPath=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4], nargs[5], nargs[6])
                        response = requests.get(url)
                        print(response)
                    elif nargs[1] == "delete":
                        url = "http://%s:%s/DeleteStorage?vcName=%s&url=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3])
                        response = requests.get(url)
                        print(response)
                    elif nargs[1] == "list":
                        url = "http://%s:%s/ListStorages?vcName=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2])
                        response = requests.get(url)
                        print(response.text)
            elif nargs[0] == "acl":
                if len(nargs) >= 2:
                    if nargs[1] == "update":
                        url = "http://%s:%s/UpdateAce?identityName=%s&resourceType=%s&resourceName=%s&permissions=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4], nargs[5])
                        response = requests.get(url)
                        print(response)
                    elif nargs[1] == "list":
                        url = "http://%s:%s/GetACL?userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"])
                        response = requests.get(url)
                        print(response.text)
                    elif nargs[1] == "delete":
                        url = "http://%s:%s/DeleteAce?identityName=%s&resourceType=%s&resourceName=%s&userName=Administrator" % (
                            config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4])
                        response = requests.get(url)
                        print(response.text)
            elif nargs[0] == "job":
                if len(nargs) >= 2:
                    if nargs[1] == "add":
                        url = "http://%s:%s/SubmitJob?jobName=%s&vcName=%s&resourcegpu=%s&gpuType=%s&dataPath=%s&workPath=%s&image=%s&jobType=%s&preemptionAllowed=%s&userName=Administrator" \
                            % (config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4], nargs[5], nargs[6], nargs[7], nargs[8], nargs[9], nargs[10])
                        response = requests.get(url)
                        print(response.text)
                    elif nargs[1] == "delete":
                        url = "http://%s:%s/KillJob?jobId=%s&userName=Administrator" \
                            % (config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2])
                        response = requests.get(url)
                        print(response.text)
                    elif nargs[1] == "list":
                        url = "http://%s:%s/ListJobs?vcName=%s&jobOwner=%s&num=%s&userName=Administrator" \
                            % (config["kubernetes_master_node"][0], config["restfulapiport"], nargs[2], nargs[3], nargs[4])
                        response = requests.get(url)
                        print(response.text)
    elif command == "packcloudinit":
        gen_configs()
        render_and_pack_worker_cloud_init_files()
    elif command == "updateworker":
        response = raw_input_with_default("Deploy Worker Nodes (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            update_worker_nodes(nargs)
    elif command == "updateworkerinparallel":
        response = raw_input_with_default(
            "Deploy Worker Nodes In Parallel (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            update_worker_nodes_in_parallel(nargs)
    elif command == "updatescaledworker":
        response = raw_input_with_default("Deploy Scaled Worker Nodes (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            update_scaled_worker_nodes(nargs)
    elif command == "resetworker":
        response = raw_input_with_default("Deploy Worker Nodes (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            reset_worker_nodes()
    elif command == "updatemysqlserver":
        response = raw_input_with_default("Deploy MySQLServer Node(s) (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            update_mysqlserver_nodes(nargs)
    elif command == "updateelasticsearch":
        response = raw_input_with_default("Deploy Elasticsearch Node(s) (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            update_elasticsearch_nodes(nargs)
    elif command == "updatenfs":
        response = raw_input_with_default("Deploy NFS Node(s) (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            update_nfs_nodes(nargs)
    elif command == "deploynfsconfig":
        deploy_nfs_config()
    elif command == "deployrepairmanagerconfig":
        deploy_repairmanager_config()
    elif command == "listmac":
        nodes = get_nodes(config["clusterId"])
        for node in nodes:
            utils.get_mac_address(
                config["ssh_cert"], config["admin_username"], node)
    elif command == "cordon":
        cordon(config, args)
    elif command == "uncordon":
        uncordon(config, args)
    elif command == "checkconfig":
        for k, v in config.items():
            print(str(k) + ":" + str(v))
    elif command == "hostname" and len(nargs) >= 1:
        if nargs[0] == "set":
            set_host_names_by_lookup()
        else:
            parser.print_help()
            print("Error: hostname with unknown subcommand")
            exit()
    elif command == "freeflow" and len(nargs) >= 1:
        if nargs[0] == "set":
            set_freeflow_router()
        else:
            parser.print_help()
            # BUGFIX: original printed a copy-pasted "hostname" message here.
            print("Error: freeflow with unknown subcommand")
            exit()
    elif command == "cleanworker":
        response = raw_input_with_default("Clean and Stop Worker Nodes (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            clean_worker_nodes()
    elif command == "partition" and len(nargs) >= 1:
        nodes = get_nodes(config["clusterId"])
        if nargs[0] == "ls":
            # Display parititons.
            print("Show partition on data disk: " + config["data-disk"])
            nodesinfo = show_partitions(nodes, config["data-disk"])
        elif nargs[0] == "create":
            partsInfo = config["partition-configuration"]
            if len(nargs) >= 2:
                partsInfo = nargs[1:]
            partsInfo = list(map(float, partsInfo))
            if len(partsInfo) == 1 and partsInfo[0] == 0:
                print("0 partitions, use the disk as is, do not partition")
            elif len(partsInfo) == 1 and partsInfo[0] < 30:
                # A single value < 30 means "N equal partitions".
                partsInfo = [100.0] * int(partsInfo[0])
            nodesinfo = show_partitions(nodes, config["data-disk"])
            print("This operation will DELETE all existing partitions and repartition all data drives on the %d nodes to %d partitions of %s" % (
                len(nodes), len(partsInfo), str(partsInfo)))
            response = input(
                "Please type (REPARTITION) in ALL CAPITALS to confirm the operation ---> ")
            if response == "REPARTITION":
                repartition_nodes(nodes, nodesinfo, partsInfo)
            else:
                print("Repartition operation aborted....")
        else:
            parser.print_help()
            exit()
    elif command == "doonall" and len(nargs) >= 1:
        nodes = get_nodes(config["clusterId"])
        exec_on_all(nodes, nargs)
    elif command == "execonall" and len(nargs) >= 1:
        nodes = get_nodes(config["clusterId"])
        print("Exec on all: " + str(nodes))
        exec_on_all_with_output(nodes, nargs)
    elif command == "runscriptonall" and len(nargs) >= 1:
        nodes = get_nodes(config["clusterId"])
        run_script_on_all(nodes, nargs, sudo=args.sudo)
    elif command == "runscriptonallinparallel" and len(nargs) >= 1:
        nodes = get_nodes(config["clusterId"])
        run_script_on_all_in_parallel(nodes, nargs, sudo=args.sudo)
    elif command == "runscriptonroles":
        assert len(nargs) >= 1
        # Leading args that name known roles select the node set; the rest
        # are the script and its arguments.
        nodeset, scripts_start = [], 0
        for ni, arg in enumerate(nargs):
            scripts_start = ni
            if arg in config["allroles"]:
                nodeset += arg,
            else:
                break
        nodes = get_nodes_by_roles(nodeset)
        run_script_on_all_in_parallel(
            nodes, nargs[scripts_start:], sudo=args.sudo)
    elif command == "runscriptonrandmaster" and len(nargs) >= 1:
        run_script_on_rand_master(nargs, args)
    elif command == "runscriptonscaleup" and len(nargs) >= 1:
        nodes = get_scaled_nodes(config["clusterId"])
        run_script_on_all(nodes, nargs, sudo=args.sudo)
    elif command == "copytoall" and len(nargs) >= 1:
        nodes = get_nodes(config["clusterId"])
        print("Copy all from: {0} to: {1}".format(nargs[0], nargs[1]))
        copy_to_all(nodes, nargs[0], nargs[1])
    elif command == "cleanmasteretcd":
        response = input("Clean and Stop Master/ETCD Nodes (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            clean_master()
            clean_etcd()
    elif command == "updatereport":
        response = raw_input_with_default(
            "Deploy IP Reporting Service on Master and ETCD nodes (y/n)?")
        if first_char(response) == "y":
            check_master_ETCD_status()
            gen_configs()
            update_reporting_service()
    elif command == "display" or command == "clusterinfo":
        # BUGFIX: configuration() was called twice in a row here; one call
        # suffices.
        configuration(config, verbose)
        check_master_ETCD_status()
    elif command == "webui":
        check_master_ETCD_status()
        gen_configs()
        deploy_webUI()
    elif command == "mount":
        if len(nargs) <= 0:
            fileshare_install()
            allmountpoints = mount_fileshares_by_service(True)
            if args.force:
                print("forced to re-link fileshares")
            link_fileshares(allmountpoints, args.force)
        elif nargs[0] == "install":
            fileshare_install()
        elif nargs[0] == "start":
            allmountpoints = mount_fileshares_by_service(True)
            link_fileshares(allmountpoints, args.force)
        elif nargs[0] == "stop":
            unmount_fileshares_by_service(False)
        elif nargs[0] == "clean":
            print("This operation will CLEAN local content in the physical mount point, and may erase the data on those locations. ")
            response = input(
                "Please type (CLEAN) in ALL CAPITALS to confirm the operation ---> ")
            if response == "CLEAN":
                unmount_fileshares_by_service(True)
        elif nargs[0] == "nolink":
            mount_fileshares_by_service(True)
        elif nargs[0] == "link":
            all_nodes = get_nodes(config["clusterId"])
            allmountpoints, fstab = get_mount_fileshares()
            link_fileshares(allmountpoints, args.force)
        else:
            parser.print_help()
            print("Error: mount subcommand %s is not recognized " % nargs[0])
    elif command == "labelwebui":
        label_webUI(nargs[0])
    elif command == "production":
        set_host_names_by_lookup()
        success = deploy_ETCD_master()
        if success:
            update_worker_nodes([])
    elif command == "azure":
        config["WinbindServers"] = []
        run_script_blocks(args.verbose, scriptblocks["azure"])
    elif command == "jobendpt":
        print(get_jobendpt(nargs[0]))
    elif command == "update" and len(nargs) >= 1:
        if nargs[0] == "config":
            update_config_nodes()
    elif command == "kubectl":
        run_kubectl(nargs)
    elif command == "kubernetes":
        configuration(config, verbose)
        if len(nargs) >= 1:
            if len(nargs) >= 2:
                servicenames = nargs[1:]
            else:
                # No service named: operate on every known service.
                allservices = get_all_services()
                servicenames = []
                for service in allservices:
                    servicenames.append(service)
            configuration(config, verbose)
            if nargs[0] == "start":
                # Start a kubelet service.
                for servicename in servicenames:
                    start_kube_service(servicename)
            elif nargs[0] == "stop":
                # stop a kubelet service.
                for servicename in servicenames:
                    stop_kube_service(servicename)
            elif nargs[0] == "restart":
                # restart a kubelet service.
                for servicename in servicenames:
                    replace_kube_service(servicename)
            elif nargs[0] == "labels":
                if len(nargs) >= 2 and (nargs[1] == "active" or nargs[1] == "inactive" or nargs[1] == "remove"):
                    kubernetes_label_nodes(nargs[1], nargs[2:], args.yes)
                elif len(nargs) == 1:
                    kubernetes_label_nodes("active", [], args.yes)
                else:
                    parser.print_help()
                    print(
                        "Error: kubernetes labels expect a verb which is either active, inactive or remove, but get: %s" % nargs[1])
            elif nargs[0] == "patchprovider":
                # TODO(harry): read a tag to decide which tools we are using, so we don't need nargs[1]
                if len(nargs) >= 2 and (nargs[1] == "aztools" or nargs[1] == "gstools" or nargs[1] == "awstools"):
                    if len(nargs) == 3:
                        kubernetes_patch_nodes_provider(nargs[1], nargs[2])
                    else:
                        kubernetes_patch_nodes_provider(nargs[1], False)
                else:
                    print(
                        "Error: kubernetes patchprovider expect a verb which is either aztools, gstools or awstools.")
            elif nargs[0] == "mark":
                kubernetes_mark_nodes(nargs[1:], True)
            elif nargs[0] == "unmark":
                kubernetes_mark_nodes(nargs[1:], False)
            elif nargs[0] == "cordon" or nargs[0] == "uncordon":
                run_kube_command_on_nodes(nargs)
            elif nargs[0] == "labelvc":
                kubernetes_label_vc(True)
            else:
                parser.print_help()
                print("Error: Unknown kubernetes subcommand " + nargs[0])
        else:
            parser.print_help()
            print("Error: kubernetes need a subcommand.")
            exit()
    elif command == "gpulabel":
        kubernetes_label_GpuTypes()
    elif command == "labelsku":
        kubernetes_label_sku()
    elif command == "labelvc":
        kubernetes_label_vc()
    elif command == "genscripts":
        gen_platform_wise_config()
        gen_dns_config_script()
        gen_pass_secret_script()
        gen_warm_up_cluster_script()
    elif command == "setconfigmap":
        os.system('./deploy/bin/kubectl create configmap dlws-scripts --from-file=../Jobs_Templete -o yaml --dry-run | ./deploy.py kubectl apply -f -')
    elif command == "download":
        if len(nargs) >= 1:
            if nargs[0] == "kubectl" or nargs[0] == "kubelet":
                os.system("rm ./deploy/bin/*")
                get_kubectl_binary()
            else:
                parser.print_help()
                # BUGFIX: original printed a copy-pasted "etcd" message here.
                print("Error: unrecognized download subcommand.")
                exit()
        else:
            get_kubectl_binary()
    elif command == "etcd":
        if len(nargs) >= 1:
            if nargs[0] == "check":
                get_ETCD_master_nodes(config["clusterId"])
                check_etcd_service()
            else:
                parser.print_help()
                print("Error: unrecognized etcd subcommand.")
                exit()
        else:
            parser.print_help()
            print("Error: etcd need a subcommand.")
            exit()
    elif command == "backup":
        utils.backup_keys(config["cluster_name"], nargs)
    elif command == "backuptodir":
        utils.backup_keys_to_dir(nargs)
    elif command == "nginx":
        if len(nargs) >= 1:
            configuration(config, verbose)
            if nargs[0] == "config":
                config_nginx()
            if nargs[0] == "fqdn":
                config_fqdn()
    elif command == "docker":
        if len(nargs) >= 1:
            configuration(config, verbose)
            if nargs[0] == "build":
                check_buildable_images(nargs[1:])
                build_docker_images(nargs[1:])
            elif nargs[0] == "push":
                check_buildable_images(nargs[1:])
                push_docker_images(nargs[1:])
            elif nargs[0] == "run":
                if len(nargs) >= 2:
                    run_docker_image(nargs[1], args.native, sudo=args.sudo)
                else:
                    parser.print_help()
                    print("Error: docker run expects an image name ")
            else:
                parser.print_help()
                print("Error: unkown subcommand %s for docker." % nargs[0])
                exit()
        else:
            parser.print_help()
            print("Error: docker needs a subcommand")
            exit()
    elif command == "rendertemplate":
        if len(nargs) != 2:
            parser.print_help()
            exit()
        configuration(config, verbose)
        template_file = nargs[0]
        target_file = nargs[1]
        utils.render_template(template_file, target_file, config)
    elif command == "upgrade_masters":
        gen_configs()
        upgrade_masters()
    elif command == "upgrade_workers":
        gen_configs()
        upgrade_workers(nargs)
    elif command == "upgrade":
        gen_configs()
        upgrade_masters()
        upgrade_workers(nargs)
    elif command in scriptblocks:
        run_script_blocks(args.verbose, scriptblocks[command])
    else:
        parser.print_help()
        print("Error: Unknown command " + command)
    # Remove the temporary SSH key copy, if one was created above.
    if os.path.exists(sshtempfile):
        print("Removing temp SSH file {0}".format(sshtempfile))
        os.remove(sshtempfile)