func()

in pkg/trait/knative.go [391:474]


// configureSinkBinding wires the single configured sink as a Knative SinkBinding.
//
// It is a no-op unless the SinkBinding option of the trait is explicitly enabled.
// Sinks are gathered from the channel, endpoint and event sink lists (in that
// order); exactly one sink must result, otherwise an error is returned.
//
// The selected sink is flagged on env as the SinkBinding target. In addition,
// when the integration is in the Deploying or Running phase, a post-step
// processor is registered on e which adds a SinkBinding resource pointing from
// the integration controller to the sink, unless a Knative SinkBinding is
// already present among the resources.
func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.CamelEnvironment) error {
	if enabled := ptr.Deref(t.SinkBinding, false); !enabled {
		return nil
	}

	channelSinks := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel)
	endpointSinks := t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)
	eventSinks := t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)

	// The service type follows the first non-empty group, with channels taking
	// precedence over endpoints, and endpoints over events.
	var sinkType knativeapi.CamelServiceType
	switch {
	case len(channelSinks) > 0:
		sinkType = knativeapi.CamelServiceTypeChannel
	case len(endpointSinks) > 0:
		sinkType = knativeapi.CamelServiceTypeEndpoint
	case len(eventSinks) > 0:
		sinkType = knativeapi.CamelServiceTypeEvent
	}

	sinks := channelSinks
	sinks = append(sinks, endpointSinks...)
	sinks = append(sinks, eventSinks...)
	if count := len(sinks); count != 1 {
		return fmt.Errorf("sinkbinding can only be used with a single sink: found %d sinks", count)
	}

	return t.withServiceDo(false, e, env, sinks, sinkType, knativeapi.CamelEndpointKindSink, func(ref *corev1.ObjectReference, serviceURI string, _ func() (*url.URL, error)) error {
		// Flag the service that acts as the SinkBinding target.
		env.SetSinkBinding(ref.Name, knativeapi.CamelEndpointKindSink, sinkType, ref.APIVersion, ref.Kind)

		// The SinkBinding resource is only generated while deploying or running.
		if e.IntegrationInPhase(v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) {
			e.PostStepProcessors = append(e.PostStepProcessors, func(postEnv *Environment) error {
				// Skip generation when a Knative SinkBinding is already among the resources.
				alreadyBound := false
				postEnv.Resources.Visit(func(res runtime.Object) {
					kind := res.GetObjectKind().GroupVersionKind()
					if kind.Kind == "SinkBinding" && strings.Contains(kind.Group, "knative") {
						alreadyBound = true
					}
				})
				if alreadyBound {
					return nil
				}

				// Accept whichever controller the resource collection reports.
				owner := postEnv.Resources.GetController(func(_ ctrl.Object) bool {
					return true
				})
				// The reflect check also rejects an interface wrapping a nil pointer.
				if owner == nil || reflect.ValueOf(owner).IsNil() {
					return nil
				}

				apiVersion, kind := owner.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
				source := corev1.ObjectReference{
					Kind:       kind,
					Namespace:  postEnv.Integration.Namespace,
					Name:       postEnv.Integration.Name,
					APIVersion: apiVersion,
				}
				target := corev1.ObjectReference{
					Kind:       ref.Kind,
					Namespace:  postEnv.Integration.Namespace,
					Name:       ref.Name,
					APIVersion: ref.APIVersion,
				}

				if ptr.Deref(t.NamespaceLabel, true) {
					// Label the namespace to allow automatic sinkbinding injection.
					// A failure here is only logged; it does not abort the processor.
					labelled, err := knativeutil.EnableKnativeBindInNamespace(postEnv.Ctx, postEnv.Client, postEnv.Integration.Namespace)
					switch {
					case err != nil:
						t.L.Errorf(err, "Error setting label 'bindings.knative.dev/include=true' in namespace: %s", postEnv.Integration.Namespace)
					case labelled:
						t.L.Infof("Label 'bindings.knative.dev/include=true' set in namespace: %s", postEnv.Integration.Namespace)
					}
				}

				// The SinkBinding goes first so that it is created before the
				// referenced source, giving the SinkBinding webhook all the
				// information it needs to perform injection.
				postEnv.Resources.AddFirst(knativeutil.CreateSinkBinding(source, target))

				return nil
			})
		}

		return nil
	})
}