def run()

in docker/runtime/doris-compose/command.py

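Handler for the cluster "up" flow: it loads an existing cluster by name or creates a new one, adds the requested FDB/MS/recycler/FE/BE nodes, runs docker compose for the affected containers, registers the new nodes with the master FE when needed, and returns the ids of the nodes that were added.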

    def run(self, args):
        if not args.NAME:
            raise Exception("Need specific not empty cluster name")
        for_all = True
        add_fdb_num = 0
        is_new_cluster = False
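        # Try to load an existing cluster; if that fails, fall through and create a new one.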
        try:
            cluster = CLUSTER.Cluster.load(args.NAME)

            if not cluster.is_cloud:
                args.add_ms_num = None
                args.add_recycle_num = None
                args.ms_id = None
                args.recycle_id = None
                args.fdb_id = None

            if args.fe_id is not None or args.be_id is not None \
                or args.ms_id is not None or args.recycle_id is not None \
                or args.fdb_id is not None \
                or args.add_fe_num or args.add_be_num \
                or args.add_ms_num or args.add_recycle_num:
                for_all = False
        except Exception:
            # Failed to load the cluster, so treat it as a brand-new one.
            is_new_cluster = True
            if not args.IMAGE:
                raise Exception("New cluster must specific image") from None
            if args.fe_id is not None:
                args.fe_id = None
                LOG.warning(
                    utils.render_yellow("Ignore --fe-id for new cluster"))
            if args.be_id is not None:
                args.be_id = None
                LOG.warning(
                    utils.render_yellow("Ignore --be-id for new cluster"))

            args.fdb_id = None
            args.ms_id = None
            args.recycle_id = None

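            # Default node counts for a new cluster: 3 FEs (0 when using a remote master FE) and 3 BEs.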
            if args.add_fe_num is None:
                args.add_fe_num = 0 if args.remote_master_fe else 3
            if args.add_be_num is None:
                args.add_be_num = 3

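            # Cloud mode also needs an FDB node, a meta-service, a recycler and a cloud store config.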
            cloud_store_config = {}
            if args.cloud:
                add_fdb_num = 1
                if not args.add_ms_num:
                    args.add_ms_num = 1
                if not args.add_recycle_num:
                    args.add_recycle_num = 1
                if not args.be_cluster:
                    args.be_cluster = "compute_cluster"
                cloud_store_config = self._get_cloud_store_config()
            else:
                args.add_ms_num = 0
                args.add_recycle_num = 0

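            # A remote master FE address has the form 'ip:query_port'; an empty ip falls back to the local network ip.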
            if args.remote_master_fe:
                if not args.local_network_ip:
                    args.local_network_ip = utils.get_local_ip()
                parts = args.remote_master_fe.split(":")
                if len(parts) != 2:
                    raise Exception(
                        f"invalid --remote-master-fe-addr {args.remote_master_fe}, should be 'ip:query_port'"
                    )
                if not parts[0]:
                    args.remote_master_fe = f"{args.local_network_ip}:{parts[1]}"
                if args.cloud:
                    args.sql_mode_node_mgr = True

            cluster = CLUSTER.Cluster.new(
                args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
                args.be_config, args.ms_config, args.recycle_config,
                args.remote_master_fe, args.local_network_ip, args.fe_follower,
                args.be_disks, args.be_cluster, args.reg_be, args.coverage_dir,
                cloud_store_config, args.sql_mode_node_mgr,
                args.be_metaservice_endpoint, args.be_cluster_id)
            LOG.info("Create new cluster {} succ, cluster path is {}".format(
                args.NAME, cluster.get_path()))

        if args.be_cluster and cluster.is_cloud:
            cluster.be_cluster = args.be_cluster

        if cluster.is_cloud:
            cluster.fe_follower = args.fe_follower

        _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
                                                    args.be_id, args.ms_id,
                                                    args.recycle_id,
                                                    args.fdb_id)
        add_fe_ids = []
        add_be_ids = []
        add_ms_ids = []
        add_recycle_ids = []
        add_fdb_ids = []

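        # Add new nodes type by type: FDB first, then MS, recycler, FE and BE.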
        add_type_nums = [
            (CLUSTER.Node.TYPE_FDB, add_fdb_num, add_fdb_ids),
            (CLUSTER.Node.TYPE_MS, args.add_ms_num, add_ms_ids),
            (CLUSTER.Node.TYPE_RECYCLE, args.add_recycle_num, add_recycle_ids),
            (CLUSTER.Node.TYPE_FE, args.add_fe_num, add_fe_ids),
            (CLUSTER.Node.TYPE_BE, args.add_be_num, add_be_ids),
        ]

        if not related_nodes:
            related_nodes = []

        def do_add_node(node_type, add_num, add_ids):
            if not add_num:
                return
            for i in range(add_num):
                node = cluster.add(node_type)
                related_nodes.append(node)
                add_ids.append(node.id)

        for node_type, add_num, add_ids in add_type_nums:
            do_add_node(node_type, add_num, add_ids)

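        # Apply the image override to the affected nodes, and to the whole cluster when no node filter was given.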
        if args.IMAGE:
            for node in related_nodes:
                node.set_image(args.IMAGE)
            if for_all:
                cluster.set_image(args.IMAGE)

        for node in cluster.get_all_nodes(CLUSTER.Node.TYPE_FDB):
            node.set_image("foundationdb/foundationdb:{}".format(
                args.fdb_version))

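        # Persist the updated cluster definition before invoking docker compose.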
        cluster.save()

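        # Build the option list for the "docker compose up" command.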
        options = []
        if not args.start:
            options.append("--no-start")
        else:
            options += ["--remove-orphans"]
            if args.detach:
                options.append("-d")
            if args.force_recreate:
                options.append("--force-recreate")

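        # With no node filter, bring up every node in the cluster rather than a subset.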
        related_node_num = len(related_nodes)
        if for_all:
            related_node_num = cluster.get_all_nodes_num()
            related_nodes = None

        output_real_time = args.start and not args.detach
        utils.exec_docker_compose_command(cluster.get_compose_file(),
                                          "up",
                                          options,
                                          related_nodes,
                                          output_real_time=output_real_time)

        if not args.start:
            LOG.info(
                utils.render_green(
                    "Not up cluster cause specific --no-start, related node num {}"
                    .format(related_node_num)))
        else:
            LOG.info("Using SQL mode for node management ? {}".format(
                cluster.sql_mode_node_mgr))

            if cluster.remote_master_fe:
                if is_new_cluster:
                    with open(CLUSTER.get_master_fe_addr_path(cluster.name),
                              "w") as f:
                        f.write(cluster.remote_master_fe)
                    if cluster.is_cloud:
                        cloud_config = "\n".join([
                            f"meta_service_endpoint = {cluster.get_meta_server_addr()}",
                            "deploy_mode = cloud",
                            f"cluster_id = {CLUSTER.CLUSTER_ID}",
                        ])
                        # Write the extra FE conf to remote_master_fe_add.conf; the remote FE can fetch this content over ssh.
                        with open(
                                os.path.join(
                                    CLUSTER.get_status_path(cluster.name),
                                    "remote_master_fe_add.conf"), "w") as f:
                            f.write(cloud_config)
                        ans = input(
                            utils.render_red(
                                f"\nAdd remote fe {cluster.remote_master_fe} fe.conf with follow config: "
                            ) + "\n\n" + f"{cloud_config}\n\nConfirm ?  y/n: ")
                        if ans != 'y':
                            LOG.info(
                                "Up cluster failed due to not confirm write the above config."
                            )
                            return

                        LOG.info("Waiting connect to remote FE...")
                        expire_ts = time.time() + 3600 * 5
                        parts = cluster.remote_master_fe.split(":")
                        fe_ip = parts[0]
                        fe_port = int(parts[1])
                        ready = False
                        while expire_ts > time.time():
                            if utils.is_socket_avail(fe_ip, fe_port):
                                ready = True
                                break
                            time.sleep(1)
                        if not ready:
                            raise Exception(
                                "Cannot connect to remote master fe: " +
                                cluster.remote_master_fe)

                        LOG.info("After connect to remote FE...")
            else:
                # Wait for FE master to be elected
                LOG.info("Waiting for FE master to be elected...")
                expire_ts = time.time() + 30
                while expire_ts > time.time():
                    ready = False
                    db_mgr = database.get_db_mgr(
                        args.NAME, cluster.get_all_node_net_infos(), False)
                    for fe_id in add_fe_ids:
                        fe_state = db_mgr.get_fe(fe_id)
                        if fe_state is not None and fe_state.alive:
                            ready = True
                            break
                    if ready:
                        break
                    LOG.info("there is no fe ready")
                    time.sleep(1)
                LOG.info("after Waiting for FE master to be elected...")
            if cluster.is_cloud and cluster.sql_mode_node_mgr:
                db_mgr = database.get_db_mgr(args.NAME,
                                             cluster.get_all_node_net_infos(),
                                             False)
                master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
                    cluster.name, True)
                # Add FEs except master_fe
                for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE):
                    fe_querypoint = f"{fe.get_ip()}:{fe.meta['ports']['query_port']}"
                    fe_endpoint = f"{fe.get_ip()}:{fe.meta['ports']['edit_log_port']}"
                    if fe_querypoint != master_fe_endpoint:
                        try:
                            db_mgr.add_fe(
                                fe_endpoint, "FOLLOWER"
                                if cluster.fe_follower else "OBSERVER")
                            LOG.info(f"Added FE {fe_endpoint} successfully.")
                        except Exception as e:
                            LOG.error(
                                f"Failed to add FE {fe_endpoint}: {str(e)}")

                # Add BEs
                for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE):
                    be_endpoint = f"{be.get_ip()}:{be.meta['ports']['heartbeat_service_port']}"
                    try:
                        db_mgr.add_be(be_endpoint)
                        LOG.info(f"Added BE {be_endpoint} successfully.")
                    except Exception as e:
                        LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}")
                if is_new_cluster:
                    cloud_store_config = self._get_cloud_store_config()
                    db_mgr.create_default_storage_vault(cloud_store_config)

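            # On a non-host network, wait for the newly added FE/BE services to become ready.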
            if not cluster.is_host_network():
                wait_ready_service(args.wait_timeout, cluster, add_fe_ids,
                                   add_be_ids)
            LOG.info(
                utils.render_green(
                    "Up cluster {} succ, related node num {}".format(
                        args.NAME, related_node_num)))

        ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
        LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
        LOG.info(
            "Master fe query address: " +
            utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
            "\n")
        return {
            "fe": {
                "add_list": add_fe_ids,
            },
            "be": {
                "add_list": add_be_ids,
            },
            "ms": {
                "add_list": add_ms_ids,
            },
            "recycle": {
                "add_list": add_recycle_ids,
            },
            "fdb": {
                "add_list": add_fdb_ids,
            },
        }