in docker/runtime/doris-compose/command.py [0:0]
def run(self, args):
    """Create (if needed) and bring up a Doris cluster via docker compose.

    Loads the cluster named ``args.NAME`` if it already exists; otherwise
    treats this as a new cluster (``args.IMAGE`` is then mandatory), applies
    defaults, and creates it.  Requested FE/BE/MS/RECYCLE/FDB nodes are
    added, images updated, the compose file saved, and ``docker compose up``
    executed for the related nodes (or the whole cluster).  For cloud
    clusters it may additionally register FEs/BEs through SQL node
    management and create the default storage vault.

    Returns:
        dict: per node type ("fe", "be", "ms", "recycle", "fdb") the list of
        node ids added by this invocation under key "add_list".

    Raises:
        Exception: on empty cluster name, missing image for a new cluster,
        malformed ``--remote-master-fe-addr``, or failure to reach the
        remote master FE before the timeout.
    """
    if not args.NAME:
        raise Exception("Need specific not empty cluster name")
    for_all = True  # up the whole cluster rather than only selected nodes
    add_fdb_num = 0
    is_new_cluster = False
    try:
        cluster = CLUSTER.Cluster.load(args.NAME)
        if not cluster.is_cloud:
            # Cloud-only options are meaningless for a local cluster; drop
            # them so they cannot force node-specific handling below.
            args.add_ms_num = None
            args.add_recycle_num = None
            args.ms_id = None
            args.recycle_id = None
            args.fdb_id = None
        if args.fe_id is not None or args.be_id is not None \
                or args.ms_id is not None or args.recycle_id is not None \
                or args.fdb_id is not None \
                or args.add_fe_num or args.add_be_num \
                or args.add_ms_num or args.add_recycle_num:
            # Specific nodes were targeted; only operate on those.
            for_all = False
    except Exception:
        # Load failure is treated as "cluster does not exist yet".  Catch
        # Exception instead of a bare except so KeyboardInterrupt/SystemExit
        # still abort the command.
        is_new_cluster = True
        if not args.IMAGE:
            raise Exception("New cluster must specific image") from None
        # Node-id selectors cannot apply to a cluster that has no nodes yet.
        if args.fe_id is not None:
            args.fe_id = None
            LOG.warning(
                utils.render_yellow("Ignore --fe-id for new cluster"))
        if args.be_id is not None:
            args.be_id = None
            LOG.warning(
                utils.render_yellow("Ignore --be-id for new cluster"))
        args.fdb_id = None
        args.ms_id = None
        args.recycle_id = None
        # Defaults: 3 FE (none when the master FE lives remotely) and 3 BE.
        if args.add_fe_num is None:
            args.add_fe_num = 0 if args.remote_master_fe else 3
        if args.add_be_num is None:
            args.add_be_num = 3
        cloud_store_config = {}
        if args.cloud:
            # Cloud mode needs one FDB plus at least one MS and recycler.
            add_fdb_num = 1
            if not args.add_ms_num:
                args.add_ms_num = 1
            if not args.add_recycle_num:
                args.add_recycle_num = 1
            if not args.be_cluster:
                args.be_cluster = "compute_cluster"
            cloud_store_config = self._get_cloud_store_config()
        else:
            args.add_ms_num = 0
            args.add_recycle_num = 0
        if args.remote_master_fe:
            if not args.local_network_ip:
                args.local_network_ip = utils.get_local_ip()
            parts = args.remote_master_fe.split(":")
            if len(parts) != 2:
                raise Exception(
                    f"invalid --remote-master-fe-addr {args.remote_master_fe}, should be 'ip:query_port'"
                )
            # An empty host part means "this machine".
            if not parts[0]:
                args.remote_master_fe = args.local_network_ip + ":" + parts[
                    1]
            if args.cloud:
                args.sql_mode_node_mgr = True
        cluster = CLUSTER.Cluster.new(
            args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
            args.be_config, args.ms_config, args.recycle_config,
            args.remote_master_fe, args.local_network_ip, args.fe_follower,
            args.be_disks, args.be_cluster, args.reg_be, args.coverage_dir,
            cloud_store_config, args.sql_mode_node_mgr,
            args.be_metaservice_endpoint, args.be_cluster_id)
        LOG.info("Create new cluster {} succ, cluster path is {}".format(
            args.NAME, cluster.get_path()))

    if args.be_cluster and cluster.is_cloud:
        cluster.be_cluster = args.be_cluster
    if cluster.is_cloud:
        cluster.fe_follower = args.fe_follower

    _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
                                                args.be_id, args.ms_id,
                                                args.recycle_id,
                                                args.fdb_id)
    add_fe_ids = []
    add_be_ids = []
    add_ms_ids = []
    add_recycle_ids = []
    add_fdb_ids = []
    # Creation order matters: FDB before MS/RECYCLE before FE/BE.
    add_type_nums = [
        (CLUSTER.Node.TYPE_FDB, add_fdb_num, add_fdb_ids),
        (CLUSTER.Node.TYPE_MS, args.add_ms_num, add_ms_ids),
        (CLUSTER.Node.TYPE_RECYCLE, args.add_recycle_num, add_recycle_ids),
        (CLUSTER.Node.TYPE_FE, args.add_fe_num, add_fe_ids),
        (CLUSTER.Node.TYPE_BE, args.add_be_num, add_be_ids),
    ]
    if not related_nodes:
        related_nodes = []

    def do_add_node(node_type, add_num, add_ids):
        # Add `add_num` nodes of `node_type`, recording them as related and
        # collecting their ids.
        if not add_num:
            return
        for _ in range(add_num):
            node = cluster.add(node_type)
            related_nodes.append(node)
            add_ids.append(node.id)

    for node_type, add_num, add_ids in add_type_nums:
        do_add_node(node_type, add_num, add_ids)

    if args.IMAGE:
        for node in related_nodes:
            node.set_image(args.IMAGE)
        if for_all:
            cluster.set_image(args.IMAGE)
    # FDB always runs its own image, independent of --image.
    for node in cluster.get_all_nodes(CLUSTER.Node.TYPE_FDB):
        node.set_image("foundationdb/foundationdb:{}".format(
            args.fdb_version))
    cluster.save()

    options = []
    if not args.start:
        options.append("--no-start")
    else:
        options += ["--remove-orphans"]
        if args.detach:
            options.append("-d")
        if args.force_recreate:
            options.append("--force-recreate")
    related_node_num = len(related_nodes)
    if for_all:
        related_node_num = cluster.get_all_nodes_num()
        related_nodes = None  # None => compose acts on every service
    # Stream compose output unless the containers are detached.
    output_real_time = args.start and not args.detach
    utils.exec_docker_compose_command(cluster.get_compose_file(),
                                      "up",
                                      options,
                                      related_nodes,
                                      output_real_time=output_real_time)
    if not args.start:
        LOG.info(
            utils.render_green(
                "Not up cluster cause specific --no-start, related node num {}"
                .format(related_node_num)))
    else:
        LOG.info("Using SQL mode for node management ? {}".format(
            cluster.sql_mode_node_mgr))
        if cluster.remote_master_fe:
            if is_new_cluster:
                # Persist the remote master address for later commands.
                with open(CLUSTER.get_master_fe_addr_path(cluster.name),
                          "w") as f:
                    f.write(cluster.remote_master_fe)
                if cluster.is_cloud:
                    cloud_config = "\n".join([
                        f"meta_service_endpoint = {cluster.get_meta_server_addr()}",
                        "deploy_mode = cloud",
                        f"cluster_id = {CLUSTER.CLUSTER_ID}",
                    ])
                    # write add conf to remote_master_fe_add.conf, remote fe can send ssh to get this content.
                    with open(
                            os.path.join(
                                CLUSTER.get_status_path(cluster.name),
                                "remote_master_fe_add.conf"), "w") as f:
                        f.write(cloud_config)
                    # The user must apply this config to the remote fe.conf
                    # by hand; abort if they decline.
                    ans = input(
                        utils.render_red(
                            f"\nAdd remote fe {cluster.remote_master_fe} fe.conf with follow config: "
                        ) + "\n\n" + f"{cloud_config}\n\nConfirm ? y/n: ")
                    if ans != 'y':
                        LOG.info(
                            "Up cluster failed due to not confirm write the above config."
                        )
                        return
            LOG.info("Waiting connect to remote FE...")
            expire_ts = time.time() + 3600 * 5
            parts = cluster.remote_master_fe.split(":")
            fe_ip = parts[0]
            fe_port = int(parts[1])
            ready = False
            while expire_ts > time.time():
                if utils.is_socket_avail(fe_ip, fe_port):
                    ready = True
                    break
                # Pause between probes: previously this spun in a tight
                # loop for up to 5 hours.
                time.sleep(1)
            if not ready:
                raise Exception(
                    "Cannot connect to remote master fe: " +
                    cluster.remote_master_fe)
            LOG.info("After connect to remote FE...")
        else:
            # Wait for FE master to be elected
            LOG.info("Waiting for FE master to be elected...")
            expire_ts = time.time() + 30
            while expire_ts > time.time():
                ready = False
                db_mgr = database.get_db_mgr(
                    args.NAME, cluster.get_all_node_net_infos(), False)
                for fe_id in add_fe_ids:
                    fe_state = db_mgr.get_fe(fe_id)
                    if fe_state is not None and fe_state.alive:
                        ready = True
                        break
                if ready:
                    break
                LOG.info("there is no fe ready")
                time.sleep(1)
            LOG.info("after Waiting for FE master to be elected...")
        if cluster.is_cloud and cluster.sql_mode_node_mgr:
            db_mgr = database.get_db_mgr(args.NAME,
                                         cluster.get_all_node_net_infos(),
                                         False)
            master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
                cluster.name, True)
            # Add FEs except master_fe
            for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE):
                fe_querypoint = f"{fe.get_ip()}:{fe.meta['ports']['query_port']}"
                fe_endpoint = f"{fe.get_ip()}:{fe.meta['ports']['edit_log_port']}"
                if fe_querypoint != master_fe_endpoint:
                    try:
                        db_mgr.add_fe(
                            fe_endpoint, "FOLLOWER"
                            if cluster.fe_follower else "OBSERVER")
                        LOG.info(f"Added FE {fe_endpoint} successfully.")
                    except Exception as e:
                        # Best-effort: keep registering the other nodes.
                        LOG.error(
                            f"Failed to add FE {fe_endpoint}: {str(e)}")
            # Add BEs
            for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE):
                be_endpoint = f"{be.get_ip()}:{be.meta['ports']['heartbeat_service_port']}"
                try:
                    db_mgr.add_be(be_endpoint)
                    LOG.info(f"Added BE {be_endpoint} successfully.")
                except Exception as e:
                    LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}")
            if is_new_cluster:
                cloud_store_config = self._get_cloud_store_config()
                db_mgr.create_default_storage_vault(cloud_store_config)
        if not cluster.is_host_network():
            wait_ready_service(args.wait_timeout, cluster, add_fe_ids,
                               add_be_ids)
        LOG.info(
            utils.render_green(
                "Up cluster {} succ, related node num {}".format(
                    args.NAME, related_node_num)))
        ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
        LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
        LOG.info(
            "Master fe query address: " +
            utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
            "\n")

    return {
        "fe": {
            "add_list": add_fe_ids,
        },
        "be": {
            "add_list": add_be_ids,
        },
        "ms": {
            "add_list": add_ms_ids,
        },
        "recycle": {
            "add_list": add_recycle_ids,
        },
        "fdb": {
            "add_list": add_fdb_ids,
        },
    }