in pkg/controller/pipe/monitor.go [54:152]
// Handle compares the Pipe with the Integration that has the same namespace
// and name, and returns the Pipe whose status should be persisted.
//
// Outcomes, in the order they are evaluated:
//   - The Integration cannot be found: a copy of the Pipe is returned with its
//     phase reset to PipePhaseNone and a false Ready condition.
//   - The Integration lookup fails for any other reason: nil and a wrapped
//     error are returned.
//   - Comparing traits fails: nil and that error are returned.
//   - Building the expected Integration fails: the Pipe passed in is itself
//     moved to PipePhaseError (it is not copied) and returned with the error.
//   - The spec comparison fails, the operator id annotation differs, or the
//     traits differ: the same reset copy as in the not-found case is returned.
//   - Otherwise the Integration phase, replicas and selector are mapped onto
//     a copy of the Pipe, which is returned.
func (action *monitorAction) Handle(ctx context.Context, binding *v1.Pipe) (*v1.Pipe, error) {
	// resetForRebuild produces the copy returned whenever the Integration has
	// to be (re)built: phase cleared and Ready set to false with empty
	// reason and message.
	resetForRebuild := func() *v1.Pipe {
		reset := binding.DeepCopy()
		reset.Status.Phase = v1.PipePhaseNone
		reset.Status.SetCondition(v1.PipeConditionReady, corev1.ConditionFalse, "", "")
		return reset
	}

	var it v1.Integration
	key := client.ObjectKey{Namespace: binding.Namespace, Name: binding.Name}
	if err := action.client.Get(ctx, key, &it); err != nil {
		if !k8serrors.IsNotFound(err) {
			return nil, fmt.Errorf("could not load integration for Pipe %q: %w", binding.Name, err)
		}
		// Not found is not an error here: the Pipe is sent back for a rebuild.
		return resetForRebuild(), nil
	}

	// The operator id only counts as changed when the Pipe carries a non-empty
	// annotation that differs from the one on the Integration.
	operatorIDChanged := false
	if pipeOperatorID := v1.GetOperatorIDAnnotation(binding); pipeOperatorID != "" {
		operatorIDChanged = pipeOperatorID != v1.GetOperatorIDAnnotation(&it)
	}

	sameTraits, err := trait.IntegrationAndPipeSameTraits(&it, binding)
	if err != nil {
		return nil, err
	}
	traitsChanged := !sameTraits

	// Build the Integration this Pipe is expected to produce so that its spec
	// can be compared with the one that was loaded.
	expected, err := CreateIntegrationFor(ctx, action.client, binding)
	if err != nil {
		// The incoming Pipe is updated in place on this path and handed back
		// together with the error.
		binding.Status.Phase = v1.PipePhaseError
		binding.Status.SetErrorCondition(
			v1.PipeIntegrationConditionError,
			"Couldn't create an Integration custom resource",
			err,
		)
		return binding, err
	}

	specChanged := !equality.Semantic.DeepDerivative(expected.Spec, it.Spec)
	if specChanged || operatorIDChanged || traitsChanged {
		// Note: the "semantic-equality" key carries the negated comparison
		// result, i.e. true means the specs did NOT match.
		action.L.Info(
			"Pipe needs a rebuild",
			"semantic-equality", specChanged,
			"operatorid-changed", operatorIDChanged,
			"traits-changed", traitsChanged)
		return resetForRebuild(), nil
	}

	// Nothing to rebuild: project the Integration status onto a copy of the Pipe.
	target := binding.DeepCopy()
	switch it.Status.Phase {
	case v1.IntegrationPhaseRunning:
		target.Status.Phase = v1.PipePhaseReady
		setPipeReadyCondition(target, &it)
	case v1.IntegrationPhaseError:
		target.Status.Phase = v1.PipePhaseError
		setPipeReadyCondition(target, &it)
	default:
		// Any other Integration phase is reported as "creating", with a false
		// Ready condition that names the Integration and the Pipe phase.
		target.Status.Phase = v1.PipePhaseCreating
		ready := v1.PipeCondition{
			Type:    v1.PipeConditionReady,
			Status:  corev1.ConditionFalse,
			Reason:  string(target.Status.Phase),
			Message: fmt.Sprintf("Integration %q is in %q phase", it.GetName(), target.Status.Phase),
		}
		// Carry over a copy of the pod conditions attached to the
		// Integration's Ready condition, when there are any.
		if itReady := it.Status.GetCondition(v1.IntegrationConditionReady); itReady != nil && itReady.Pods != nil {
			ready.Pods = append(make([]v1.PodCondition, 0, len(itReady.Pods)), itReady.Pods...)
		}
		target.Status.SetConditions(ready)
	}

	// Mirror status replicas and selector.
	target.Status.Replicas = it.Status.Replicas
	target.Status.Selector = it.Status.Selector
	return target, nil
}