func()

in shardingsphere-operator/pkg/controllers/storage_node_controller.go [193:242]


func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *v1alpha1.StorageProvider, node *v1alpha1.StorageNode) (ctrl.Result, error) {
	var err error
	var oldStatus = node.Status.DeepCopy()

	// reconcile storage node with storageProvider
	switch storageProvider.Spec.Provisioner {
	case v1alpha1.ProvisionerAWSRDSInstance:
		if err := r.reconcileAwsRdsInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerAWSRDSCluster:
		if err := r.reconcileAwsRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Cluster %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerAWSAurora:
		if err := r.reconcileAwsAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	case v1alpha1.ProvisionerCloudNativePG:
		if err := r.reconcileCloudNativePG(ctx, node, storageProvider); err != nil {
			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile CloudNative PG %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
		}
	default:
		r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", storageProvider.Spec.Provisioner))
		return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
	}

	// register storage unit if needed.
	if err := r.registerStorageUnit(ctx, node, storageProvider); err != nil {
		r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterStorageUnitFailed", "unable to register storage unit %s/%s", node.GetNamespace(), node.GetName())
		return ctrl.Result{Requeue: true}, err
	}

	desiredState := computeDesiredState(node.Status)

	if !reflect.DeepEqual(oldStatus, desiredState) {
		node.Status = desiredState
		err := r.Status().Update(ctx, node)
		if err != nil {
			r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName()))
			return ctrl.Result{Requeue: true}, err
		}
	}

	return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}