in pkg/controller/pipe/monitor.go [53:153]
// Handle reconciles the status of the given Pipe with the Integration that
// has the same namespace and name.
//
// The outcome depends on what is found:
//   - If the Integration does not exist, the Pipe is re-initialized via
//     initializePipe and that result is returned.
//   - If the Integration cannot be loaded for any other reason, or the traits
//     cannot be compared, a nil Pipe and the error are returned.
//   - If the expected Integration cannot be computed, the passed Pipe itself is
//     set to the error phase with an "IntegrationError" ready condition and is
//     returned together with the error.
//   - If the Integration no longer matches the Pipe (spec, operator id,
//     integration profile or traits), a copy of the Pipe reset to
//     PipePhaseNone is returned so that it gets rebuilt.
//   - Otherwise a copy of the Pipe is returned whose phase, ready condition,
//     replicas and selector mirror the Integration status.
func (action *monitorAction) Handle(ctx context.Context, pipe *v1.Pipe) (*v1.Pipe, error) {
	key := client.ObjectKey{
		Namespace: pipe.Namespace,
		Name:      pipe.Name,
	}
	it := v1.Integration{}
	if err := action.client.Get(ctx, key, &it); err != nil {
		if k8serrors.IsNotFound(err) {
			action.L.Info("Re-initializing Pipe")
			return initializePipe(ctx, action.client, action.L, pipe)
		}
		return nil, fmt.Errorf("could not load integration for Pipe %q: %w", pipe.Name, err)
	}

	// An annotation counts as changed only when it is set on the Pipe and
	// differs from the value on the Integration; an unset Pipe annotation
	// never triggers a rebuild.
	operatorIDChanged := v1.GetOperatorIDAnnotation(pipe) != "" &&
		(v1.GetOperatorIDAnnotation(pipe) != v1.GetOperatorIDAnnotation(&it))
	integrationProfileChanged := v1.GetIntegrationProfileAnnotation(pipe) != "" &&
		(v1.GetIntegrationProfileAnnotation(pipe) != v1.GetIntegrationProfileAnnotation(&it))
	integrationProfileNamespaceChanged := v1.GetIntegrationProfileNamespaceAnnotation(pipe) != "" &&
		(v1.GetIntegrationProfileNamespaceAnnotation(pipe) != v1.GetIntegrationProfileNamespaceAnnotation(&it))

	sameTraits, err := trait.IntegrationAndPipeSameTraits(action.client, &it, pipe)
	if err != nil {
		return nil, err
	}

	// Check if the integration needs to be changed
	expected, err := CreateIntegrationFor(ctx, action.client, pipe)
	if err != nil {
		pipe.Status.Phase = v1.PipePhaseError
		pipe.Status.SetErrorCondition(
			v1.PipeConditionReady,
			"IntegrationError",
			err,
		)
		return pipe, err
	}

	// The expected spec is the first argument: the comparison is driven by
	// what the Pipe would generate, checked against the existing Integration.
	semanticEquality := equality.Semantic.DeepDerivative(expected.Spec, it.Spec)
	if !semanticEquality || operatorIDChanged || integrationProfileChanged || integrationProfileNamespaceChanged || !sameTraits {
		// "semantic-equality" reports the actual comparison result (false
		// when the specs differ), while the "*-changed" fields are true when
		// the corresponding aspect differs.
		action.L.Info(
			"Pipe needs a rebuild",
			"semantic-equality", semanticEquality,
			"operatorid-changed", operatorIDChanged,
			"integration-profile-changed", integrationProfileChanged || integrationProfileNamespaceChanged,
			"traits-changed", !sameTraits)

		// Pipe has changed and needs rebuild: reset the phase on a copy so
		// the Integration gets rebuilt.
		target := pipe.DeepCopy()
		target.Status.Phase = v1.PipePhaseNone
		target.Status.SetCondition(
			v1.PipeConditionReady,
			corev1.ConditionFalse,
			"",
			"",
		)
		return target, nil
	}

	// Map integration phase and conditions to Pipe
	target := pipe.DeepCopy()
	switch it.Status.Phase {
	case v1.IntegrationPhaseRunning:
		target.Status.Phase = v1.PipePhaseReady
		setPipeReadyCondition(target, &it)
	case v1.IntegrationPhaseError:
		target.Status.Phase = v1.PipePhaseError
		setPipeReadyCondition(target, &it)
	default:
		// Any other Integration phase is reported as "creating", with a
		// not-ready condition naming the Integration.
		target.Status.Phase = v1.PipePhaseCreating
		c := v1.PipeCondition{
			Type:    v1.PipeConditionReady,
			Status:  corev1.ConditionFalse,
			Reason:  string(target.Status.Phase),
			Message: fmt.Sprintf("Integration %q is in %q phase", it.GetName(), target.Status.Phase),
		}
		// Carry over the pod conditions from the Integration ready condition,
		// copied into a fresh slice so the Pipe does not alias it.
		if condition := it.Status.GetCondition(v1.IntegrationConditionReady); condition != nil && condition.Pods != nil {
			c.Pods = make([]v1.PodCondition, 0, len(condition.Pods))
			c.Pods = append(c.Pods, condition.Pods...)
		}
		target.Status.SetConditions(c)
	}

	// Mirror status replicas and selector
	target.Status.Replicas = it.Status.Replicas
	target.Status.Selector = it.Status.Selector

	return target, nil
}