in pkg/trait/knative.go [391:474]
func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.CamelEnvironment) error {
if !ptr.Deref(t.SinkBinding, false) {
return nil
}
var serviceType knativeapi.CamelServiceType
services := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel)
if len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeChannel
}
services = append(services, t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)...)
if len(serviceType) == 0 && len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeEndpoint
}
services = append(services, t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)...)
if len(serviceType) == 0 && len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeEvent
}
if len(services) != 1 {
return fmt.Errorf("sinkbinding can only be used with a single sink: found %d sinks", len(services))
}
err := t.withServiceDo(false, e, env, services, serviceType, knativeapi.CamelEndpointKindSink, func(ref *corev1.ObjectReference, serviceURI string, _ func() (*url.URL, error)) error {
// Mark the service which will be used as SinkBinding
env.SetSinkBinding(ref.Name, knativeapi.CamelEndpointKindSink, serviceType, ref.APIVersion, ref.Kind)
if !e.IntegrationInPhase(v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) {
return nil
}
e.PostStepProcessors = append(e.PostStepProcessors, func(e *Environment) error {
sinkBindingInjected := false
e.Resources.Visit(func(object runtime.Object) {
gvk := object.GetObjectKind().GroupVersionKind()
if gvk.Kind == "SinkBinding" && strings.Contains(gvk.Group, "knative") {
sinkBindingInjected = true
}
})
if sinkBindingInjected {
return nil
}
controller := e.Resources.GetController(func(object ctrl.Object) bool {
return true
})
if controller != nil && !reflect.ValueOf(controller).IsNil() {
gvk := controller.GetObjectKind().GroupVersionKind()
av, k := gvk.ToAPIVersionAndKind()
source := corev1.ObjectReference{
Kind: k,
Namespace: e.Integration.Namespace,
Name: e.Integration.Name,
APIVersion: av,
}
target := corev1.ObjectReference{
Kind: ref.Kind,
Namespace: e.Integration.Namespace,
Name: ref.Name,
APIVersion: ref.APIVersion,
}
if ptr.Deref(t.NamespaceLabel, true) {
// set the namespace label to allow automatic sinkbinding injection
enabled, err := knativeutil.EnableKnativeBindInNamespace(e.Ctx, e.Client, e.Integration.Namespace)
if err != nil {
t.L.Errorf(err, "Error setting label 'bindings.knative.dev/include=true' in namespace: %s", e.Integration.Namespace)
} else if enabled {
t.L.Infof("Label 'bindings.knative.dev/include=true' set in namespace: %s", e.Integration.Namespace)
}
}
// Add the SinkBinding in first position, to make sure it is created
// before the reference source, so that the SinkBinding webhook has
// all the information to perform injection.
e.Resources.AddFirst(knativeutil.CreateSinkBinding(source, target))
}
return nil
})
return nil
})
return err
}