in shardingsphere-operator/pkg/controllers/storage_node_controller.go [193:242]
func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *v1alpha1.StorageProvider, node *v1alpha1.StorageNode) (ctrl.Result, error) {
var err error
var oldStatus = node.Status.DeepCopy()
// reconcile storage node with storageProvider
switch storageProvider.Spec.Provisioner {
case v1alpha1.ProvisionerAWSRDSInstance:
if err := r.reconcileAwsRdsInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
case v1alpha1.ProvisionerAWSRDSCluster:
if err := r.reconcileAwsRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Cluster %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
case v1alpha1.ProvisionerAWSAurora:
if err := r.reconcileAwsAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
case v1alpha1.ProvisionerCloudNativePG:
if err := r.reconcileCloudNativePG(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile CloudNative PG %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
default:
r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", storageProvider.Spec.Provisioner))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
// register storage unit if needed.
if err := r.registerStorageUnit(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterStorageUnitFailed", "unable to register storage unit %s/%s", node.GetNamespace(), node.GetName())
return ctrl.Result{Requeue: true}, err
}
desiredState := computeDesiredState(node.Status)
if !reflect.DeepEqual(oldStatus, desiredState) {
node.Status = desiredState
err := r.Status().Update(ctx, node)
if err != nil {
r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName()))
return ctrl.Result{Requeue: true}, err
}
}
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}