in pkg/testutils/ingestor/ingestor.go [346:424]
// VerifyIngestorRunning reports whether every ingestor pod in the "adx-mon"
// namespace is in the Running phase.
//
// It lists the pods labelled "app=ingestor" and returns:
//   - a bool that is true only when at least one pod was found and all listed
//     pods are in the Running phase;
//   - an *IngestorStatus holding, per pod, its phase, the sum of its container
//     restart counts and a human-readable state for each container (nil when
//     restConfig is nil, the clientset cannot be created, or the list call
//     fails);
//   - an error when restConfig is nil, the clientset cannot be created, the
//     list call fails, or no ingestor pods are found.
//
// Restart counts are recorded for the caller to inspect; they do not influence
// the returned bool.
func VerifyIngestorRunning(ctx context.Context, restConfig *rest.Config) (bool, *IngestorStatus, error) {
	// Guard against a nil config up front so callers get an error instead of a
	// nil-pointer panic inside client-go.
	if restConfig == nil {
		return false, nil, fmt.Errorf("rest config must not be nil")
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return false, nil, fmt.Errorf("failed to create kubernetes clientset: %w", err)
	}

	const (
		namespace     = "adx-mon"      // namespace where the ingestor is deployed
		labelSelector = "app=ingestor" // label carried by ingestor pods
	)

	pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		return false, nil, fmt.Errorf("failed to list ingestor pods: %w", err)
	}

	status := &IngestorStatus{
		TotalPods:      len(pods.Items),
		PodStatuses:    make(map[string]string, len(pods.Items)),
		RestartCounts:  make(map[string]int32, len(pods.Items)),
		ContainerState: make(map[string][]string, len(pods.Items)),
	}

	// An empty result is reported as an error, but the (empty) status is still
	// returned so callers can inspect it.
	if status.TotalPods == 0 {
		return false, status, fmt.Errorf("no ingestor pods found")
	}

	// Index into the slice rather than ranging by value to avoid copying each
	// Pod object.
	for i := range pods.Items {
		pod := &pods.Items[i]
		podName := pod.Name

		status.PodStatuses[podName] = string(pod.Status.Phase)
		if pod.Status.Phase == corev1.PodRunning {
			status.RunningPods++
		} else {
			// Any phase other than Running counts as unhealthy.
			status.UnhealthyPods++
		}

		var totalRestarts int32
		// Kept non-nil even when the pod reports no container statuses.
		containerStates := make([]string, 0, len(pod.Status.ContainerStatuses))

		for _, cs := range pod.Status.ContainerStatuses {
			totalRestarts += cs.RestartCount

			// Falls through to "unknown" when none of the state pointers is set.
			state := "unknown"
			switch {
			case cs.State.Running != nil:
				state = fmt.Sprintf("running (started at %s)", cs.State.Running.StartedAt.Format(time.RFC3339))
			case cs.State.Waiting != nil:
				state = fmt.Sprintf("waiting: %s - %s",
					cs.State.Waiting.Reason,
					cs.State.Waiting.Message)
			case cs.State.Terminated != nil:
				state = fmt.Sprintf("terminated: %s (exit code %d) - %s",
					cs.State.Terminated.Reason,
					cs.State.Terminated.ExitCode,
					cs.State.Terminated.Message)
			}

			containerStates = append(containerStates, fmt.Sprintf("%s: %s", cs.Name, state))
		}

		status.RestartCounts[podName] = totalRestarts
		status.ContainerState[podName] = containerStates
	}

	// Healthy only when every listed pod is in the Running phase.
	allPodsRunning := status.RunningPods == status.TotalPods
	return allPodsRunning, status, nil
}