in mysqloperator/controller/innodbcluster/cluster_controller.py [0:0]
def create_cluster(self, seed_pod: MySQLPod, logger: Logger) -> None:
    """Create the InnoDB Cluster on the seed pod and record how it was seeded.

    Works out the initial data source from ``initDB``, builds the options for
    ``dba.create_cluster``, stores both through
    ``self.cluster.update_cluster_info``, then connects to ``seed_pod``. A
    cluster left over from an earlier attempt is reused; otherwise a new one
    is created, followed by its ClusterSet unless ``initDB.cluster_set`` is
    set. Finally the seed member's status is probed, and for a single-instance
    cluster ``post_create_actions`` is run.

    Args:
        seed_pod: Pod on which the cluster is created.
        logger: Logger used for progress and diagnostic messages.

    Raises:
        AssertionError: ``initDB`` is set but matches no known source, or the
            dump storage mechanism is unknown.
        kopf.TemporaryError: Group Replication was already running on the
            seed and ``STOP GROUP_REPLICATION`` failed.
        mysqlsh.Error: ``dba.create_cluster`` failed; re-raised after the
            member finalizer has been removed.
    """
    logger.info("Creating cluster at %s" % seed_pod.name)

    spec = self.cluster.parsed_spec
    init_db = spec.initDB

    # Only a cluster created from scratch is sure to have a complete GTID set.
    assume_gtid_set_complete = not init_db
    initial_data_source = "blank"
    if init_db:
        # TODO store version
        # TODO store last known quorum
        if init_db.clone:
            initial_data_source = f"clone={init_db.clone.uri}"
        elif init_db.dump and seed_pod.index == 0:  # A : Should we check for index?
            storage = init_db.dump.storage
            if storage.ociObjectStorage:
                initial_data_source = f"dump={storage.ociObjectStorage.bucketName}"
            elif storage.s3:
                initial_data_source = f"dump={storage.s3.bucketName}"
            elif storage.azure:
                # Must be a plain str like every other branch (no trailing
                # comma, which would turn the value into a 1-tuple).
                initial_data_source = f"dump={storage.azure.containerName}"
            elif storage.persistentVolumeClaim:
                initial_data_source = f"dump={storage.persistentVolumeClaim}"
            else:
                # Raised explicitly so the check survives `python -O`.
                raise AssertionError("Unknown Dump storage mechanism")
        elif init_db.meb:
            initial_data_source = "meb"
        elif init_db.cluster_set:
            initial_data_source = f"clusterSet={init_db.cluster_set.uri}"
        else:
            print(f"{self.cluster.parsed_spec.initDB=} -> {self.cluster.parsed_spec.initDB.cluster_set=}")
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError("Unknown initDB source")

    # The operator manages GR, so turn off start_on_boot to avoid conflicts
    use_self_signed = spec.tlsUseSelfSigned
    create_options = {
        "gtidSetIsComplete": assume_gtid_set_complete,
        "manualStartOnBoot": True,
        "memberSslMode": "REQUIRED" if use_self_signed else "VERIFY_IDENTITY",
    }
    if not use_self_signed:
        logger.info("Using TLS GR authentication")
        rdns = seed_pod.get_cluster().get_tls_issuer_and_subject_rdns()
        create_options["memberAuthType"] = "CERT_SUBJECT"
        create_options["certIssuer"] = rdns["issuer"]
        create_options["certSubject"] = rdns["subject"]
    else:
        logger.info("Using PASSWORD GR authentication")

    create_options.update(common_gr_options)

    cluster_info = {
        "initialDataSource": initial_data_source,
        "createOptions": create_options,
    }
    self.cluster.update_cluster_info(cluster_info)

    def should_retry(err):
        # Stop retrying the connection once the seed pod is being deleted.
        return not seed_pod.deleting

    with DbaWrap(shellutils.connect_dba(seed_pod.endpoint_co, logger, is_retriable=should_retry)) as dba:
        try:
            self.dba_cluster = dba.get_cluster()
            # maybe from a previous incomplete create attempt
            logger.info("Cluster already exists")
        except:
            # NOTE(review): bare except kept from the original; any failure
            # to fetch the cluster is treated as "no cluster yet".
            self.dba_cluster = None

        seed_pod.add_member_finalizer()

        # NOTE(review): the nesting below was reconstructed from a source
        # whose indentation was lost -- confirm against the original file.
        if not self.dba_cluster:
            self.log_mysql_info(seed_pod, dba.session, logger)

            logger.info(f"CREATE CLUSTER: seed={seed_pod.name}, options={create_options}")

            try:
                self.dba_cluster = dba.create_cluster(
                    self.dba_cluster_name, create_options)

                logger.info("create_cluster OK")
            except mysqlsh.Error as e:
                # If creating the cluster failed, remove the membership finalizer
                seed_pod.remove_member_finalizer()

                # can happen when retrying
                if e.code == errors.SHERR_DBA_BADARG_INSTANCE_ALREADY_IN_GR:
                    logger.info(
                        f"GR already running at {seed_pod.endpoint}, stopping before retrying...")

                    try:
                        dba.session.run_sql("STOP GROUP_REPLICATION")
                    except mysqlsh.Error as stop_err:
                        logger.info(f"Could not stop GR plugin: {stop_err}")
                        # throw a temporary error for a full retry later
                        raise kopf.TemporaryError(
                            "GR already running while creating cluster but could not stop it", delay=3)
                # Propagate the original create_cluster failure.
                raise

            # Only on first cluster! Not if this is supposed to join some other
            if not self.cluster.parsed_spec.initDB or not self.cluster.parsed_spec.initDB.cluster_set:
                try:
                    _ = self.dba_cluster.get_cluster_set()
                    # maybe from a previous incomplete create attempt
                    logger.info("ClusterSet already exists")
                except:
                    # NOTE(review): bare except kept from the original.
                    # TODO: what to do if this fails?
                    self.dba_cluster_set = self.dba_cluster.create_cluster_set(self.dba_cluster_name)

                    # self.dba_cluster_set is only assigned on this branch,
                    # so the routing options are applied here.
                    routing_options = self.cluster.parsed_spec.router.routingOptions
                    for routing_option in routing_options:
                        try:
                            routing_value = routing_options[routing_option]
                            logger.info(f"Setting Router Option [{routing_option}] to [{routing_value}]")
                            self.dba_cluster_set.set_routing_option(routing_option, routing_value)
                        except mysqlsh.Error as e:
                            # We don't fail when setting an option fails
                            logger.warning(f"Failed setting routing option {routing_option} to {routing_value}: {e}")

        self.probe_member_status(seed_pod, dba.session, True, logger)

        logger.debug("Cluster created %s" % self.dba_cluster.status())

        # if there's just 1 pod, then the cluster is ready... otherwise, we
        # need to wait until all pods have joined
        if self.cluster.parsed_spec.instances == 1:
            self.post_create_actions(dba.session, self.dba_cluster, logger)
            self.probe_member_status(seed_pod, dba.session, True, logger)