in pkg/controller/pipe/integration.go [47:225]
// CreateIntegrationFor assembles the Integration that corresponds to the given Pipe.
//
// The returned Integration reuses the Pipe's namespace and name, carries a copy
// of its labels and annotations (minus the icon annotation and whatever
// extractAndDeleteTraits removes), and is owned by the Pipe through a
// controller owner reference. The Pipe's source, steps, sink and optional
// error handler are translated into bindings and rendered as flows on the
// Integration spec.
//
// This function only builds and returns the object; it does not itself submit
// it to the API server. The client is handed to the helpers it calls.
//
// An error is returned when traits cannot be extracted, the trait profile
// cannot be determined, any endpoint cannot be translated or is missing the
// required URI/Step, a binding cannot be applied, or a flow cannot be encoded
// as JSON.
func CreateIntegrationFor(ctx context.Context, c client.Client, binding *v1.Pipe) (*v1.Integration, error) {
	controller := true
	blockOwnerDeletion := true
	annotations := util.CopyMap(binding.Annotations)
	// avoid propagating the icon to the integration as it's heavyweight and not needed
	delete(annotations, v1.AnnotationIcon)
	traits, err := extractAndDeleteTraits(c, annotations)
	if err != nil {
		return nil, fmt.Errorf("could not marshal trait annotations %w", err)
	}

	it := v1.Integration{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   binding.Namespace,
			Name:        binding.Name,
			Annotations: annotations,
			Labels:      util.CopyMap(binding.Labels),
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion:         binding.APIVersion,
					Kind:               binding.Kind,
					Name:               binding.Name,
					UID:                binding.UID,
					Controller:         &controller,
					BlockOwnerDeletion: &blockOwnerDeletion,
				},
			},
		},
	}

	// creator labels
	if it.GetLabels() == nil {
		it.SetLabels(make(map[string]string))
	}
	it.GetLabels()[kubernetes.CamelCreatorLabelKind] = binding.Kind
	it.GetLabels()[kubernetes.CamelCreatorLabelName] = binding.Name

	if traits != nil {
		it.Spec.Traits = *traits
	}

	// Set replicas (or override podspecable value) if present.
	// The value is copied so the Integration does not alias the Pipe's pointer.
	if binding.Spec.Replicas != nil {
		replicas := *binding.Spec.Replicas
		it.Spec.Replicas = &replicas
	}

	profile, err := determineTraitProfile(ctx, c, binding)
	if err != nil {
		return nil, err
	}
	it.Spec.Profile = profile

	if binding.Spec.ServiceAccountName != "" {
		it.Spec.ServiceAccountName = binding.Spec.ServiceAccountName
	}

	bindingContext := bindings.BindingContext{
		Ctx:       ctx,
		Client:    c,
		Namespace: it.Namespace,
		Profile:   profile,
		Metadata:  it.Annotations,
	}

	// Each translation error is labelled with the endpoint it belongs to, so a
	// failure on the source can be told apart from one on the sink.
	from, err := bindings.Translate(bindingContext, endpointTypeSourceContext, binding.Spec.Source)
	if err != nil {
		return nil, fmt.Errorf("could not determine source URI: %w", err)
	}
	to, err := bindings.Translate(bindingContext, endpointTypeSinkContext, binding.Spec.Sink)
	if err != nil {
		return nil, fmt.Errorf("could not determine sink URI: %w", err)
	}
	// error handler is optional
	errorHandler, err := maybeErrorHandler(binding.Spec.ErrorHandler, bindingContext)
	if err != nil {
		return nil, fmt.Errorf("could not determine error handler: %w", err)
	}

	steps := make([]*bindings.Binding, 0, len(binding.Spec.Steps))
	for idx, step := range binding.Spec.Steps {
		// Take a per-iteration copy because its address is stored in the endpoint context.
		position := idx
		stepBinding, err := bindings.Translate(bindingContext, bindings.EndpointContext{
			Type:     v1.EndpointTypeAction,
			Position: &position,
		}, step)
		if err != nil {
			return nil, fmt.Errorf("could not determine URI for step %d: %w", idx, err)
		}
		steps = append(steps, stepBinding)
	}

	// Validate the translated bindings before using them. The nil checks are
	// defensive: whether Translate can return a nil binding without an error
	// is not visible from here, and a nil binding is unusable either way.
	if to == nil || (to.Step == nil && to.URI == "") {
		return nil, fmt.Errorf("illegal step definition for sink step: either Step or URI should be provided")
	}
	if from == nil || from.URI == "" {
		return nil, fmt.Errorf("illegal step definition for source step: URI should be provided")
	}
	for index, step := range steps {
		if step == nil || (step.Step == nil && step.URI == "") {
			return nil, fmt.Errorf("illegal step definition for step %d: either Step or URI should be provided", index)
		}
	}

	if err := configureBinding(&it, from); err != nil {
		return nil, err
	}
	if err := configureBinding(&it, steps...); err != nil {
		return nil, err
	}
	if err := configureBinding(&it, to); err != nil {
		return nil, err
	}
	// errorHandler may be nil here; it is passed through unchanged as before.
	if err := configureBinding(&it, errorHandler); err != nil {
		return nil, err
	}

	// Order configuration entries by type, then by value, so the resulting
	// spec does not depend on the order in which bindings contributed them.
	if it.Spec.Configuration != nil {
		sort.SliceStable(it.Spec.Configuration, func(i, j int) bool {
			mi, mj := it.Spec.Configuration[i], it.Spec.Configuration[j]
			switch {
			case mi.Type != mj.Type:
				return mi.Type < mj.Type
			default:
				return mi.Value < mj.Value
			}
		})
	}

	// Route steps: optional source step, the intermediate steps, optional sink
	// step, and finally the "to" pointing at the sink URI.
	dslSteps := make([]map[string]interface{}, 0, len(steps)+3)
	if from.Step != nil {
		dslSteps = append(dslSteps, from.AsYamlDSL())
	}
	for _, step := range steps {
		dslSteps = append(dslSteps, step.AsYamlDSL())
	}
	if to.Step != nil {
		dslSteps = append(dslSteps, to.AsYamlDSL())
	}
	dslSteps = append(dslSteps, map[string]interface{}{
		"to": to.URI,
	})

	fromWrapper := map[string]interface{}{
		"uri":   from.URI,
		"steps": dslSteps,
	}
	flowRoute := map[string]interface{}{
		"route": map[string]interface{}{
			"id":   "binding",
			"from": fromWrapper,
		},
	}

	// When present, the error handler flow is appended before the route flow.
	if errorHandler != nil {
		eh := translateCamelErrorHandler(errorHandler)
		encodedErrorHandler, err := json.Marshal(eh)
		if err != nil {
			return nil, err
		}
		it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedErrorHandler})
	}

	encodedRoute, err := json.Marshal(flowRoute)
	if err != nil {
		return nil, err
	}
	it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedRoute})

	return &it, nil
}