in pkg/controller/kameletbinding/integration.go [45:208]
// CreateIntegrationFor assembles the Integration described by a KameletBinding.
//
// The Integration takes the binding's namespace and name, copies of its labels
// and annotations (minus the icon annotation) plus the creator labels, and an
// owner reference pointing back at the binding. Its spec starts from
// binding.Spec.Integration when that is set, and is then overridden with the
// binding's replicas (if any), the determined profile and the service account
// name (if any). Source, sink, intermediate steps and the optional error
// handler are translated into bindings, applied to the Integration through
// configureKameletBinding, and finally rendered as one route with id "binding"
// that is appended to the Integration flows.
//
// An error is returned when the profile cannot be determined, when an endpoint
// or the error handler cannot be translated, when the source has no URI, when
// the sink or a step has neither a Step nor a URI, when
// configureKameletBinding fails, or when the route cannot be JSON-encoded.
func CreateIntegrationFor(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (*v1.Integration, error) {
	// Kept as two variables so each owner reference flag has its own pointer.
	isController := true
	blocksOwnerDeletion := true

	// The icon is heavyweight and not needed by the integration, so it is not propagated.
	annotations := util.CopyMap(binding.Annotations)
	delete(annotations, v1alpha1.AnnotationIcon)

	// Binding labels are carried over and extended with the creator labels.
	labels := util.CopyMap(binding.Labels)
	if labels == nil {
		labels = make(map[string]string)
	}
	labels[kubernetes.CamelCreatorLabelKind] = binding.Kind
	labels[kubernetes.CamelCreatorLabelName] = binding.Name

	integration := v1.Integration{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   binding.Namespace,
			Name:        binding.Name,
			Annotations: annotations,
			Labels:      labels,
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion:         binding.APIVersion,
				Kind:               binding.Kind,
				Name:               binding.Name,
				UID:                binding.UID,
				Controller:         &isController,
				BlockOwnerDeletion: &blocksOwnerDeletion,
			}},
		},
	}

	// The integration spec embedded in the binding, when present, is the starting point.
	if base := binding.Spec.Integration; base != nil {
		integration.Spec = *base.DeepCopy()
	}
	// Replicas declared on the binding win over whatever the embedded spec carried.
	if requested := binding.Spec.Replicas; requested != nil {
		replicaCount := *requested
		integration.Spec.Replicas = &replicaCount
	}

	profile, err := determineProfile(ctx, c, binding)
	if err != nil {
		return nil, err
	}
	integration.Spec.Profile = profile

	if serviceAccount := binding.Spec.ServiceAccountName; serviceAccount != "" {
		integration.Spec.ServiceAccountName = serviceAccount
	}

	bindingContext := bindings.V1alpha1BindingContext{
		Ctx:       ctx,
		Client:    c,
		Namespace: integration.Namespace,
		Profile:   profile,
	}

	// Translation order (source, sink, error handler, steps) decides which
	// failure is reported first.
	source, err := bindings.TranslateV1alpha1(bindingContext, endpointTypeSourceContext, binding.Spec.Source)
	if err != nil {
		return nil, fmt.Errorf("could not determine source URI: %w", err)
	}
	sink, err := bindings.TranslateV1alpha1(bindingContext, endpointTypeSinkContext, binding.Spec.Sink)
	if err != nil {
		return nil, fmt.Errorf("could not determine sink URI: %w", err)
	}
	// The error handler is optional.
	onError, err := maybeErrorHandler(binding.Spec.ErrorHandler, bindingContext)
	if err != nil {
		return nil, fmt.Errorf("could not determine error handler: %w", err)
	}

	actions := make([]*bindings.Binding, 0, len(binding.Spec.Steps))
	for idx := range binding.Spec.Steps {
		// A fresh variable per iteration, since its address is handed to the endpoint context.
		position := idx
		endpointContext := bindings.V1alpha1EndpointContext{
			Type:     v1alpha1.EndpointTypeAction,
			Position: &position,
		}
		action, err := bindings.TranslateV1alpha1(bindingContext, endpointContext, binding.Spec.Steps[idx])
		if err != nil {
			return nil, fmt.Errorf("could not determine URI for step %d: %w", idx, err)
		}
		actions = append(actions, action)
	}

	// Validate the translated endpoints: sink first, then source, then every step.
	switch {
	case sink.Step == nil && sink.URI == "":
		return nil, fmt.Errorf("illegal step definition for sink step: either Step or URI should be provided")
	case source.URI == "":
		return nil, fmt.Errorf("illegal step definition for source step: URI should be provided")
	}
	for idx, action := range actions {
		if action.Step == nil && action.URI == "" {
			return nil, fmt.Errorf("illegal step definition for step %d: either Step or URI should be provided", idx)
		}
	}

	// Apply the bindings to the integration in a fixed order: source, steps,
	// sink, error handler.
	configureOrder := [][]*bindings.Binding{
		{source},
		actions,
		{sink},
		{onError},
	}
	for _, group := range configureOrder {
		if err := configureKameletBinding(&integration, group...); err != nil {
			return nil, err
		}
	}

	// Order the configuration entries by type, then by value.
	if configuration := integration.Spec.Configuration; configuration != nil {
		sort.SliceStable(configuration, func(i, j int) bool {
			left, right := configuration[i], configuration[j]
			if left.Type != right.Type {
				return left.Type < right.Type
			}
			return left.Value < right.Value
		})
	}

	// Route body: optional source step, every action, optional sink step and
	// finally the hand-off to the sink URI.
	routeSteps := make([]map[string]interface{}, 0, len(actions)+3)
	if source.Step != nil {
		routeSteps = append(routeSteps, source.AsYamlDSL())
	}
	for _, action := range actions {
		routeSteps = append(routeSteps, action.AsYamlDSL())
	}
	if sink.Step != nil {
		routeSteps = append(routeSteps, sink.AsYamlDSL())
	}
	routeSteps = append(routeSteps, map[string]interface{}{"to": sink.URI})

	encodedRoute, err := json.Marshal(map[string]interface{}{
		"route": map[string]interface{}{
			"id": "binding",
			"from": map[string]interface{}{
				"uri":   source.URI,
				"steps": routeSteps,
			},
		},
	})
	if err != nil {
		return nil, err
	}
	integration.Spec.Flows = append(integration.Spec.Flows, v1.Flow{RawMessage: encodedRoute})

	return &integration, nil
}