in pkg/controller/integration/monitor.go [516:642]
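// probeReadiness probes the readiness of the Integration pods: pods that are
// not ready get their HTTP readiness probe invoked directly, so that failing
// Camel health checks can be surfaced in the Integration status. It returns
// the number of ready pods and whether readiness probing succeeded overall.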
func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pods []corev1.Pod) (int32, bool, error) {
	// By default, assume the Integration is ready.
	readyCondition := v1.IntegrationCondition{
		Type:   v1.IntegrationConditionReady,
		Status: corev1.ConditionTrue,
		Pods:   make([]v1.PodCondition, len(pods)),
	}
	readyPods := int32(0)
	unreadyPods := int32(0)
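	// runtimeReady is cleared as soon as any probe fails to answer, while
	// runtimeFailed is set only when a health check actively reports DOWN.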
	runtimeReady := true
	runtimeFailed := false
	probeReadinessOk := true
	for i := range pods {
		pod := &pods[i]
		readyCondition.Pods[i].Name = pod.Name
		for p := range pod.Status.Conditions {
			if pod.Status.Conditions[p].Type == corev1.PodReady {
				readyCondition.Pods[i].Condition = pod.Status.Conditions[p]
				break
			}
		}
		// If the Pod is already in ready status, there is no need to probe it.
		if ready := kubernetes.GetPodCondition(*pod, corev1.PodReady); ready.Status == corev1.ConditionTrue {
			readyPods++
			continue
		}
		unreadyPods++
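		// The Pod is not ready: locate the integration container so its
		// readiness probe can be invoked directly.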
		container := getIntegrationContainer(environment, pod)
		if container == nil {
			return readyPods, false, fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
		}
		// TODO: this code must be moved to a dedicated function
		//
		//nolint:nestif
		if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
			body, err := proxyGetHTTPProbe(ctx, action.client, probe, pod, container)
			// When invoking the HTTP probe, the Kubernetes client exposes a very
			// specific behavior:
			//
			// - if there is no error, the pod is not ready simply because the
			//   probe has to succeed a few more times, as per its configuration:
			//   the probe itself is OK and the pod may become ready at some point
			// - if the error is Service Unavailable (HTTP 503), the pod is not
			//   ready and the probe is failing: in this case the response body
			//   can be scraped for Camel health information
			//
			// Here is an example of a failed probe (from curl):
			//
			//   Trying 127.0.0.1:8080...
			//   TCP_NODELAY set
			//   Connected to localhost (127.0.0.1) port 8080 (#0)
			//   GET /q/health/ready HTTP/1.1
			//   Host: localhost:8080
			//   User-Agent: curl/7.68.0
			//   Accept: */*
			//
			//   Mark bundle as not supporting multiuse
			//   HTTP/1.1 503 Service Unavailable
			//   content-type: application/json; charset=UTF-8
			//   content-length: 871
			//
			//   {
			//     "status": "DOWN",
			//     "checks": [ {
			//       "name": "camel-routes",
			//       "status": "DOWN",
			//       "data": {
			//         "route.id": "route1",
			//         "route.status": "Stopped",
			//         "check.kind": "READINESS"
			//       }
			//     }]
			//   }
			if err == nil {
				continue
			}
			if errors.Is(err, context.DeadlineExceeded) {
				readyCondition.Pods[i].Condition.Message = fmt.Sprintf("readiness probe timed out for Pod %s/%s", pod.Namespace, pod.Name)
				runtimeReady = false
				continue
			}
			if !k8serrors.IsServiceUnavailable(err) {
				readyCondition.Pods[i].Condition.Message = fmt.Sprintf("readiness probe failed for Pod %s/%s: %s", pod.Namespace, pod.Name, err.Error())
				runtimeReady = false
				continue
			}
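			// HTTP 503: the response body carries the runtime health report.
			// Parse it to surface the failing checks in the Integration status.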
			health, err := NewHealthCheck(body)
			if err != nil {
				return readyPods, false, err
			}
			for _, check := range health.Checks {
				if check.Status == v1.HealthCheckStatusUp {
					continue
				}
				runtimeReady = false
				runtimeFailed = true
				readyCondition.Pods[i].Health = append(readyCondition.Pods[i].Health, check)
			}
		}
	}
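	// An actively failing health check is reported as an error, whereas a
	// probe that has merely not succeeded yet only marks the runtime as not
	// ready.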
	if runtimeFailed {
		probeReadinessOk = false
		readyCondition.Reason = v1.IntegrationConditionErrorReason
		readyCondition.Status = corev1.ConditionFalse
		readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
		integration.Status.SetConditions(readyCondition)
	}
	if !runtimeReady {
		probeReadinessOk = false
		readyCondition.Reason = v1.IntegrationConditionRuntimeNotReadyReason
		readyCondition.Status = corev1.ConditionFalse
		readyCondition.Message = fmt.Sprintf("%d/%d pods are not ready", unreadyPods, unreadyPods+readyPods)
		integration.Status.SetConditions(readyCondition)
	}
	return readyPods, probeReadinessOk, nil
}
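
// probeHTTPReadiness is a hypothetical extraction of the HTTP probe handling
// above, as suggested by the TODO: a minimal sketch, not code that exists in
// this file. It reuses only helpers already referenced here (proxyGetHTTPProbe,
// NewHealthCheck) and reports whether the runtime answered its probe and
// whether any health check actively failed, recording details on the given
// pod condition.
func (action *monitorAction) probeHTTPReadiness(ctx context.Context, pod *corev1.Pod, container *corev1.Container, cond *v1.PodCondition) (ready, failed bool, err error) {
	probe := container.ReadinessProbe
	if probe == nil || probe.HTTPGet == nil {
		// No HTTP readiness probe configured: nothing to check here.
		return true, false, nil
	}
	body, err := proxyGetHTTPProbe(ctx, action.client, probe, pod, container)
	if err == nil {
		// The probe answered: the pod is not ready yet, but the runtime is fine.
		return true, false, nil
	}
	if errors.Is(err, context.DeadlineExceeded) {
		cond.Condition.Message = fmt.Sprintf("readiness probe timed out for Pod %s/%s", pod.Namespace, pod.Name)
		return false, false, nil
	}
	if !k8serrors.IsServiceUnavailable(err) {
		cond.Condition.Message = fmt.Sprintf("readiness probe failed for Pod %s/%s: %s", pod.Namespace, pod.Name, err.Error())
		return false, false, nil
	}
	// HTTP 503: parse the health report carried in the response body.
	health, err := NewHealthCheck(body)
	if err != nil {
		return false, false, err
	}
	for _, check := range health.Checks {
		if check.Status != v1.HealthCheckStatusUp {
			failed = true
			cond.Health = append(cond.Health, check)
		}
	}
	return !failed, failed, nil
}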