in rocketmq-knative/source/pkg/reconciler/rocketmqsource.go [100:141]
func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error {
logger := logging.FromContext(ctx).Desugar()
src, ok := object.(*v1alpha1.RocketMQSource)
if !ok {
logger.Error("could not find RocketMQ source", zap.Any("object", object))
return nil
}
deletionTimestamp := src.DeletionTimestamp
if deletionTimestamp != nil {
r.removeFinalizer(src)
return nil
}
r.addFinalizer(src)
src.Status.InitializeConditions()
sinkURI, err := sinks.GetSinkURI(ctx, r.client, src.Spec.Sink, src.Namespace)
if err != nil {
src.Status.MarkNoSink("NotFound", "")
return err
}
src.Status.MarkSink(sinkURI)
_, err = r.createReceiveAdapter(ctx, src, "knative-eventing-default", sinkURI)
if err != nil {
logger.Error("Unable to create the receive adapter", zap.Error(err))
return err
}
src.Status.MarkDeployed()
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()
return nil
}