# docker/runtime/doris-compose/cluster.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import configparser
import getpass
import hashlib
import os
import os.path
import time

import filelock
import jsonpickle

import utils
DOCKER_DORIS_PATH = "/opt/apache-doris"
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")
# an integer between 128 and 191; usually there is no need to set it
DORIS_SUBNET_START = os.getenv("DORIS_SUBNET_START")
LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"resource")
DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource")
CLOUD_CFG_FILE = os.getenv("DORIS_CLOUD_CFG_FILE",
os.path.join(LOCAL_DORIS_PATH, 'cloud.ini'))
FE_HTTP_PORT = 8030
FE_RPC_PORT = 9020
FE_QUERY_PORT = 9030
FE_EDITLOG_PORT = 9010
FE_JAVA_DBG_PORT = 5005
FE_ARROW_FLIGHT_SQL_PORT = 8070
BE_PORT = 9060
BE_WEBSVR_PORT = 8040
BE_HEARTBEAT_PORT = 9050
BE_BRPC_PORT = 8060
BE_ARROW_FLIGHT_SQL_PORT = 8050
FDB_PORT = 4500
MS_PORT = 5000
ID_LIMIT = 10000
IP_PART4_SIZE = 200
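# each node type owns a block of ID_LIMIT sequence numbers (see get_node_seq),
# and a sequence number is split into the last two ip octets using
# IP_PART4_SIZE (see Node.get_ip and Node.get_id_from_ip)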
CLUSTER_ID = "12345678"
LOG = utils.get_logger()
def get_cluster_path(cluster_name):
return os.path.join(LOCAL_DORIS_PATH, cluster_name)
def get_node_name(node_type, id):
return "{}-{}".format(node_type, id)
def get_node_path(cluster_name, node_type, id):
return os.path.join(get_cluster_path(cluster_name),
get_node_name(node_type, id))
def get_compose_file(cluster_name):
return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml")
def get_status_path(cluster_name):
return os.path.join(get_cluster_path(cluster_name), "status")
def get_master_fe_addr_path(cluster_name):
    return os.path.join(get_status_path(cluster_name), "master_fe_query_addr")
def get_all_cluster_names():
if not os.path.exists(LOCAL_DORIS_PATH):
return []
else:
return [
subdir for subdir in os.listdir(LOCAL_DORIS_PATH)
if os.path.isdir(os.path.join(LOCAL_DORIS_PATH, subdir))
]
def gen_subnet_prefix16():
used_subnet = utils.get_docker_subnets_prefix16()
for cluster_name in get_all_cluster_names():
try:
cluster = Cluster.load(cluster_name)
used_subnet[cluster.subnet] = True
        except Exception:
            # skip clusters whose meta cannot be loaded
            pass
subnet_begin = 128
subnet_end = 192
subnet_part_1 = None
subnet_part_2 = None
if DORIS_SUBNET_START:
subnet_part_1 = int(DORIS_SUBNET_START)
subnet_part_2 = 0
else:
m = hashlib.md5()
m.update(getpass.getuser().encode("utf-8"))
hash_val = int(m.hexdigest(), 16)
        # keep the second subnet octet small (less than 100), so use a slot
        # width of 100 rather than 256 here
        small_width = 100
slot_num = (subnet_end - subnet_begin) * small_width
idx = hash_val % slot_num
if idx < 0:
idx += slot_num
subnet_part_1 = subnet_begin + int(idx / small_width)
subnet_part_2 = idx % small_width
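        # illustrative example: if the user-name hash yields idx = 1234, then
        # subnet_part_1 = 128 + 12 = 140 and subnet_part_2 = 34, so the scan
        # below starts at subnet "140.34"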
intervals = [
[(subnet_part_1, subnet_part_1 + 1), (subnet_part_2, 256)],
[(subnet_part_1 + 1, subnet_end), (0, 256)],
[(subnet_begin, subnet_part_1), (0, 256)],
[(subnet_part_1, subnet_part_1 + 1), (0, subnet_part_2)],
]
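    # scan for an unused /16 in wrap-around order starting at the hash-derived
    # position: the rest of the starting /8 block first, then the higher
    # blocks, then the lower blocks, and finally the skipped head of the
    # starting block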
for interval in intervals:
for i in range(interval[0][0], interval[0][1]):
for j in range(interval[1][0], interval[1][1]):
subnet = "{}.{}".format(i, j)
if not used_subnet.get(subnet, False):
return subnet
raise Exception("Failed to gen subnet")
def get_master_fe_endpoint(cluster_name, wait_master_fe_query_addr_file=False):
cluster_path = get_cluster_path(cluster_name)
if os.path.exists(cluster_path):
master_fe_query_addr_file = get_master_fe_addr_path(cluster_name)
max_retries = 10 if wait_master_fe_query_addr_file else 0
i = 0
while True:
if os.path.exists(master_fe_query_addr_file):
with open(master_fe_query_addr_file, "r") as f:
return f.read().strip()
i += 1
if i < max_retries:
time.sleep(1)
else:
break
try:
cluster = Cluster.load(cluster_name)
LOG.info("master file not exist, master ip get from node 1")
if cluster.is_host_network():
return cluster.remote_master_fe
else:
master_fe = cluster.get_node(Node.TYPE_FE, 1)
return "{}:{}".format(master_fe.get_ip(),
master_fe.meta["ports"]["query_port"])
        except Exception:
            return ""
def get_node_seq(node_type, id):
seq = id
seq += IP_PART4_SIZE
if node_type == Node.TYPE_FE:
seq += 0 * ID_LIMIT
elif node_type == Node.TYPE_BE:
seq += 1 * ID_LIMIT
elif node_type == Node.TYPE_MS:
seq += 2 * ID_LIMIT
elif node_type == Node.TYPE_RECYCLE:
seq += 3 * ID_LIMIT
elif node_type == Node.TYPE_FDB:
seq += 4 * ID_LIMIT
else:
seq += 5 * ID_LIMIT
return seq
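# worked example (illustrative): a be with id 3 maps to
#   seq = 3 + IP_PART4_SIZE + 1 * ID_LIMIT = 3 + 200 + 10000 = 10203,
# which Node.get_ip() renders as "<subnet>.51.3" (10203 // 200 = 51,
# 10203 % 200 = 3)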
class Group(object):
def __init__(self, node_type):
self.node_type = node_type
self.nodes = {} # id : NodeMeta
self.next_id = 1
def add(self, id, node_meta):
assert node_meta["image"]
if not id:
id = self.next_id
self.next_id += 1
if self.get_node(id):
raise Exception(
"Failed to add {} with id {}, id has exists".format(
self.node_type, id))
if id > ID_LIMIT:
raise Exception(
"Failed to add {} with id {}, id exceeds {}".format(
self.node_type, id, ID_LIMIT))
self.nodes[id] = node_meta
return id
def remove(self, id):
self.nodes.pop(id, None)
def get_node_num(self):
return len(self.nodes)
def get_all_nodes(self):
return self.nodes
def get_node(self, id):
return self.nodes.get(id, None)
def on_loaded(self):
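        # json serialization turns the int dict keys into strings; convert
        # them back to ints after loading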
nodes = {}
for id, node in self.nodes.items():
nodes[int(id)] = node
self.nodes = nodes
class NodeNetInfo(object):
def __init__(self, type, id, ip, ports):
self.type = type
self.id = id
self.ip = ip
self.ports = ports
class Node(object):
TYPE_FE = "fe"
TYPE_BE = "be"
TYPE_MS = "ms"
TYPE_RECYCLE = "recycle"
TYPE_FDB = "fdb"
TYPE_ALL = [TYPE_FE, TYPE_BE, TYPE_MS, TYPE_RECYCLE, TYPE_FDB]
def __init__(self, cluster, id, meta):
self.cluster = cluster
self.id = id
self.meta = meta
@staticmethod
def new(cluster, node_type, id, meta):
if node_type == Node.TYPE_FE:
return FE(cluster, id, meta)
elif node_type == Node.TYPE_BE:
return BE(cluster, id, meta)
elif node_type == Node.TYPE_MS:
return MS(cluster, id, meta)
elif node_type == Node.TYPE_RECYCLE:
return RECYCLE(cluster, id, meta)
elif node_type == Node.TYPE_FDB:
return FDB(cluster, id, meta)
else:
raise Exception("Unknown node type {}".format(node_type))
    # runs only once when the node is created; later restarts or image
    # upgrades will not run it again
def init(self):
self.init_ports()
self.init_conf()
def init_ports(self):
if self.cluster.is_host_network():
self.meta["ports"] = dict(
(port_name, utils.get_avail_port())
for port_name in self.get_default_named_ports().keys())
else:
self.meta["ports"] = self.get_default_named_ports()
def init_conf(self):
path = self.get_path()
os.makedirs(path, exist_ok=True)
# copy config to local
conf_dir = os.path.join(path, "conf")
if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir):
self.copy_conf_to_local(conf_dir)
        assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \
            "check that the doris path in the image is correct".format(conf_dir)
utils.enable_dir_with_rw_perm(conf_dir)
config = self.get_add_init_config()
if config:
with open(os.path.join(conf_dir, self.conf_file_name()),
"a") as f:
f.write("\n")
f.write("#### start doris-compose add config ####\n\n")
for item in config:
f.write(item + "\n")
f.write("\n#### end doris-compose add config ####\n")
for sub_dir in self.expose_sub_dirs():
os.makedirs(os.path.join(path, sub_dir), exist_ok=True)
def copy_conf_to_local(self, local_conf_dir):
utils.copy_image_directory(self.get_image(),
"{}/conf".format(self.docker_home_dir()),
local_conf_dir)
def is_fe(self):
return self.node_type() == Node.TYPE_FE
def is_be(self):
return self.node_type() == Node.TYPE_BE
def conf_file_name(self):
return self.node_type() + ".conf"
def node_type(self):
raise Exception("No implemented")
def expose_sub_dirs(self):
return ["conf", "log"]
def get_name(self):
return get_node_name(self.node_type(), self.id)
def get_path(self):
return get_node_path(self.cluster.name, self.node_type(), self.id)
def get_image(self):
return self.meta["image"]
def set_image(self, image):
self.meta["image"] = image
def get_ip(self):
if self.cluster.is_host_network():
# this is a remote node
if self.meta.get("is_remote", False):
return self.cluster.remote_master_fe.split(":")[0]
else:
return self.cluster.local_network_ip
else:
seq = get_node_seq(self.node_type(), self.id)
return "{}.{}.{}".format(self.cluster.subnet,
int(seq / IP_PART4_SIZE),
seq % IP_PART4_SIZE)
def get_default_named_ports(self):
        # port_name : default_port
        # the port names come from fe.conf, be.conf, cloud.conf, etc.
return {}
@staticmethod
def get_id_from_ip(ip):
pos2 = ip.rfind(".")
pos1 = ip.rfind(".", 0, pos2 - 1)
num3 = int(ip[pos1 + 1:pos2])
num4 = int(ip[pos2 + 1:])
seq = num3 * IP_PART4_SIZE + num4
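        # e.g. ip "<subnet>.51.3" gives seq = 51 * 200 + 3 = 10203; the loop
        # below strips the node-type offset (multiples of ID_LIMIT) and the
        # final subtraction removes IP_PART4_SIZE, recovering id 3. this
        # assumes ids stay well below ID_LIMIT - IP_PART4_SIZE.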
while seq > ID_LIMIT:
seq -= ID_LIMIT
seq -= IP_PART4_SIZE
return seq
def service_name(self):
return utils.with_doris_prefix("{}-{}".format(self.cluster.name,
self.get_name()))
def docker_env(self):
enable_coverage = self.cluster.coverage_dir
envs = {
"MY_IP": self.get_ip(),
"MY_ID": self.id,
"MY_TYPE": self.node_type(),
"DORIS_HOME": os.path.join(self.docker_home_dir()),
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0,
}
if self.cluster.is_cloud:
envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr()
# run as host user
if not self.cluster.is_root_user:
envs["HOST_USER"] = getpass.getuser()
envs["HOST_UID"] = os.getuid()
envs["HOST_GID"] = os.getgid()
if enable_coverage:
outfile = "{}/coverage/{}-coverage-{}-{}".format(
DOCKER_DORIS_PATH, self.node_type(), self.cluster.name,
self.id)
if self.node_type() == Node.TYPE_FE:
envs["JACOCO_COVERAGE_OPT"] = "-javaagent:/jacoco/lib/jacocoagent.jar" \
"=excludes=org.apache.doris.thrift:org.apache.doris.proto:org.apache.parquet.format" \
":com.aliyun*:com.amazonaws*:org.apache.hadoop.hive.metastore:org.apache.parquet.format," \
"output=file,append=true,destfile=" + outfile
elif self.node_type() == Node.TYPE_BE:
envs["LLVM_PROFILE_FILE_PREFIX"] = outfile
return envs
def entrypoint(self):
if self.start_script():
return [
"bash",
os.path.join(DOCKER_RESOURCE_PATH, "entrypoint.sh")
] + self.start_script()
else:
return None
def get_add_init_config(self):
cfg = []
if self.cluster.is_host_network():
cfg.append(f"priority_networks = {self.cluster.local_network_ip}")
cfg += [
f"{port_name} = {port}"
for port_name, port in self.meta["ports"].items()
]
return cfg
def docker_ports(self):
return list(self.get_default_named_ports().values())
def docker_home_dir(self):
raise Exception("No implemented")
def compose(self):
volumes = [
"{}:{}/{}".format(os.path.join(self.get_path(), sub_dir),
self.docker_home_dir(), sub_dir)
for sub_dir in self.expose_sub_dirs()
] + [
"{}:{}:ro".format(LOCAL_RESOURCE_PATH, DOCKER_RESOURCE_PATH),
"{}:{}/status".format(get_status_path(self.cluster.name),
self.docker_home_dir()),
] + [
"{0}:{0}:ro".format(path)
for path in ("/etc/localtime", "/etc/timezone",
"/usr/share/zoneinfo") if os.path.exists(path)
]
if self.cluster.coverage_dir:
volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir,
DOCKER_DORIS_PATH))
content = {
"cap_add": ["SYS_PTRACE"],
"container_name": self.service_name(),
"environment": self.docker_env(),
"image": self.get_image(),
"ulimits": {
"core": -1
},
"security_opt": ["seccomp:unconfined"],
"volumes": volumes,
}
if self.cluster.is_host_network():
content["network_mode"] = "host"
else:
content["hostname"] = self.get_name()
content["networks"] = {
utils.with_doris_prefix(self.cluster.name): {
"ipv4_address": self.get_ip(),
}
}
content["extra_hosts"] = [
"{}:{}".format(node.get_name(), node.get_ip())
for node in self.cluster.get_all_nodes()
]
content["ports"] = self.docker_ports()
if self.entrypoint():
content["entrypoint"] = self.entrypoint()
return content
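# a generated service entry looks roughly like the following sketch (names and
# values are illustrative; the exact prefix comes from utils.with_doris_prefix):
#   doris-<cluster>-fe-1:
#     image: <image>
#     hostname: fe-1
#     networks:
#       doris-<cluster>:
#         ipv4_address: <subnet>.1.1
#     volumes:
#       - <cluster_path>/fe-1/conf:/opt/apache-doris/fe/conf
#       - ...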
class FE(Node):
def init(self):
        # in cloud mode, an fe is either a follower or an observer; the
        # default is observer
self.meta[
"is_cloud_follower"] = self.cluster.is_cloud and self.cluster.fe_follower
super().init()
def get_add_init_config(self):
cfg = super().get_add_init_config()
if self.cluster.fe_config:
cfg += self.cluster.fe_config
if self.cluster.is_cloud:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
"deploy_mode = cloud",
]
if self.cluster.sql_mode_node_mgr:
cfg += [
"cluster_id = " + CLUSTER_ID,
]
else:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
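        # fe.conf has no ini section headers, so prepend a dummy section to
        # parse it with configparser, then append a jdwp agent to JAVA_OPTS so
        # the fe jvm listens for a remote debugger on java_debug_port; on
        # jdk 9+ the agent binds to localhost only unless the address has a
        # "*:" host prefix, hence the two variants below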
with open("{}/conf/{}".format(self.get_path(), self.conf_file_name()),
"r") as f:
java_debug_port = self.meta["ports"]["java_debug_port"]
parser = configparser.ConfigParser()
parser.read_string('[dummy_section]\n' + f.read())
section = parser['dummy_section']
for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
value = section.get(key)
if value:
value = value.strip().strip('"')
if key == "JAVA_OPTS":
# java 8
cfg.append(
f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address={java_debug_port}\""
)
else:
# JAVA_OPTS_FOR_JDK_17
# >= java 9
cfg.append(
f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{java_debug_port}\""
)
return cfg
def docker_env(self):
envs = super().docker_env()
if self.cluster.is_cloud:
envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
if self.meta["is_cloud_follower"]:
envs["is_fe_follower"] = 1
envs["MY_QUERY_PORT"] = self.meta["ports"]["query_port"]
envs["MY_EDITLOG_PORT"] = self.meta["ports"]["edit_log_port"]
return envs
def get_default_named_ports(self):
return {
"http_port": FE_HTTP_PORT,
"rpc_port": FE_RPC_PORT,
"query_port": FE_QUERY_PORT,
"edit_log_port": FE_EDITLOG_PORT,
"arrow_flight_sql_port": FE_ARROW_FLIGHT_SQL_PORT,
"java_debug_port": FE_JAVA_DBG_PORT,
}
def cloud_unique_id(self):
return "sql_server_{}".format(self.id)
def start_script(self):
return ["init_fe.sh"]
def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "fe")
def node_type(self):
return Node.TYPE_FE
def expose_sub_dirs(self):
return super().expose_sub_dirs() + ["doris-meta"]
class BE(Node):
def init(self):
super().init()
if self.cluster.is_cloud:
self.meta["cluster_name"] = self.cluster.be_cluster
self.init_disk(self.cluster.be_disks)
def get_add_init_config(self):
cfg = super().get_add_init_config()
if self.cluster.be_config:
cfg += self.cluster.be_config
if self.cluster.is_cloud:
cfg += [
'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
'enable_file_cache = true',
'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]'
.format(self.docker_home_dir()),
"deploy_mode = cloud",
]
if self.cluster.be_metaservice_endpoint:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if self.cluster.be_cluster_id:
cfg += [
"cluster_id = " + CLUSTER_ID,
]
if not self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg
def init_disk(self, be_disks):
path = self.get_path()
dirs = []
dir_descs = []
index = 0
for disks in be_disks:
parts = disks.split(",")
            if len(parts) != 1 and len(parts) != 2:
                raise Exception(
                    "invalid be disks specification: {}".format(disks))
            type_and_num = parts[0].split("=")
            if len(type_and_num) != 2:
                raise Exception(
                    "invalid be disks specification: {}".format(disks))
            tp = type_and_num[0].strip().upper()
            if tp != "HDD" and tp != "SSD":
                raise Exception(
                    "invalid be disk type: '{}', should be 'HDD' or 'SSD'".
                    format(tp))
            num = int(type_and_num[1].strip())
            capacity = int(parts[1].strip()) if len(parts) >= 2 else -1
            capacity_desc = "_{}gb".format(capacity) if capacity > 0 else ""
            for i in range(num):
                index += 1
                dir_name = "{}{}.{}".format(index, capacity_desc, tp)
                dirs.append("{}/storage/{}".format(path, dir_name))
                dir_descs.append("${{DORIS_HOME}}/storage/{}{}".format(
                    dir_name,
                    ",capacity:" + str(capacity) if capacity > 0 else ""))
        for dir_path in dirs:
            os.makedirs(dir_path, exist_ok=True)
os.makedirs(path + "/storage/file_cache", exist_ok=True)
with open("{}/conf/{}".format(path, self.conf_file_name()), "a") as f:
storage_root_path = ";".join(dir_descs) if dir_descs else '""'
f.write("\nstorage_root_path = {}\n".format(storage_root_path))
def start_script(self):
return ["init_be.sh"]
def docker_env(self):
envs = super().docker_env()
envs["MY_HEARTBEAT_PORT"] = self.meta["ports"][
"heartbeat_service_port"]
if self.cluster.is_cloud:
envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
envs["REG_BE_TO_MS"] = 1 if self.cluster.reg_be else 0
envs["CLUSTER_NAME"] = self.meta["cluster_name"]
return envs
def get_default_named_ports(self):
return {
"be_port": BE_PORT,
"webserver_port": BE_WEBSVR_PORT,
"heartbeat_service_port": BE_HEARTBEAT_PORT,
"brpc_port": BE_BRPC_PORT,
"arrow_flight_sql_port": BE_ARROW_FLIGHT_SQL_PORT,
}
def cloud_unique_id(self):
return "compute_node_{}".format(self.id)
def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "be")
def node_type(self):
return Node.TYPE_BE
def expose_sub_dirs(self):
return super().expose_sub_dirs() + ["storage"]
class CLOUD(Node):
def get_add_init_config(self):
cfg = super().get_add_init_config()
cfg.append("fdb_cluster = " + self.cluster.get_fdb_cluster())
return cfg
def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "cloud")
def get_default_named_ports(self):
return {
"brpc_listen_port": MS_PORT,
}
def conf_file_name(self):
for file in os.listdir(os.path.join(self.get_path(), "conf")):
if file == "doris_cloud.conf" or file == "selectdb_cloud.conf":
return file
return "Not found conf file for ms or recycler"
class MS(CLOUD):
def get_add_init_config(self):
cfg = super().get_add_init_config()
if self.cluster.ms_config:
cfg += self.cluster.ms_config
return cfg
def start_script(self):
return ["init_cloud.sh", "--meta-service"]
def node_type(self):
return Node.TYPE_MS
def docker_env(self):
envs = super().docker_env()
for key, value in self.cluster.cloud_store_config.items():
envs[key] = value
return envs
class RECYCLE(CLOUD):
def get_add_init_config(self):
cfg = super().get_add_init_config()
if self.cluster.recycle_config:
cfg += self.cluster.recycle_config
return cfg
def start_script(self):
return ["init_cloud.sh", "--recycler"]
def node_type(self):
return Node.TYPE_RECYCLE
class FDB(Node):
def get_add_init_config(self):
return []
def copy_conf_to_local(self, local_conf_dir):
os.makedirs(local_conf_dir, exist_ok=True)
with open(os.path.join(LOCAL_RESOURCE_PATH, "fdb.conf"),
"r") as read_file:
with open(os.path.join(local_conf_dir, self.conf_file_name()),
"w") as f:
publish_addr = "{}:{}".format(self.get_ip(),
self.meta["ports"]["fdb_port"])
f.write(read_file.read().replace("${PUBLISH-ADDRESS}",
publish_addr))
with open(os.path.join(local_conf_dir, "fdb.cluster"), "w") as f:
f.write(self.cluster.get_fdb_cluster())
def start_script(self):
return ["init_fdb.sh"]
def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "fdb")
def get_default_named_ports(self):
return {"fdb_port": FDB_PORT}
def node_type(self):
return Node.TYPE_FDB
def expose_sub_dirs(self):
return super().expose_sub_dirs() + ["data"]
class Cluster(object):
def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
be_config, ms_config, recycle_config, remote_master_fe,
local_network_ip, fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cluster_id):
self.name = name
self.subnet = subnet
self.image = image
self.is_cloud = is_cloud
self.is_root_user = is_root_user
self.fe_config = fe_config
self.be_config = be_config
self.ms_config = ms_config
self.recycle_config = recycle_config
self.remote_master_fe = remote_master_fe
self.local_network_ip = local_network_ip
self.fe_follower = fe_follower
self.be_disks = be_disks
self.be_cluster = be_cluster
self.reg_be = reg_be
self.coverage_dir = coverage_dir
self.cloud_store_config = cloud_store_config
self.groups = {
node_type: Group(node_type)
for node_type in Node.TYPE_ALL
}
if self.remote_master_fe:
            # reserve fe id 1 for the remote master fe
self.groups[Node.TYPE_FE].next_id = 2
self.sql_mode_node_mgr = sql_mode_node_mgr
self.be_metaservice_endpoint = be_metaservice_endpoint
self.be_cluster_id = be_cluster_id
@staticmethod
def new(name, image, is_cloud, is_root_user, fe_config, be_config,
ms_config, recycle_config, remote_master_fe, local_network_ip,
fe_follower, be_disks, be_cluster, reg_be, coverage_dir,
cloud_store_config, sql_mode_node_mgr, be_metaservice_endpoint,
be_cluster_id):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
lock_file = os.path.join(LOCAL_DORIS_PATH, "lock")
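        # serialize cluster creation across processes; if this user owns the
        # lock file, loosen its permissions so other local users can also
        # take the lock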
with filelock.FileLock(lock_file):
if os.getuid() == utils.get_path_uid(lock_file):
os.chmod(lock_file, 0o666)
subnet = gen_subnet_prefix16() if not remote_master_fe else ""
cluster = Cluster(name, subnet, image, is_cloud, is_root_user,
fe_config, be_config, ms_config, recycle_config,
remote_master_fe, local_network_ip, fe_follower,
be_disks, be_cluster, reg_be, coverage_dir,
cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cluster_id)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
return cluster
@staticmethod
def load(name):
if not name:
raise Exception("Failed to load cluster, name is empty")
path = get_cluster_path(name)
if not os.path.exists(path):
raise Exception(
"Failed to load cluster, its directory {} not exists.".format(
path))
meta_path = Cluster._get_meta_file(name)
if not os.path.exists(meta_path):
raise Exception(
"Failed to load cluster, its meta file {} not exists.".format(
meta_path))
with open(meta_path, "r") as f:
cluster = jsonpickle.loads(f.read())
for group in cluster.groups.values():
group.on_loaded()
return cluster
@staticmethod
def _get_meta_file(name):
return os.path.join(get_cluster_path(name), "meta")
def is_host_network(self):
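        # getattr with a default: cluster metas saved by older versions may
        # not have the remote_master_fe attribute (an assumption based on the
        # fallback)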
        return bool(getattr(self, "remote_master_fe", ""))
def get_remote_fe_node(self):
if not self.is_host_network():
return None
meta = {
"image": "",
"is_remote": True,
"ports": {
"query_port": int(self.remote_master_fe.split(":")[1])
},
}
return Node.new(self, Node.TYPE_FE, 1, meta)
def get_image(self):
return self.image
    # updating the cluster's image also updates the image of every node except fdb
def set_image(self, image):
self.image = image
for node_type, group in self.groups.items():
if node_type == Node.TYPE_FDB:
continue
for _, node_meta in group.nodes.items():
node_meta["image"] = image
def get_path(self):
return get_cluster_path(self.name)
def get_group(self, node_type):
group = self.groups.get(node_type, None)
if not group:
raise Exception("Unknown node_type: {}".format(node_type))
return group
def get_all_node_net_infos(self):
return [
NodeNetInfo(node.node_type(), node.id, node.get_ip(),
node.meta["ports"])
for node in self.get_all_nodes(None, True)
]
def get_node(self, node_type, id, include_remote=False):
group = self.get_group(node_type)
meta = group.get_node(id)
if not meta:
if include_remote and node_type == Node.TYPE_FE:
node = self.get_remote_fe_node()
if node and node.id == id:
return node
raise Exception("No found {} with id {}".format(node_type, id))
return Node.new(self, node_type, id, meta)
def get_all_nodes(self, node_type=None, include_remote=False):
if node_type is None:
nodes = []
for nt, group in self.groups.items():
for id, meta in group.get_all_nodes().items():
nodes.append(Node.new(self, nt, id, meta))
if include_remote:
node = self.get_remote_fe_node()
if node:
nodes.append(node)
return nodes
group = self.groups.get(node_type, None)
if not group:
raise Exception("Unknown node_type: {}".format(node_type))
nodes = [
Node.new(self, node_type, id, meta)
for id, meta in group.get_all_nodes().items()
]
if include_remote:
node = self.get_remote_fe_node()
if node:
nodes.append(node)
return nodes
def get_all_nodes_num(self):
num = 0
for group in self.groups.values():
num += group.get_node_num()
return num
def add(self, node_type, id=None):
node_meta = {}
node_meta["image"] = self.image
id = self.get_group(node_type).add(id, node_meta)
node = self.get_node(node_type, id)
if not os.path.exists(node.get_path()):
node.init()
return node
def get_fdb_cluster(self):
fdb = self.get_node(Node.TYPE_FDB, 1)
return "123456:123456@{}:{}".format(fdb.get_ip(),
fdb.meta["ports"]["fdb_port"])
def get_meta_server_addr(self):
meta_server = self.get_node(Node.TYPE_MS, 1)
return "{}:{}".format(meta_server.get_ip(),
meta_server.meta["ports"]["brpc_listen_port"])
def get_recycle_addr(self):
recycler = self.get_node(Node.TYPE_RECYCLE, 1)
return "{}:{}".format(recycler.get_ip(),
recycler.meta["ports"]["brpc_listen_port"])
def remove(self, node_type, id):
group = self.get_group(node_type)
group.remove(id)
def save(self):
self._save_meta()
self._save_compose()
def _save_meta(self):
with open(Cluster._get_meta_file(self.name), "w") as f:
f.write(jsonpickle.dumps(self, indent=2))
def _save_compose(self):
services = {}
for node_type in self.groups.keys():
for node in self.get_all_nodes(node_type):
services[node.service_name()] = node.compose()
compose = {
"version": "3",
"services": services,
}
if not self.is_host_network():
compose["networks"] = {
utils.with_doris_prefix(self.name): {
"driver": "bridge",
"ipam": {
"config": [{
"subnet": "{}.0.0/16".format(self.subnet),
}]
},
},
}
utils.write_compose_file(self.get_compose_file(), compose)
def get_compose_file(self):
        # resolves to the module-level get_compose_file(); the method name
        # only shadows it as a class attribute, not inside this function body
        return get_compose_file(self.name)