# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import configparser
import filelock
import getpass
import hashlib
import jsonpickle
import os
import os.path
import utils
import time

DOCKER_DORIS_PATH = "/opt/apache-doris"
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")

# An integer between 128 and 191; generally there is no need to set it.
DORIS_SUBNET_START = os.getenv("DORIS_SUBNET_START")

LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                   "resource")
DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource")
CLOUD_CFG_FILE = os.getenv("DORIS_CLOUD_CFG_FILE",
                           os.path.join(LOCAL_DORIS_PATH, 'cloud.ini'))

FE_HTTP_PORT = 8030
FE_RPC_PORT = 9020
FE_QUERY_PORT = 9030
FE_EDITLOG_PORT = 9010
FE_JAVA_DBG_PORT = 5005
FE_ARROW_FLIGHT_SQL_PORT = 8070

BE_PORT = 9060
BE_WEBSVR_PORT = 8040
BE_HEARTBEAT_PORT = 9050
BE_BRPC_PORT = 8060
BE_ARROW_FLIGHT_SQL_PORT = 8050

FDB_PORT = 4500

MS_PORT = 5000

ID_LIMIT = 10000

IP_PART4_SIZE = 200

CLUSTER_ID = "12345678"

LOG = utils.get_logger()


def get_cluster_path(cluster_name):
    return os.path.join(LOCAL_DORIS_PATH, cluster_name)


def get_node_name(node_type, id):
    return "{}-{}".format(node_type, id)


def get_node_path(cluster_name, node_type, id):
    return os.path.join(get_cluster_path(cluster_name),
                        get_node_name(node_type, id))


def get_compose_file(cluster_name):
    return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml")


def get_status_path(cluster_name):
    return os.path.join(get_cluster_path(cluster_name), "status")


def get_master_fe_addr_path(cluster_name):
    return os.path.join(get_status_path(cluster_name), "master_fe_query_addr")


def get_all_cluster_names():
    if not os.path.exists(LOCAL_DORIS_PATH):
        return []
    else:
        return [
            subdir for subdir in os.listdir(LOCAL_DORIS_PATH)
            if os.path.isdir(os.path.join(LOCAL_DORIS_PATH, subdir))
        ]


def gen_subnet_prefix16():
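    # Pick an unused "a.b" prefix for a new cluster's /16 subnet. "a" is in [128, 192),
    # taken from DORIS_SUBNET_START if set, otherwise derived from a hash of the user name;
    # then scan forward (with wrap around) for a prefix not used by docker or another cluster.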
    used_subnet = utils.get_docker_subnets_prefix16()
    for cluster_name in get_all_cluster_names():
        try:
            cluster = Cluster.load(cluster_name)
            used_subnet[cluster.subnet] = True
        except Exception:
            pass

    subnet_begin = 128
    subnet_end = 192

    subnet_part_1 = None
    subnet_part_2 = None
    if DORIS_SUBNET_START:
        subnet_part_1 = int(DORIS_SUBNET_START)
        subnet_part_2 = 0
    else:
        m = hashlib.md5()
        m.update(getpass.getuser().encode("utf-8"))
        hash_val = int(m.hexdigest(), 16)
        # keep the second subnet part a small number (less than 100), so don't use 256 here.
        small_width = 100
        slot_num = (subnet_end - subnet_begin) * small_width
        idx = hash_val % slot_num
        if idx < 0:
            idx += slot_num
        subnet_part_1 = subnet_begin + int(idx / small_width)
        subnet_part_2 = idx % small_width

    intervals = [
        [(subnet_part_1, subnet_part_1 + 1), (subnet_part_2, 256)],
        [(subnet_part_1 + 1, subnet_end), (0, 256)],
        [(subnet_begin, subnet_part_1), (0, 256)],
        [(subnet_part_1, subnet_part_1 + 1), (0, subnet_part_2)],
    ]
    for interval in intervals:
        for i in range(interval[0][0], interval[0][1]):
            for j in range(interval[1][0], interval[1][1]):
                subnet = "{}.{}".format(i, j)
                if not used_subnet.get(subnet, False):
                    return subnet

    raise Exception("Failed to gen subnet")


def get_master_fe_endpoint(cluster_name, wait_master_fe_query_addr_file=False):
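    # Return the master fe query address "ip:port". Prefer the addr file written under the
    # cluster status dir (optionally waiting up to about 10 seconds for it to appear), then
    # fall back to fe node 1 (or the remote master fe for host network clusters).
    # Return "" if nothing can be resolved.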
    cluster_path = get_cluster_path(cluster_name)
    if os.path.exists(cluster_path):
        master_fe_query_addr_file = get_master_fe_addr_path(cluster_name)
        max_retries = 10 if wait_master_fe_query_addr_file else 0
        i = 0
        while True:
            if os.path.exists(master_fe_query_addr_file):
                with open(master_fe_query_addr_file, "r") as f:
                    return f.read().strip()
            i += 1
            if i < max_retries:
                time.sleep(1)
            else:
                break
    try:
        cluster = Cluster.load(cluster_name)
        LOG.info("master file not exist, master ip get from node 1")
        if cluster.is_host_network():
            return cluster.remote_master_fe
        else:
            master_fe = cluster.get_node(Node.TYPE_FE, 1)
            return "{}:{}".format(master_fe.get_ip(),
                                  master_fe.meta["ports"]["query_port"])
    except Exception:
        return ""


def get_node_seq(node_type, id):
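    # Map (node_type, id) to a unique sequence number: id + IP_PART4_SIZE plus a
    # per-type multiple of ID_LIMIT. The sequence determines the node ip, see Node.get_ip().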
    seq = id
    seq += IP_PART4_SIZE
    if node_type == Node.TYPE_FE:
        seq += 0 * ID_LIMIT
    elif node_type == Node.TYPE_BE:
        seq += 1 * ID_LIMIT
    elif node_type == Node.TYPE_MS:
        seq += 2 * ID_LIMIT
    elif node_type == Node.TYPE_RECYCLE:
        seq += 3 * ID_LIMIT
    elif node_type == Node.TYPE_FDB:
        seq += 4 * ID_LIMIT
    else:
        seq += 5 * ID_LIMIT
    return seq


class Group(object):
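    # All nodes of one type in a cluster: maps node id to its meta, and tracks the next id to allocate.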

    def __init__(self, node_type):
        self.node_type = node_type
        self.nodes = {}  # id : NodeMeta
        self.next_id = 1

    def add(self, id, node_meta):
        assert node_meta["image"]
        if not id:
            id = self.next_id
            self.next_id += 1
        if self.get_node(id):
            raise Exception(
                "Failed to add {} with id {}, id has exists".format(
                    self.node_type, id))
        if id > ID_LIMIT:
            raise Exception(
                "Failed to add {} with id {}, id exceeds {}".format(
                    self.node_type, id, ID_LIMIT))
        self.nodes[id] = node_meta

        return id

    def remove(self, id):
        self.nodes.pop(id, None)

    def get_node_num(self):
        return len(self.nodes)

    def get_all_nodes(self):
        return self.nodes

    def get_node(self, id):
        return self.nodes.get(id, None)

    def on_loaded(self):
        nodes = {}
        for id, node in self.nodes.items():
            nodes[int(id)] = node
        self.nodes = nodes


class NodeNetInfo(object):

    def __init__(self, type, id, ip, ports):
        self.type = type
        self.id = id
        self.ip = ip
        self.ports = ports


class Node(object):
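    # Base class of a cluster node (fe / be / ms / recycle / fdb).
    # self.meta holds the persisted node info such as image and ports.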
    TYPE_FE = "fe"
    TYPE_BE = "be"
    TYPE_MS = "ms"
    TYPE_RECYCLE = "recycle"
    TYPE_FDB = "fdb"
    TYPE_ALL = [TYPE_FE, TYPE_BE, TYPE_MS, TYPE_RECYCLE, TYPE_FDB]

    def __init__(self, cluster, id, meta):
        self.cluster = cluster
        self.id = id
        self.meta = meta

    @staticmethod
    def new(cluster, node_type, id, meta):
        if node_type == Node.TYPE_FE:
            return FE(cluster, id, meta)
        elif node_type == Node.TYPE_BE:
            return BE(cluster, id, meta)
        elif node_type == Node.TYPE_MS:
            return MS(cluster, id, meta)
        elif node_type == Node.TYPE_RECYCLE:
            return RECYCLE(cluster, id, meta)
        elif node_type == Node.TYPE_FDB:
            return FDB(cluster, id, meta)
        else:
            raise Exception("Unknown node type {}".format(node_type))

    # Only runs once when the node is created; later restarts or image upgrades will not run it again.
    def init(self):
        self.init_ports()
        self.init_conf()

    def init_ports(self):
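        # In host network mode all nodes share the host ip, so pick a free host port for each
        # named port; otherwise every node has its own ip and keeps the default ports.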
        if self.cluster.is_host_network():
            self.meta["ports"] = dict(
                (port_name, utils.get_avail_port())
                for port_name in self.get_default_named_ports().keys())
        else:
            self.meta["ports"] = self.get_default_named_ports()

    def init_conf(self):
        path = self.get_path()
        os.makedirs(path, exist_ok=True)

        # copy config to local
        conf_dir = os.path.join(path, "conf")
        if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir):
            self.copy_conf_to_local(conf_dir)
            assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \
                    "check doris path in image is correct".format(conf_dir)
            utils.enable_dir_with_rw_perm(conf_dir)
            config = self.get_add_init_config()
            if config:
                with open(os.path.join(conf_dir, self.conf_file_name()),
                          "a") as f:
                    f.write("\n")
                    f.write("#### start doris-compose add config ####\n\n")
                    for item in config:
                        f.write(item + "\n")
                    f.write("\n#### end doris-compose add config ####\n")
        for sub_dir in self.expose_sub_dirs():
            os.makedirs(os.path.join(path, sub_dir), exist_ok=True)

    def copy_conf_to_local(self, local_conf_dir):
        utils.copy_image_directory(self.get_image(),
                                   "{}/conf".format(self.docker_home_dir()),
                                   local_conf_dir)

    def is_fe(self):
        return self.node_type() == Node.TYPE_FE

    def is_be(self):
        return self.node_type() == Node.TYPE_BE

    def conf_file_name(self):
        return self.node_type() + ".conf"

    def node_type(self):
        raise Exception("No implemented")

    def expose_sub_dirs(self):
        return ["conf", "log"]

    def get_name(self):
        return get_node_name(self.node_type(), self.id)

    def get_path(self):
        return get_node_path(self.cluster.name, self.node_type(), self.id)

    def get_image(self):
        return self.meta["image"]

    def set_image(self, image):
        self.meta["image"] = image

    def get_ip(self):
        if self.cluster.is_host_network():
            # this is a remote node
            if self.meta.get("is_remote", False):
                return self.cluster.remote_master_fe.split(":")[0]
            else:
                return self.cluster.local_network_ip
        else:
            seq = get_node_seq(self.node_type(), self.id)
            return "{}.{}.{}".format(self.cluster.subnet,
                                     int(seq / IP_PART4_SIZE),
                                     seq % IP_PART4_SIZE)

    def get_default_named_ports(self):
        # port_name : default_port
        # the port names come from fe.conf, be.conf, cloud.conf, etc.
        return {}

    @staticmethod
    def get_id_from_ip(ip):
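        # Inverse of get_node_seq: recover the node id from the last two parts of the ip.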
        pos2 = ip.rfind(".")
        pos1 = ip.rfind(".", 0, pos2 - 1)
        num3 = int(ip[pos1 + 1:pos2])
        num4 = int(ip[pos2 + 1:])
        seq = num3 * IP_PART4_SIZE + num4
        seq -= IP_PART4_SIZE
        while seq > ID_LIMIT:
            seq -= ID_LIMIT
        return seq

    def service_name(self):
        return utils.with_doris_prefix("{}-{}".format(self.cluster.name,
                                                      self.get_name()))

    def docker_env(self):
        enable_coverage = self.cluster.coverage_dir

        envs = {
            "MY_IP": self.get_ip(),
            "MY_ID": self.id,
            "MY_TYPE": self.node_type(),
            "DORIS_HOME": os.path.join(self.docker_home_dir()),
            "STOP_GRACE": 1 if enable_coverage else 0,
            "IS_CLOUD": 1 if self.cluster.is_cloud else 0,
            "SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0,
        }

        if self.cluster.is_cloud:
            envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr()

        # run as host user
        if not self.cluster.is_root_user:
            envs["HOST_USER"] = getpass.getuser()
            envs["HOST_UID"] = os.getuid()
            envs["HOST_GID"] = os.getgid()

        if enable_coverage:
            outfile = "{}/coverage/{}-coverage-{}-{}".format(
                DOCKER_DORIS_PATH, self.node_type(), self.cluster.name,
                self.id)
            if self.node_type() == Node.TYPE_FE:
                envs["JACOCO_COVERAGE_OPT"] = "-javaagent:/jacoco/lib/jacocoagent.jar" \
                    "=excludes=org.apache.doris.thrift:org.apache.doris.proto:org.apache.parquet.format" \
                    ":com.aliyun*:com.amazonaws*:org.apache.hadoop.hive.metastore:org.apache.parquet.format," \
                    "output=file,append=true,destfile=" + outfile
            elif self.node_type() == Node.TYPE_BE:
                envs["LLVM_PROFILE_FILE_PREFIX"] = outfile

        return envs

    def entrypoint(self):
        if self.start_script():
            return [
                "bash",
                os.path.join(DOCKER_RESOURCE_PATH, "entrypoint.sh")
            ] + self.start_script()
        else:
            return None

    def get_add_init_config(self):
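        # Extra config lines appended to the node's conf file when the node is first initialized (see init_conf).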
        cfg = []
        if self.cluster.is_host_network():
            cfg.append(f"priority_networks = {self.cluster.local_network_ip}")
            cfg += [
                f"{port_name} = {port}"
                for port_name, port in self.meta["ports"].items()
            ]
        return cfg

    def docker_ports(self):
        return list(self.get_default_named_ports().values())

    def docker_home_dir(self):
        raise Exception("No implemented")

    def compose(self):
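        # Build this node's service entry for docker-compose.yml.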
        volumes = [
            "{}:{}/{}".format(os.path.join(self.get_path(), sub_dir),
                              self.docker_home_dir(), sub_dir)
            for sub_dir in self.expose_sub_dirs()
        ] + [
            "{}:{}:ro".format(LOCAL_RESOURCE_PATH, DOCKER_RESOURCE_PATH),
            "{}:{}/status".format(get_status_path(self.cluster.name),
                                  self.docker_home_dir()),
        ] + [
            "{0}:{0}:ro".format(path)
            for path in ("/etc/localtime", "/etc/timezone",
                         "/usr/share/zoneinfo") if os.path.exists(path)
        ]

        if self.cluster.coverage_dir:
            volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir,
                                                   DOCKER_DORIS_PATH))

        content = {
            "cap_add": ["SYS_PTRACE"],
            "container_name": self.service_name(),
            "environment": self.docker_env(),
            "image": self.get_image(),
            "ulimits": {
                "core": -1
            },
            "security_opt": ["seccomp:unconfined"],
            "volumes": volumes,
        }

        if self.cluster.is_host_network():
            content["network_mode"] = "host"
        else:
            content["hostname"] = self.get_name()
            content["networks"] = {
                utils.with_doris_prefix(self.cluster.name): {
                    "ipv4_address": self.get_ip(),
                }
            }
            content["extra_hosts"] = [
                "{}:{}".format(node.get_name(), node.get_ip())
                for node in self.cluster.get_all_nodes()
            ]
            content["ports"] = self.docker_ports()

        if self.entrypoint():
            content["entrypoint"] = self.entrypoint()

        return content


class FE(Node):

    def init(self):
        # In cloud mode, an fe is either a follower or an observer; the default is observer.
        self.meta[
            "is_cloud_follower"] = self.cluster.is_cloud and self.cluster.fe_follower
        super().init()

    def get_add_init_config(self):
        cfg = super().get_add_init_config()
        if self.cluster.fe_config:
            cfg += self.cluster.fe_config
        if self.cluster.is_cloud:
            cfg += [
                "meta_service_endpoint = {}".format(
                    self.cluster.get_meta_server_addr()),
                "deploy_mode = cloud",
            ]

            if self.cluster.sql_mode_node_mgr:
                cfg += [
                    "cluster_id = " + CLUSTER_ID,
                ]
            else:
                cfg += [
                    "cloud_unique_id = " + self.cloud_unique_id(),
                ]

        with open("{}/conf/{}".format(self.get_path(), self.conf_file_name()),
                  "r") as f:
            java_debug_port = self.meta["ports"]["java_debug_port"]
            parser = configparser.ConfigParser()
            parser.read_string('[dummy_section]\n' + f.read())
            section = parser['dummy_section']
            for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
                value = section.get(key)
                if value:
                    value = value.strip().strip('"')
                    if key == "JAVA_OPTS":
                        # java 8
                        cfg.append(
                            f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address={java_debug_port}\""
                        )
                    else:
                        # JAVA_OPTS_FOR_JDK_17
                        # >= java 9
                        cfg.append(
                            f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{java_debug_port}\""
                        )

        return cfg

    def docker_env(self):
        envs = super().docker_env()
        if self.cluster.is_cloud:
            envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
            if self.meta["is_cloud_follower"]:
                envs["is_fe_follower"] = 1
        envs["MY_QUERY_PORT"] = self.meta["ports"]["query_port"]
        envs["MY_EDITLOG_PORT"] = self.meta["ports"]["edit_log_port"]
        return envs

    def get_default_named_ports(self):
        return {
            "http_port": FE_HTTP_PORT,
            "rpc_port": FE_RPC_PORT,
            "query_port": FE_QUERY_PORT,
            "edit_log_port": FE_EDITLOG_PORT,
            "arrow_flight_sql_port": FE_ARROW_FLIGHT_SQL_PORT,
            "java_debug_port": FE_JAVA_DBG_PORT,
        }

    def cloud_unique_id(self):
        return "sql_server_{}".format(self.id)

    def start_script(self):
        return ["init_fe.sh"]

    def docker_home_dir(self):
        return os.path.join(DOCKER_DORIS_PATH, "fe")

    def node_type(self):
        return Node.TYPE_FE

    def expose_sub_dirs(self):
        return super().expose_sub_dirs() + ["doris-meta"]


class BE(Node):

    def init(self):
        super().init()
        if self.cluster.is_cloud:
            self.meta["cluster_name"] = self.cluster.be_cluster
        self.init_disk(self.cluster.be_disks)

    def get_add_init_config(self):
        cfg = super().get_add_init_config()
        if self.cluster.be_config:
            cfg += self.cluster.be_config
        if self.cluster.is_cloud:
            cfg += [
                'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
                'enable_file_cache = true',
                'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]'
                .format(self.docker_home_dir()),
                "deploy_mode = cloud",
            ]

            if self.cluster.be_metaservice_endpoint:
                cfg += [
                    "meta_service_endpoint = {}".format(
                        self.cluster.get_meta_server_addr()),
                ]
            if self.cluster.be_cluster_id:
                cfg += [
                    "cluster_id = " + CLUSTER_ID,
                ]
            if not self.cluster.sql_mode_node_mgr:
                cfg += [
                    "cloud_unique_id = " + self.cloud_unique_id(),
                ]
        return cfg

    def init_disk(self, be_disks):
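        # Each entry of be_disks looks like "HDD=2" or "SSD=1,50", i.e. <type>=<dir num>[,<capacity in GB>].
        # Create the local storage dirs and append storage_root_path to be.conf.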
        path = self.get_path()
        dirs = []
        dir_descs = []
        index = 0
        for disks in be_disks:
            parts = disks.split(",")
            if len(parts) != 1 and len(parts) != 2:
                raise Exception("invalid be disks option: {}".format(disks))
            type_and_num = parts[0].split("=")
            if len(type_and_num) != 2:
                raise Exception("invalid be disks option: {}".format(disks))
            tp = type_and_num[0].strip().upper()
            if tp != "HDD" and tp != "SSD":
                raise Exception(
                    "invalid be disk type: '{}', should be 'HDD' or 'SSD'".
                    format(tp))
            num = int(type_and_num[1].strip())
            capacity = int(parts[1].strip()) if len(parts) >= 2 else -1
            capacity_desc = "_{}gb".format(capacity) if capacity > 0 else ""

            for i in range(num):
                index += 1
                dir_name = "{}{}.{}".format(index, capacity_desc, tp)
                dirs.append("{}/storage/{}".format(path, dir_name))
                dir_descs.append("${{DORIS_HOME}}/storage/{}{}".format(
                    dir_name,
                    ",capacity:" + str(capacity) if capacity > 0 else ""))

        for dir in dirs:
            os.makedirs(dir, exist_ok=True)

        os.makedirs(path + "/storage/file_cache", exist_ok=True)

        with open("{}/conf/{}".format(path, self.conf_file_name()), "a") as f:
            storage_root_path = ";".join(dir_descs) if dir_descs else '""'
            f.write("\nstorage_root_path = {}\n".format(storage_root_path))

    def start_script(self):
        return ["init_be.sh"]

    def docker_env(self):
        envs = super().docker_env()
        envs["MY_HEARTBEAT_PORT"] = self.meta["ports"][
            "heartbeat_service_port"]
        if self.cluster.is_cloud:
            envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
            envs["REG_BE_TO_MS"] = 1 if self.cluster.reg_be else 0
            envs["CLUSTER_NAME"] = self.meta["cluster_name"]
        return envs

    def get_default_named_ports(self):
        return {
            "be_port": BE_PORT,
            "webserver_port": BE_WEBSVR_PORT,
            "heartbeat_service_port": BE_HEARTBEAT_PORT,
            "brpc_port": BE_BRPC_PORT,
            "arrow_flight_sql_port": BE_ARROW_FLIGHT_SQL_PORT,
        }

    def cloud_unique_id(self):
        return "compute_node_{}".format(self.id)

    def docker_home_dir(self):
        return os.path.join(DOCKER_DORIS_PATH, "be")

    def node_type(self):
        return Node.TYPE_BE

    def expose_sub_dirs(self):
        return super().expose_sub_dirs() + ["storage"]


class CLOUD(Node):

    def get_add_init_config(self):
        cfg = super().get_add_init_config()
        cfg.append("fdb_cluster = " + self.cluster.get_fdb_cluster())
        return cfg

    def docker_home_dir(self):
        return os.path.join(DOCKER_DORIS_PATH, "cloud")

    def get_default_named_ports(self):
        return {
            "brpc_listen_port": MS_PORT,
        }

    def conf_file_name(self):
        for file in os.listdir(os.path.join(self.get_path(), "conf")):
            if file == "doris_cloud.conf" or file == "selectdb_cloud.conf":
                return file
        return "Not found conf file for ms or recycler"


class MS(CLOUD):

    def get_add_init_config(self):
        cfg = super().get_add_init_config()
        if self.cluster.ms_config:
            cfg += self.cluster.ms_config
        return cfg

    def start_script(self):
        return ["init_cloud.sh", "--meta-service"]

    def node_type(self):
        return Node.TYPE_MS

    def docker_env(self):
        envs = super().docker_env()
        for key, value in self.cluster.cloud_store_config.items():
            envs[key] = value
        return envs


class RECYCLE(CLOUD):

    def get_add_init_config(self):
        cfg = super().get_add_init_config()
        if self.cluster.recycle_config:
            cfg += self.cluster.recycle_config
        return cfg

    def start_script(self):
        return ["init_cloud.sh", "--recycler"]

    def node_type(self):
        return Node.TYPE_RECYCLE


class FDB(Node):

    def get_add_init_config(self):
        return []

    def copy_conf_to_local(self, local_conf_dir):
        os.makedirs(local_conf_dir, exist_ok=True)
        with open(os.path.join(LOCAL_RESOURCE_PATH, "fdb.conf"),
                  "r") as read_file:
            with open(os.path.join(local_conf_dir, self.conf_file_name()),
                      "w") as f:
                publish_addr = "{}:{}".format(self.get_ip(),
                                              self.meta["ports"]["fdb_port"])
                f.write(read_file.read().replace("${PUBLISH-ADDRESS}",
                                                 publish_addr))

        with open(os.path.join(local_conf_dir, "fdb.cluster"), "w") as f:
            f.write(self.cluster.get_fdb_cluster())

    def start_script(self):
        return ["init_fdb.sh"]

    def docker_home_dir(self):
        return os.path.join(DOCKER_DORIS_PATH, "fdb")

    def get_default_named_ports(self):
        return {"fdb_port": FDB_PORT}

    def node_type(self):
        return Node.TYPE_FDB

    def expose_sub_dirs(self):
        return super().expose_sub_dirs() + ["data"]


class Cluster(object):
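    # The persisted description of a doris compose cluster: its nodes, image, configs and network.
    # Saved with jsonpickle in <cluster dir>/meta and rendered into docker-compose.yml.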

    def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
                 be_config, ms_config, recycle_config, remote_master_fe,
                 local_network_ip, fe_follower, be_disks, be_cluster, reg_be,
                 coverage_dir, cloud_store_config, sql_mode_node_mgr,
                 be_metaservice_endpoint, be_cluster_id):
        self.name = name
        self.subnet = subnet
        self.image = image
        self.is_cloud = is_cloud
        self.is_root_user = is_root_user
        self.fe_config = fe_config
        self.be_config = be_config
        self.ms_config = ms_config
        self.recycle_config = recycle_config
        self.remote_master_fe = remote_master_fe
        self.local_network_ip = local_network_ip
        self.fe_follower = fe_follower
        self.be_disks = be_disks
        self.be_cluster = be_cluster
        self.reg_be = reg_be
        self.coverage_dir = coverage_dir
        self.cloud_store_config = cloud_store_config
        self.groups = {
            node_type: Group(node_type)
            for node_type in Node.TYPE_ALL
        }
        if self.remote_master_fe:
            # reserve fe id = 1 for the remote master fe
            self.groups[Node.TYPE_FE].next_id = 2
        self.sql_mode_node_mgr = sql_mode_node_mgr
        self.be_metaservice_endpoint = be_metaservice_endpoint
        self.be_cluster_id = be_cluster_id

    @staticmethod
    def new(name, image, is_cloud, is_root_user, fe_config, be_config,
            ms_config, recycle_config, remote_master_fe, local_network_ip,
            fe_follower, be_disks, be_cluster, reg_be, coverage_dir,
            cloud_store_config, sql_mode_node_mgr, be_metaservice_endpoint,
            be_cluster_id):
        if not os.path.exists(LOCAL_DORIS_PATH):
            os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
            os.chmod(LOCAL_DORIS_PATH, 0o777)
        lock_file = os.path.join(LOCAL_DORIS_PATH, "lock")
        with filelock.FileLock(lock_file):
            if os.getuid() == utils.get_path_uid(lock_file):
                os.chmod(lock_file, 0o666)
            subnet = gen_subnet_prefix16() if not remote_master_fe else ""
            cluster = Cluster(name, subnet, image, is_cloud, is_root_user,
                              fe_config, be_config, ms_config, recycle_config,
                              remote_master_fe, local_network_ip, fe_follower,
                              be_disks, be_cluster, reg_be, coverage_dir,
                              cloud_store_config, sql_mode_node_mgr,
                              be_metaservice_endpoint, be_cluster_id)
            os.makedirs(cluster.get_path(), exist_ok=True)
            os.makedirs(get_status_path(name), exist_ok=True)
            cluster._save_meta()
            return cluster

    @staticmethod
    def load(name):
        if not name:
            raise Exception("Failed to load cluster, name is empty")
        path = get_cluster_path(name)
        if not os.path.exists(path):
            raise Exception(
                "Failed to load cluster, its directory {} not exists.".format(
                    path))
        meta_path = Cluster._get_meta_file(name)
        if not os.path.exists(meta_path):
            raise Exception(
                "Failed to load cluster, its meta file {} not exists.".format(
                    meta_path))
        with open(meta_path, "r") as f:
            cluster = jsonpickle.loads(f.read())
            for group in cluster.groups.values():
                group.on_loaded()
            return cluster

    @staticmethod
    def _get_meta_file(name):
        return os.path.join(get_cluster_path(name), "meta")

    def is_host_network(self):
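        # True-ish when the cluster attaches to a remote master fe, in which case containers
        # use the host network instead of a dedicated bridge subnet.
        # (getattr keeps old cluster metas without this field working.)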
        return getattr(self, "remote_master_fe", "")

    def get_remote_fe_node(self):
        if not self.is_host_network():
            return None
        meta = {
            "image": "",
            "is_remote": True,
            "ports": {
                "query_port": int(self.remote_master_fe.split(":")[1])
            },
        }
        return Node.new(self, Node.TYPE_FE, 1, meta)

    def get_image(self):
        return self.image

    # When the cluster image is updated, all its nodes (except fdb) update their image too.
    def set_image(self, image):
        self.image = image
        for node_type, group in self.groups.items():
            if node_type == Node.TYPE_FDB:
                continue
            for _, node_meta in group.nodes.items():
                node_meta["image"] = image

    def get_path(self):
        return get_cluster_path(self.name)

    def get_group(self, node_type):
        group = self.groups.get(node_type, None)
        if not group:
            raise Exception("Unknown node_type: {}".format(node_type))
        return group

    def get_all_node_net_infos(self):
        return [
            NodeNetInfo(node.node_type(), node.id, node.get_ip(),
                        node.meta["ports"])
            for node in self.get_all_nodes(None, True)
        ]

    def get_node(self, node_type, id, include_remote=False):
        group = self.get_group(node_type)
        meta = group.get_node(id)
        if not meta:
            if include_remote and node_type == Node.TYPE_FE:
                node = self.get_remote_fe_node()
                if node and node.id == id:
                    return node
            raise Exception("No found {} with id {}".format(node_type, id))
        return Node.new(self, node_type, id, meta)

    def get_all_nodes(self, node_type=None, include_remote=False):
        if node_type is None:
            nodes = []
            for nt, group in self.groups.items():
                for id, meta in group.get_all_nodes().items():
                    nodes.append(Node.new(self, nt, id, meta))
            if include_remote:
                node = self.get_remote_fe_node()
                if node:
                    nodes.append(node)
            return nodes

        group = self.groups.get(node_type, None)
        if not group:
            raise Exception("Unknown node_type: {}".format(node_type))
        nodes = [
            Node.new(self, node_type, id, meta)
            for id, meta in group.get_all_nodes().items()
        ]
        if include_remote:
            node = self.get_remote_fe_node()
            if node:
                nodes.append(node)
        return nodes

    def get_all_nodes_num(self):
        num = 0
        for group in self.groups.values():
            num += group.get_node_num()
        return num

    def add(self, node_type, id=None):
        node_meta = {}
        node_meta["image"] = self.image
        id = self.get_group(node_type).add(id, node_meta)
        node = self.get_node(node_type, id)
        if not os.path.exists(node.get_path()):
            node.init()

        return node

    def get_fdb_cluster(self):
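        # FoundationDB cluster file string: <description>:<id>@<ip>:<port>.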
        fdb = self.get_node(Node.TYPE_FDB, 1)
        return "123456:123456@{}:{}".format(fdb.get_ip(),
                                            fdb.meta["ports"]["fdb_port"])

    def get_meta_server_addr(self):
        meta_server = self.get_node(Node.TYPE_MS, 1)
        return "{}:{}".format(meta_server.get_ip(),
                              meta_server.meta["ports"]["brpc_listen_port"])

    def get_recycle_addr(self):
        recycler = self.get_node(Node.TYPE_RECYCLE, 1)
        return "{}:{}".format(recycler.get_ip(),
                              recycler.meta["ports"]["brpc_listen_port"])

    def remove(self, node_type, id):
        group = self.get_group(node_type)
        group.remove(id)

    def save(self):
        self._save_meta()
        self._save_compose()

    def _save_meta(self):
        with open(Cluster._get_meta_file(self.name), "w") as f:
            f.write(jsonpickle.dumps(self, indent=2))

    def _save_compose(self):
        services = {}
        for node_type in self.groups.keys():
            for node in self.get_all_nodes(node_type):
                services[node.service_name()] = node.compose()
        compose = {
            "version": "3",
            "services": services,
        }
        if not self.is_host_network():
            compose["networks"] = {
                utils.with_doris_prefix(self.name): {
                    "driver": "bridge",
                    "ipam": {
                        "config": [{
                            "subnet": "{}.0.0/16".format(self.subnet),
                        }]
                    },
                },
            }

        utils.write_compose_file(self.get_compose_file(), compose)

    def get_compose_file(self):
        global get_compose_file
        return get_compose_file(self.name)
