rocketmq-knative/source/pkg/controller/sdk/reconciler.go (128 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sdk
import (
"context"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)
type Reconciler struct {
client client.Client
recorder record.EventRecorder
scheme *runtime.Scheme
logger zap.SugaredLogger
provider Provider
}
// Verify the struct implements reconcile.Reconciler
var _ reconcile.Reconciler = &Reconciler{}
// Reconcile compares the actual state with the desired, and attempts to
// converge the two.
func (r *Reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
ctx := logging.WithLogger(context.TODO(), r.logger.With(zap.Any("request", request)))
logger := logging.FromContext(ctx)
logger.Infof("Reconciling %s %v", r.provider.Parent.GetObjectKind(), request)
original := r.provider.Parent.DeepCopyObject()
err := r.client.Get(context.TODO(), request.NamespacedName, original)
if errors.IsNotFound(err) {
logger.Errorf("could not find %s %v\n", r.provider.Parent.GetObjectKind(), request)
return reconcile.Result{}, nil
}
if err != nil {
logger.Errorf("could not fetch %s %v for %+v\n", r.provider.Parent.GetObjectKind(), err, request)
return reconcile.Result{}, err
}
// Don't modify the cache's copy
obj := original.DeepCopyObject()
// Reconcile this copy of the Source and then write back any status
// updates regardless of whether the reconcile error out.
reconcileErr := r.provider.Reconciler.Reconcile(ctx, obj)
if reconcileErr != nil {
logger.Warnf("Failed to reconcile %s: %v", r.provider.Parent.GetObjectKind(), reconcileErr)
}
if needsUpdate, err := r.needsUpdate(ctx, original, obj); err != nil {
logger.Desugar().Error("Unable to determine if an update is needed", zap.Error(err), zap.Any("original", original), zap.Any("obj", obj))
return reconcile.Result{}, err
} else if needsUpdate {
if _, err := r.update(ctx, request, obj); err != nil {
logger.Desugar().Error("Failed to update", zap.Error(err), zap.Any("objectKind", r.provider.Parent.GetObjectKind()))
return reconcile.Result{}, err
}
}
// Requeue if the resource is not ready:
return reconcile.Result{}, reconcileErr
}
func (r *Reconciler) InjectClient(c client.Client) error {
r.client = c
_, err := inject.ClientInto(c, r.provider.Reconciler)
return err
}
func (r *Reconciler) InjectConfig(c *rest.Config) error {
_, err := inject.ConfigInto(c, r.provider.Reconciler)
return err
}
func (r *Reconciler) needsUpdate(ctx context.Context, old, new runtime.Object) (bool, error) {
if old == nil {
return true, nil
}
// Check Status.
os, err := NewReflectedStatusAccessor(old)
if err != nil {
return false, err
}
ns, err := NewReflectedStatusAccessor(new)
if err != nil {
return false, err
}
oStatus := os.GetStatus()
nStatus := ns.GetStatus()
if !equality.Semantic.DeepEqual(oStatus, nStatus) {
return true, nil
}
// Check finalizers.
of, err := NewReflectedFinalizersAccessor(old)
if err != nil {
return false, err
}
nf, err := NewReflectedFinalizersAccessor(new)
if err != nil {
return false, err
}
oFinalizers := of.GetFinalizers()
nFinalizers := nf.GetFinalizers()
if !equality.Semantic.DeepEqual(oFinalizers, nFinalizers) {
return true, nil
}
return false, nil
}
func (r *Reconciler) update(ctx context.Context, request reconcile.Request, object runtime.Object) (runtime.Object, error) {
freshObj := r.provider.Parent.DeepCopyObject()
if err := r.client.Get(ctx, request.NamespacedName, freshObj); err != nil {
return nil, err
}
// Finalizers
freshFinalizers, err := NewReflectedFinalizersAccessor(freshObj)
if err != nil {
return nil, err
}
orgFinalizers, err := NewReflectedFinalizersAccessor(object)
if err != nil {
return nil, err
}
freshFinalizers.SetFinalizers(orgFinalizers.GetFinalizers())
if err := r.client.Update(ctx, freshObj); err != nil {
return nil, err
}
// Refetch
freshObj = r.provider.Parent.DeepCopyObject()
if err := r.client.Get(ctx, request.NamespacedName, freshObj); err != nil {
return nil, err
}
// Status
freshStatus, err := NewReflectedStatusAccessor(freshObj)
if err != nil {
return nil, err
}
orgStatus, err := NewReflectedStatusAccessor(object)
if err != nil {
return nil, err
}
freshStatus.SetStatus(orgStatus.GetStatus())
if err := r.client.Status().Update(ctx, freshObj); err != nil {
return nil, err
}
return freshObj, nil
}