in rocketmq-knative/source/pkg/controller/sdk/reconciler.go [50:92]
// Reconcile handles a single reconcile request for the provider's parent kind.
//
// It fetches the object named by request, hands a deep copy of it to the
// provider's Reconciler, and persists that copy through r.update when
// r.needsUpdate reports that it differs from what was fetched.
//
// The returned reconcile.Result is always empty. The returned error is:
//   - nil when the object no longer exists,
//   - the fetch error when the Get call fails for any other reason,
//   - the error from r.needsUpdate or r.update when either of those fails,
//   - otherwise whatever the provider's Reconciler returned (nil on success).
func (r *Reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	// Attach a request-scoped logger to the context passed down to the provider.
	scoped := r.logger.With(zap.Any("request", request))
	ctx := logging.WithLogger(context.TODO(), scoped)
	logger := logging.FromContext(ctx)
	logger.Infof("Reconciling %s %v", r.provider.Parent.GetObjectKind(), request)

	// Use a fresh copy of the parent prototype as the destination for Get.
	fetched := r.provider.Parent.DeepCopyObject()
	err := r.client.Get(context.TODO(), request.NamespacedName, fetched)
	switch {
	case errors.IsNotFound(err):
		// A missing object is logged but not reported as an error.
		logger.Errorf("could not find %s %v\n", r.provider.Parent.GetObjectKind(), request)
		return reconcile.Result{}, nil
	case err != nil:
		logger.Errorf("could not fetch %s %v for %+v\n", r.provider.Parent.GetObjectKind(), err, request)
		return reconcile.Result{}, err
	}

	// Work on a separate copy so the fetched object stays untouched and can
	// serve as the baseline for the needsUpdate comparison below.
	working := fetched.DeepCopyObject()

	// A reconcile failure is only logged here; the write-back below still
	// runs, and the failure is returned at the very end.
	reconcileErr := r.provider.Reconciler.Reconcile(ctx, working)
	if reconcileErr != nil {
		logger.Warnf("Failed to reconcile %s: %v", r.provider.Parent.GetObjectKind(), reconcileErr)
	}

	changed, err := r.needsUpdate(ctx, fetched, working)
	if err != nil {
		logger.Desugar().Error("Unable to determine if an update is needed", zap.Error(err), zap.Any("original", fetched), zap.Any("obj", working))
		return reconcile.Result{}, err
	}
	if changed {
		_, updateErr := r.update(ctx, request, working)
		if updateErr != nil {
			logger.Desugar().Error("Failed to update", zap.Error(updateErr), zap.Any("objectKind", r.provider.Parent.GetObjectKind()))
			return reconcile.Result{}, updateErr
		}
	}

	// Hand the provider's reconcile outcome back to the caller; a non-nil
	// error here is how a not-yet-ready resource is reported.
	return reconcile.Result{}, reconcileErr
}