func()

in rocketmq-knative/source/pkg/controller/sdk/reconciler.go [50:92]


func (r *Reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	ctx := logging.WithLogger(context.TODO(), r.logger.With(zap.Any("request", request)))
	logger := logging.FromContext(ctx)

	logger.Infof("Reconciling %s %v", r.provider.Parent.GetObjectKind(), request)

	original := r.provider.Parent.DeepCopyObject()

	err := r.client.Get(context.TODO(), request.NamespacedName, original)

	if errors.IsNotFound(err) {
		logger.Errorf("could not find %s %v\n", r.provider.Parent.GetObjectKind(), request)
		return reconcile.Result{}, nil
	}

	if err != nil {
		logger.Errorf("could not fetch %s %v for %+v\n", r.provider.Parent.GetObjectKind(), err, request)
		return reconcile.Result{}, err
	}

	// Don't modify the cache's copy
	obj := original.DeepCopyObject()

	// Reconcile this copy of the Source and then write back any status
	// updates regardless of whether the reconcile error out.
	reconcileErr := r.provider.Reconciler.Reconcile(ctx, obj)
	if reconcileErr != nil {
		logger.Warnf("Failed to reconcile %s: %v", r.provider.Parent.GetObjectKind(), reconcileErr)
	}

	if needsUpdate, err := r.needsUpdate(ctx, original, obj); err != nil {
		logger.Desugar().Error("Unable to determine if an update is needed", zap.Error(err), zap.Any("original", original), zap.Any("obj", obj))
		return reconcile.Result{}, err
	} else if needsUpdate {
		if _, err := r.update(ctx, request, obj); err != nil {
			logger.Desugar().Error("Failed to update", zap.Error(err), zap.Any("objectKind", r.provider.Parent.GetObjectKind()))
			return reconcile.Result{}, err
		}
	}

	// Requeue if the resource is not ready:
	return reconcile.Result{}, reconcileErr
}