in pkg/controller/integration/monitor.go [62:162]
// Handle monitors an Integration and refreshes its status in place.
//
// The steps, in order:
//   - If the Integration status is in the InitializationFailed condition, only
//     the digest/rebuild check is performed (with a nil kit) and its result is
//     returned directly.
//   - Otherwise the IntegrationKit referenced by the status is fetched; a missing
//     reference or a failed lookup is reported as an error.
//   - If checkDigestAndRebuild returns a non-nil Integration, that value is
//     returned immediately.
//   - Kits whose priority label is greater than the current kit's priority
//     ("0" when the label is absent) are looked up, and if
//     findHighestPriorityReadyKit yields one it is set on the Integration.
//   - Traits are applied using the kit fetched earlier in this call.
//   - The status selector and replicas count are updated, a Deploying phase is
//     moved to Running, and the phase / ready condition are reconciled.
//
// On success the same (mutated) integration pointer is returned. On any failure
// the returned Integration is nil and the error is propagated.
func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
	// An Integration in the InitializationFailed condition has no kit available,
	// so the only thing left to verify is whether it requires a rebuild.
	if isInInitializationFailed(integration.Status) {
		return action.checkDigestAndRebuild(integration, nil)
	}

	// At this stage the Integration must have a kit.
	kitRef := integration.Status.IntegrationKit
	if kitRef == nil {
		return nil, fmt.Errorf("no kit set on integration %s", integration.Name)
	}

	currentKit, err := kubernetes.GetIntegrationKit(ctx, action.client, kitRef.Name, kitRef.Namespace)
	if err != nil {
		return nil, fmt.Errorf("unable to find integration kit %s/%s: %w",
			kitRef.Namespace, kitRef.Name, err)
	}

	// Check whether the Integration requires a rebuild; if so, hand back the
	// Integration produced by that check and stop here.
	rebuilt, err := action.checkDigestAndRebuild(integration, currentKit)
	if err != nil {
		return nil, err
	}
	if rebuilt != nil {
		return rebuilt, nil
	}

	// Check whether an IntegrationKit with a higher priority is ready.
	// A kit without the priority label is treated as priority "0".
	currentPriority := "0"
	if value, found := currentKit.Labels[v1.IntegrationKitPriorityLabel]; found {
		currentPriority = value
	}
	higherPriority, err := labels.NewRequirement(
		v1.IntegrationKitPriorityLabel, selection.GreaterThan, []string{currentPriority})
	if err != nil {
		return nil, err
	}
	kitSelector := ctrl.MatchingLabelsSelector{
		Selector: labels.NewSelector().Add(*higherPriority),
	}
	candidates, err := lookupKitsForIntegration(ctx, action.client, integration, kitSelector)
	if err != nil {
		return nil, err
	}
	preferredKit, err := findHighestPriorityReadyKit(candidates)
	if err != nil {
		return nil, err
	}
	if preferredKit != nil {
		integration.SetIntegrationKit(preferredKit)
	}

	// Run the traits that are enabled for the phase.
	// Note that the kit fetched above is the one passed to the traits.
	env, err := trait.Apply(ctx, action.client, integration, currentKit)
	if err != nil {
		return nil, err
	}

	// Enforce the scale sub-resource label selector.
	// It is used by the HPA that queries the scale sub-resource endpoint,
	// to list the pods owned by the integration.
	integration.Status.Selector = v1.IntegrationLabel + "=" + integration.Name

	// listPodsInPhase lists the pods labelled with this Integration's name, in
	// the Integration's namespace, whose status.phase field equals phase.
	listPodsInPhase := func(phase corev1.PodPhase) (*corev1.PodList, error) {
		pods := &corev1.PodList{}
		if listErr := action.client.List(ctx, pods,
			ctrl.InNamespace(integration.Namespace),
			ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
			ctrl.MatchingFields{"status.phase": string(phase)},
		); listErr != nil {
			return nil, listErr
		}
		return pods, nil
	}

	// Update the replicas count: every pending pod, plus every running pod
	// that is not marked for deletion.
	pending, err := listPodsInPhase(corev1.PodPending)
	if err != nil {
		return nil, err
	}
	running, err := listPodsInPhase(corev1.PodRunning)
	if err != nil {
		return nil, err
	}
	activeRunning := 0
	for i := range running.Items {
		if running.Items[i].DeletionTimestamp == nil {
			activeRunning++
		}
	}
	replicas := int32(len(pending.Items) + activeRunning)
	integration.Status.Replicas = &replicas

	// Reconcile the Integration phase and ready condition.
	if integration.Status.Phase == v1.IntegrationPhaseDeploying {
		integration.Status.Phase = v1.IntegrationPhaseRunning
	}
	if err := action.updateIntegrationPhaseAndReadyCondition(
		ctx, env, integration, pending.Items, running.Items,
	); err != nil {
		return nil, err
	}

	return integration, nil
}