func()

in pkg/controller/pipe/monitor.go [53:153]


// Handle compares the Pipe with the Integration that has the same namespace
// and name, and returns the Pipe (or a copy of it) carrying the resulting status.
//
// The flow is:
//   - If the Integration cannot be found, the Pipe is handed to initializePipe.
//     Any other lookup failure is returned as a wrapped error with a nil Pipe.
//   - If the Integration no longer matches the Pipe (spec, operator id,
//     integration profile or its namespace, traits), a copy of the Pipe is
//     returned with phase PipePhaseNone and a false Ready condition.
//   - Otherwise the Integration phase is mapped onto a copy of the Pipe, and the
//     Integration's replicas and selector are mirrored into the Pipe status.
//
// When CreateIntegrationFor fails, the received pipe itself (not a copy) is
// set to PipePhaseError and returned together with the error.
func (action *monitorAction) Handle(ctx context.Context, pipe *v1.Pipe) (*v1.Pipe, error) {
	integration := v1.Integration{}
	objKey := client.ObjectKey{Namespace: pipe.Namespace, Name: pipe.Name}
	if err := action.client.Get(ctx, objKey, &integration); err != nil {
		if k8serrors.IsNotFound(err) {
			action.L.Info("Re-initializing Pipe")
			return initializePipe(ctx, action.client, action.L, pipe)
		}
		return nil, fmt.Errorf("could not load integration for Pipe %q: %w", pipe.Name, err)
	}

	// An annotation counts as changed only when it is set on the Pipe and
	// differs from the value found on the Integration.
	pipeOperatorID := v1.GetOperatorIDAnnotation(pipe)
	operatorIDChanged := pipeOperatorID != "" &&
		pipeOperatorID != v1.GetOperatorIDAnnotation(&integration)

	pipeProfile := v1.GetIntegrationProfileAnnotation(pipe)
	integrationProfileChanged := pipeProfile != "" &&
		pipeProfile != v1.GetIntegrationProfileAnnotation(&integration)

	pipeProfileNamespace := v1.GetIntegrationProfileNamespaceAnnotation(pipe)
	integrationProfileNamespaceChanged := pipeProfileNamespace != "" &&
		pipeProfileNamespace != v1.GetIntegrationProfileNamespaceAnnotation(&integration)

	sameTraits, err := trait.IntegrationAndPipeSameTraits(action.client, &integration, pipe)
	if err != nil {
		return nil, err
	}

	// Build the Integration this Pipe is expected to produce, so that its spec
	// can be compared with the one that was loaded.
	expected, err := CreateIntegrationFor(ctx, action.client, pipe)
	if err != nil {
		pipe.Status.Phase = v1.PipePhaseError
		pipe.Status.SetErrorCondition(v1.PipeConditionReady, "IntegrationError", err)
		return pipe, err
	}

	specDiffers := !equality.Semantic.DeepDerivative(expected.Spec, integration.Spec)
	profileChanged := integrationProfileChanged || integrationProfileNamespaceChanged

	if specDiffers || operatorIDChanged || profileChanged || !sameTraits {
		// NOTE(review): the value logged under "semantic-equality" is true when
		// the specs differ, i.e. the negation of the comparison result —
		// confirm this is the intended meaning of the key.
		action.L.Info(
			"Pipe needs a rebuild",
			"semantic-equality", specDiffers,
			"operatorid-changed", operatorIDChanged,
			"integration-profile-changed", profileChanged,
			"traits-changed", !sameTraits)

		// Reset the phase and mark the Pipe as not ready on a copy.
		rebuild := pipe.DeepCopy()
		rebuild.Status.Phase = v1.PipePhaseNone
		rebuild.Status.SetCondition(v1.PipeConditionReady, corev1.ConditionFalse, "", "")
		return rebuild, nil
	}

	// Nothing changed: translate the Integration phase and conditions.
	target := pipe.DeepCopy()

	switch integration.Status.Phase {
	case v1.IntegrationPhaseRunning:
		target.Status.Phase = v1.PipePhaseReady
		setPipeReadyCondition(target, &integration)

	case v1.IntegrationPhaseError:
		target.Status.Phase = v1.PipePhaseError
		setPipeReadyCondition(target, &integration)

	default:
		target.Status.Phase = v1.PipePhaseCreating

		// NOTE(review): the message reports the Pipe phase that was just set
		// above, not integration.Status.Phase — confirm this is intended.
		notReady := v1.PipeCondition{
			Type:    v1.PipeConditionReady,
			Status:  corev1.ConditionFalse,
			Reason:  string(target.Status.Phase),
			Message: fmt.Sprintf("Integration %q is in %q phase", integration.GetName(), target.Status.Phase),
		}

		// Carry over the pod conditions from the Integration's Ready condition,
		// copied so the Pipe condition does not share the backing array.
		if ready := integration.Status.GetCondition(v1.IntegrationConditionReady); ready != nil && ready.Pods != nil {
			pods := make([]v1.PodCondition, len(ready.Pods))
			copy(pods, ready.Pods)
			notReady.Pods = pods
		}

		target.Status.SetConditions(notReady)
	}

	// Mirror status replicas and selector.
	target.Status.Replicas = integration.Status.Replicas
	target.Status.Selector = integration.Status.Selector

	return target, nil
}