in mysqloperator/controller/innodbcluster/operator_cluster.py [0:0]
def on_innodbcluster_create(name: str, namespace: Optional[str], body: Body,
                            logger: Logger, **kwargs) -> None:
    """Handle the creation of an InnoDBCluster resource.

    The spec in ``body`` is parsed and validated first. If the cluster is
    not yet marked ready, every dependent Kubernetes object that does not
    already exist is prepared, adopted with ``kopf.adopt`` and created, in
    a fixed numbered order (progress is written with ``print``). Finally
    the operator version is recorded on the cluster and its status is set
    to PENDING.

    Args:
        name: Name of the InnoDBCluster resource (used for logging).
        namespace: Namespace in which the dependent objects are created.
        body: Resource body handed in by kopf; wrapped in ``InnoDBCluster``.
        logger: Logger passed on to the spec validation and the
            ``prepare_*`` helpers.
        **kwargs: Remaining handler arguments; unused here.

    Raises:
        kopf.TemporaryError: The spec failed to parse or validate
            (``ApiSpecError``). The cluster status is set to INVALID and
            the error is reported through ``cluster.error`` beforehand.
        Exception: Any error raised while creating the dependent objects
            is reported through ``cluster.warn`` and re-raised unchanged.
            Failures to create a ServiceMonitor are the exception: they
            are reported but do not abort the handler.
    """
    logger.info(
        f"Initializing InnoDB Cluster name={name} namespace={namespace} on K8s {k8s_version()}")

    cluster = InnoDBCluster(body)

    # TODO: If we set the status here it will be emptied for unknown reasons later
    #       and hide other later set status (i.e. when using an invalid spec.version)
    #
    #cluster.set_status({
    #    "cluster": {
    #        "status":  diagnose.ClusterDiagStatus.INITIALIZING.value,
    #        "onlineInstances": 0,
    #        "lastProbeTime": utils.isotime()
    #    }})

    try:
        cluster.parse_spec()
        cluster.parsed_spec.validate(logger)
    except ApiSpecError as e:
        cluster.set_status({
            "cluster": {
                "status": diagnose.ClusterDiagStatus.INVALID.value,
                "onlineInstances": 0,
                "lastProbeTime": utils.isotime()
            }})
        cluster.error(action="CreateCluster",
                      reason="InvalidArgument", message=str(e))
        # Chain explicitly so the original spec error stays attached as the cause.
        raise kopf.TemporaryError(f"Error in InnoDBCluster spec: {e}") from e

    icspec = cluster.parsed_spec

    #print(f"Default operator IC edition: {config.MYSQL_OPERATOR_DEFAULT_IC_EDITION} Edition")
    cluster.log_cluster_info(logger)

    cluster.update_cluster_fqdn()

    if not cluster.ready:
        try:
            # Every step below first checks whether the object already
            # exists and only creates it when the lookup comes back empty.
            print("0. Components ConfigMaps and Secrets")
            for cm in cluster_objects.prepare_component_config_configmaps(icspec, logger):
                if not cluster.get_configmap(cm['metadata']['name']):
                    print(f"\tCreating CM {cm['metadata']['name']} ...")
                    kopf.adopt(cm)
                    api_core.create_namespaced_config_map(namespace, cm)

            for secret in cluster_objects.prepare_component_config_secrets(icspec, logger):
                if not cluster.get_secret(secret['metadata']['name']):
                    print(f"\tCreating Secret {secret['metadata']['name']} ...")
                    kopf.adopt(secret)
                    api_core.create_namespaced_secret(namespace, secret)

            print("0.5. Additional ConfigMaps")
            for cm in cluster_objects.prepare_additional_configmaps(icspec, logger):
                if not cluster.get_configmap(cm['metadata']['name']):
                    print(f"\tCreating CM {cm['metadata']['name']} ...")
                    kopf.adopt(cm)
                    api_core.create_namespaced_config_map(namespace, cm)

            print("1. Initial Configuration ConfigMap and Container Probes")
            if not ignore_404(lambda: cluster.get_initconf(icspec)):
                print("\tPreparing...")
                configs = cluster_objects.prepare_initconf(cluster, icspec, logger)
                print("\tCreating...")
                kopf.adopt(configs)
                api_core.create_namespaced_config_map(namespace, configs)

            print("2. Cluster Accounts")
            if not ignore_404(cluster.get_private_secrets):
                print("\tPreparing...")
                secret = cluster_objects.prepare_secrets(icspec)
                print("\tCreating...")
                kopf.adopt(secret)
                api_core.create_namespaced_secret(namespace=namespace, body=secret)

            print("3. Router Accounts")
            if not ignore_404(cluster.get_router_account):
                print("\tPreparing...")
                secret = router_objects.prepare_router_secrets(icspec)
                print("\tCreating...")
                kopf.adopt(secret)
                api_core.create_namespaced_secret(namespace=namespace, body=secret)

            print("4. Cluster Service")
            if not ignore_404(cluster.get_service):
                print("\tPreparing...")
                service = cluster_objects.prepare_cluster_service(icspec, logger)
                print(f"\tCreating Service {service['metadata']['name']}...{service}")
                kopf.adopt(service)
                api_core.create_namespaced_service(namespace=namespace, body=service)

            print("5. Sidecar ServiceAccount")
            existing_sa = ignore_404(lambda: cluster.get_service_account_sidecar(icspec))
            print(f"\tExisting SA: {existing_sa}")
            print(f"\tImagePullSecrets: {icspec.imagePullSecrets}")
            if not existing_sa:
                print("\tPreparing...")
                sa = cluster_objects.prepare_service_account_sidecar(icspec)
                print(f"\tCreating...{sa}")
                kopf.adopt(sa)
                api_core.create_namespaced_service_account(namespace=namespace, body=sa)
            elif icspec.imagePullSecrets:
                # The ServiceAccount already exists: patch it with the
                # image pull secrets from the spec instead of recreating it.
                patch = cluster_objects.prepare_service_account_patch_for_image_pull_secrets(icspec)
                print(f"\tPatching existing SA with {patch}")
                api_core.patch_namespaced_service_account(
                    name=existing_sa.metadata.name, namespace=namespace, body=patch)

            print("6. Switchover ServiceAccount")
            if not ignore_404(lambda: cluster.get_service_account_switchover(icspec)):
                print("\tPreparing...")
                sa = cluster_objects.prepare_service_account_switchover(icspec)
                print(f"\tCreating...{sa}")
                kopf.adopt(sa)
                api_core.create_namespaced_service_account(namespace=namespace, body=sa)

            print("7. Sidecar RoleBinding ")
            if not ignore_404(lambda: cluster.get_role_binding_sidecar(icspec)):
                print("\tPreparing...")
                rb = cluster_objects.prepare_role_binding_sidecar(icspec)
                print(f"\tCreating RoleBinding {rb['metadata']['name']} ...")
                kopf.adopt(rb)
                api_rbac.create_namespaced_role_binding(namespace=namespace, body=rb)

            print("8. Switchover RoleBinding")
            if not ignore_404(lambda: cluster.get_role_binding_switchover(icspec)):
                print("\tPreparing...")
                rb = cluster_objects.prepare_role_binding_switchover(icspec)
                print(f"\tCreating RoleBinding {rb['metadata']['name']} ...")
                kopf.adopt(rb)
                api_rbac.create_namespaced_role_binding(namespace=namespace, body=rb)

            print("9. Cluster StatefulSet")
            if not ignore_404(cluster.get_stateful_set):
                print("\tPreparing...")
                statefulset = cluster_objects.prepare_cluster_stateful_set(cluster, icspec, logger)
                print(f"\tCreating...{statefulset}")
                kopf.adopt(statefulset)
                api_apps.create_namespaced_stateful_set(namespace=namespace, body=statefulset)

            print("10. Cluster PodDisruptionBudget")
            if not ignore_404(cluster.get_disruption_budget):
                print("\tPreparing...")
                disruption_budget = cluster_objects.prepare_cluster_pod_disruption_budget(icspec)
                print("\tCreating...")
                kopf.adopt(disruption_budget)
                api_policy.create_namespaced_pod_disruption_budget(
                    namespace=namespace, body=disruption_budget)

            print("11. Read Replica StatefulSets")
            if icspec.readReplicas:
                print(f"\t{len(icspec.readReplicas)} Read Replica STS ...")
                for rr in icspec.readReplicas:
                    do_create_read_replica(cluster, rr, True, "\t\t", logger)
            else:
                print("\tNo Read Replica")

            print("12. Router Service")
            if not ignore_404(cluster.get_router_service):
                print("\tPreparing...")
                router_service = router_objects.prepare_router_service(icspec)
                print("\tCreating...")
                kopf.adopt(router_service)
                api_core.create_namespaced_service(namespace=namespace, body=router_service)

            print("13. Router Deployment")
            if not ignore_404(cluster.get_router_deployment):
                if icspec.router.instances > 0:
                    print("\tPreparing...")
                    # This will create the deployment but 0 instances. When the cluster is created (first
                    # instance joins it) the instance count will be set to icspec.router.instances
                    router_deployment = router_objects.prepare_router_deployment(
                        cluster, logger, init_only=True)
                    print(f"\tCreating...{router_deployment}")
                    kopf.adopt(router_deployment)
                    api_apps.create_namespaced_deployment(namespace=namespace, body=router_deployment)
                else:
                    # If the user later sets a non-zero router count, the routine
                    # that handles that change will create the deployment
                    print("\tRouter count is 0. No Deployment is created.")

            print("14. Backup Secrets")
            if not ignore_404(cluster.get_backup_account):
                print("\tPreparing...")
                secrets = backup_objects.prepare_backup_secrets(icspec)
                print("\tCreating...")
                for secret in secrets:
                    print("\t\t", secret["metadata"]["name"])
                    kopf.adopt(secret)
                    api_core.create_namespaced_secret(namespace=namespace, body=secret)

            print("15. Service Monitors")
            monitors = cluster_objects.prepare_metrics_service_monitors(cluster.parsed_spec, logger)
            if not monitors:
                print("\tNone requested")
            for monitor in monitors:
                monitor_name = monitor['metadata']['name']
                # Bind the name as a default argument so the lambda does not
                # depend on the loop variable's later value.
                if not ignore_404(lambda mon=monitor_name: cluster.get_service_monitor(mon)):
                    print(f"\tCreating ServiceMonitor {monitor} ...")
                    kopf.adopt(monitor)
                    try:
                        api_customobj.create_namespaced_custom_object(
                            "monitoring.coreos.com", "v1", cluster.namespace,
                            "servicemonitors", monitor)
                    except Exception as exc:
                        # This might be caused by Prometheus Operator missing
                        # we won't fail for that
                        print(f"\tServiceMonitor {monitor_name} NOT created!")
                        print(exc)
                        cluster.warn(action="CreateCluster", reason="CreateResourceFailed",
                                     message=f"{exc}")

        except Exception as exc:
            # Report the failure on the cluster, then let kopf see the
            # original exception.
            cluster.warn(action="CreateCluster", reason="CreateResourceFailed",
                         message=f"{exc}")
            raise

        # Numbered 16 so it does not collide with "13. Router Deployment".
        print(f"16. Setting operator version for the IC to {DEFAULT_OPERATOR_VERSION_TAG}")
        cluster.set_operator_version(DEFAULT_OPERATOR_VERSION_TAG)
        cluster.info(action="CreateCluster", reason="ResourcesCreated",
                     message="Dependency resources created, switching status to PENDING")

        # Without an initDB.cluster_set section the cluster is reported as
        # PRIMARY, otherwise as REPLICA_CANDIDATE.
        if not icspec.initDB or not icspec.initDB.cluster_set:
            cluster_type = diagnose.ClusterInClusterSetType.PRIMARY
        else:
            cluster_type = diagnose.ClusterInClusterSetType.REPLICA_CANDIDATE

        cluster.set_status({
            "cluster": {
                "status": diagnose.ClusterDiagStatus.PENDING.value,
                "onlineInstances": 0,
                "type": cluster_type.value,
                "lastProbeTime": utils.isotime()
            }})