pkg/testutils/ingestor/ingestor.go (375 lines of code) (raw):

//go:build !disableDocker package ingestor import ( "context" "fmt" "os" "path/filepath" "strconv" "strings" "time" adxmonv1 "github.com/Azure/adx-mon/api/v1" "github.com/Azure/adx-mon/pkg/testutils" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/k3s" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apimacherrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" kwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ) const ( DefaultImage = "ingestor" DefaultTag = "latest" ) type IngestorContainer struct { testcontainers.Container } func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*IngestorContainer, error) { var relative string for iter := range 4 { relative = strings.Repeat("../", iter) if _, err := os.Stat(filepath.Join(relative, "build/images/Dockerfile.ingestor")); err == nil { break } } var req testcontainers.ContainerRequest if img == "" { req = testcontainers.ContainerRequest{ FromDockerfile: testcontainers.FromDockerfile{ Repo: DefaultImage, Tag: DefaultTag, Context: relative, // repo base Dockerfile: "build/images/Dockerfile.ingestor", KeepImage: true, }, } } else { req = testcontainers.ContainerRequest{ Image: img, } } genericContainerReq := testcontainers.GenericContainerRequest{ ContainerRequest: req, } for _, opt := range opts { if err := opt.Customize(&genericContainerReq); err != nil { return nil, err } } container, err := testcontainers.GenericContainer(ctx, genericContainerReq) var c *IngestorContainer if container != nil { c = &IngestorContainer{Container: container} } if err != nil { return c, fmt.Errorf("generic container: %w", err) } return c, nil } // WithStarted will start the container when it is created. // You don't want to do this if you want to load the container into a k8s cluster. func WithStarted() testcontainers.CustomizeRequestOption { return func(req *testcontainers.GenericContainerRequest) error { req.Started = true return nil } } // WithTmpfsMount configures a tmpfs volume with specific size for the ingestor func WithTmpfsMount(sizeBytes int64) testcontainers.CustomizeRequestOption { return func(req *testcontainers.GenericContainerRequest) error { // Store the size in a custom field in the Env map // We'll use a special key that won't conflict with actual environment variables if req.Env == nil { req.Env = make(map[string]string) } req.Env["__TMPFS_SIZE_BYTES__"] = fmt.Sprintf("%d", sizeBytes) return nil } } func WithCluster(ctx context.Context, k *k3s.K3sContainer) testcontainers.CustomizeRequestOption { return func(req *testcontainers.GenericContainerRequest) error { isolateKinds := []string{"StatefulSet", "CustomResourceDefinition"} writeTo, err := os.MkdirTemp("", "ingestor") if err != nil { return fmt.Errorf("failed to create temp dir: %w", err) } var img string if req.FromDockerfile.Context != "" { // We build to "ingestor:latest", load that image into the k3s cluster img = req.FromDockerfile.Repo + ":" + req.FromDockerfile.Tag } else { img = req.Image } req.LifecycleHooks = append(req.LifecycleHooks, testcontainers.ContainerLifecycleHooks{ PreCreates: []testcontainers.ContainerRequestHook{ func(ctx context.Context, req testcontainers.ContainerRequest) error { if err := k.LoadImages(ctx, img); err != nil { return fmt.Errorf("failed to load image: %w", err) } if err != nil { return fmt.Errorf("failed to create temp dir: %w", err) } // Our quick-start builds the necessary manifests to install ingestor. We need // to extract the actual StatefulSet that runs the ingestor instances so that we // can customize the launch arguments. In addition, these manifests are installed // as static pod manifests, which means they're immutable, so we install the statefulset // separately so that we can mutate the state at runtime. if err := testutils.ExtractManifests(writeTo, "build/k8s/ingestor.yaml", isolateKinds); err != nil { return fmt.Errorf("failed to extract manifests: %w", err) } manifestsPath := filepath.Join(writeTo, "manifests.yaml") containerPath := filepath.Join(testutils.K3sManifests, "ingestor-manifests.yaml") if err := k.CopyFileToContainer(ctx, manifestsPath, containerPath, 0644); err != nil { return fmt.Errorf("failed to copy file to container: %w", err) } return testutils.InstallCrds(ctx, k) }, }, PostCreates: []testcontainers.ContainerHook{ func(ctx context.Context, c testcontainers.Container) error { // Deserialize our statefulset manifest and customize it to our needs ssPath := filepath.Join(writeTo, isolateKinds[0]+".yaml") ssData, err := os.ReadFile(ssPath) if err != nil { return fmt.Errorf("failed to read file: %w", err) } var statefulSet appsv1.StatefulSet if err := yaml.Unmarshal(ssData, &statefulSet); err != nil { return fmt.Errorf("failed to unmarshal statefulset: %w", err) } // (jesthom) our container doesn't have curl, so we can't provide the kubelet // an alternate way to ignore the self-signed cert. We'll have to disable the readiness probe statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe = nil // Readiness probe is https, but the cert is self-signed in our test env // statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe = &corev1.Probe{ // InitialDelaySeconds: 5, // PeriodSeconds: 5, // FailureThreshold: 3, // SuccessThreshold: 1, // TimeoutSeconds: 1, // ProbeHandler: corev1.ProbeHandler{ // Exec: &corev1.ExecAction{ // Command: []string{ // "/bin/sh", // "-c", // "curl -k https://127.0.0.1:443/readyz", // }, // }, // }, // } // Extract the size if this was configured var ( mountSizeBytes int64 = 21474836480 // Default 20GB storageDir = "/mnt/data" // Default path hasVolumeMount = false ) // Check if a tmpfs size was specified if sizeStr, ok := req.Env["__TMPFS_SIZE_BYTES__"]; ok { if size, err := strconv.ParseInt(sizeStr, 10, 64); err == nil { mountSizeBytes = size hasVolumeMount = true } } statefulSet.Spec.Template.Spec.Containers[0].Image = img statefulSet.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullNever statefulSet.Spec.Template.Spec.Containers[0].Args = []string{ fmt.Sprintf("--storage-dir=%s", storageDir), "--max-segment-age=5s", fmt.Sprintf("--max-disk-usage=%d", mountSizeBytes), "--max-transfer-size=10485760", "--max-connections=1000", "--insecure-skip-verify", "--metrics-kusto-endpoints=Metrics=http://kustainer.default.svc.cluster.local:8080", "--logs-kusto-endpoints=Logs=http://kustainer.default.svc.cluster.local:8080", } statefulSet.Spec.Template.Spec.Affinity = nil statefulSet.Spec.Template.Spec.Tolerations = nil // Update StatefulSet to use hostPath volume if hasVolumeMount { // First, find and remove the existing "metrics" volume if it exists for i, vol := range statefulSet.Spec.Template.Spec.Volumes { if vol.Name == "metrics" { // Remove the existing volume by reslicing statefulSet.Spec.Template.Spec.Volumes = append( statefulSet.Spec.Template.Spec.Volumes[:i], statefulSet.Spec.Template.Spec.Volumes[i+1:]..., ) break } } // Now add our tmpfs volume statefulSet.Spec.Template.Spec.Volumes = append(statefulSet.Spec.Template.Spec.Volumes, corev1.Volume{ Name: "metrics", // Use "metrics" to match the existing volumeMount VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{ Medium: corev1.StorageMediumMemory, SizeLimit: &[]resource.Quantity{ resource.MustParse(fmt.Sprintf("%dMi", mountSizeBytes/1024/1024)), }[0], }, }, }) // Add the filler container to the pod spec fillerContainer := createFillerContainer(storageDir) statefulSet.Spec.Template.Spec.Containers = append( statefulSet.Spec.Template.Spec.Containers, fillerContainer, ) // We're keeping the existing volumeMount since it's already pointing to /mnt/data } restConfig, _, err := testutils.GetKubeConfig(ctx, k) if err != nil { return fmt.Errorf("failed to get kube config: %w", err) } clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { return fmt.Errorf("failed to create clientset: %w", err) } ctrlCli, err := ctrlclient.New(restConfig, ctrlclient.Options{}) if err != nil { return fmt.Errorf("failed to create controller client: %w", err) } err = kwait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { ns, err := clientset.CoreV1().Namespaces().Get(ctx, statefulSet.GetNamespace(), metav1.GetOptions{}) return ns != nil && err == nil, nil }) if err != nil { return fmt.Errorf("failed to wait for namespace: %w", err) } // Check if StatefulSet already exists existingStatefulSet := &appsv1.StatefulSet{} err = ctrlCli.Get(ctx, types.NamespacedName{ Namespace: statefulSet.Namespace, Name: statefulSet.Name, }, existingStatefulSet) // If it exists, delete it first if err == nil { // StatefulSet exists, delete it if err := ctrlCli.Delete(ctx, existingStatefulSet); err != nil { return fmt.Errorf("failed to delete existing statefulset: %w", err) } // Wait for deletion to complete err = kwait.PollUntilContextTimeout(ctx, 1*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) { err := ctrlCli.Get(ctx, types.NamespacedName{ Namespace: statefulSet.Namespace, Name: statefulSet.Name, }, &appsv1.StatefulSet{}) return apimacherrors.IsNotFound(err), nil }) if err != nil { return fmt.Errorf("failed to wait for statefulset deletion: %w", err) } } // Create the new StatefulSet if err := ctrlCli.Create(ctx, &statefulSet); err != nil { return fmt.Errorf("failed to create statefulset: %w", err) } // create new function instance since Create will modify the passed-in object ingestorFunction := makeIngestorFunction() // Continue with Function creation... if err := ctrlCli.Get(ctx, types.NamespacedName{Namespace: ingestorFunction.Namespace, Name: ingestorFunction.Name}, ingestorFunction); err != nil { if apimacherrors.IsNotFound(err) { if err := ctrlCli.Create(ctx, ingestorFunction); err != nil { return fmt.Errorf("failed to create function: %w", err) } } else { return fmt.Errorf("failed to get function: %w", err) } } return nil }, }, }) return nil } } // IngestorStatus contains details about the ingestor pods type IngestorStatus struct { TotalPods int // Total number of ingestor pods RunningPods int // Number of running pods UnhealthyPods int // Number of pods not in running state PodStatuses map[string]string // Map of pod name to status RestartCounts map[string]int32 // Map of pod name to restart count ContainerState map[string][]string // Map of pod name to container states } // VerifyIngestorRunning checks if ingestor pods are running properly func VerifyIngestorRunning(ctx context.Context, restConfig *rest.Config) (bool, *IngestorStatus, error) { clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { return false, nil, fmt.Errorf("failed to create kubernetes clientset: %w", err) } namespace := "adx-mon" // Namespace where ingestor is deployed labelSelector := "app=ingestor" // List pods with the ingestor label pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ LabelSelector: labelSelector, }) if err != nil { return false, nil, fmt.Errorf("failed to list ingestor pods: %w", err) } status := &IngestorStatus{ TotalPods: len(pods.Items), RunningPods: 0, UnhealthyPods: 0, PodStatuses: make(map[string]string), RestartCounts: make(map[string]int32), ContainerState: make(map[string][]string), } // If no pods found, return false if status.TotalPods == 0 { return false, status, fmt.Errorf("no ingestor pods found") } // Check each pod's status for _, pod := range pods.Items { podName := pod.Name status.PodStatuses[podName] = string(pod.Status.Phase) // Count running pods if pod.Status.Phase == corev1.PodRunning { status.RunningPods++ } else { status.UnhealthyPods++ } // Track container restart counts and states totalRestarts := int32(0) containerStates := make([]string, 0) // Check container statuses for _, containerStatus := range pod.Status.ContainerStatuses { totalRestarts += containerStatus.RestartCount // Determine container state state := "unknown" if containerStatus.State.Running != nil { state = fmt.Sprintf("running (started at %s)", containerStatus.State.Running.StartedAt.Format(time.RFC3339)) } else if containerStatus.State.Waiting != nil { state = fmt.Sprintf("waiting: %s - %s", containerStatus.State.Waiting.Reason, containerStatus.State.Waiting.Message) } else if containerStatus.State.Terminated != nil { state = fmt.Sprintf("terminated: %s (exit code %d) - %s", containerStatus.State.Terminated.Reason, containerStatus.State.Terminated.ExitCode, containerStatus.State.Terminated.Message) } containerStates = append(containerStates, fmt.Sprintf("%s: %s", containerStatus.Name, state)) } status.RestartCounts[podName] = totalRestarts status.ContainerState[podName] = containerStates } // All pods should be running and have no excessive restarts allPodsRunning := status.RunningPods == status.TotalPods return allPodsRunning, status, nil } // createFillerContainer returns a container spec that continuously creates 1MB files // in the specified directory and handles disk-full scenarios gracefully func createFillerContainer(mountPath string) corev1.Container { return corev1.Container{ Name: "storage-filler", Image: "mcr.microsoft.com/cbl-mariner/base/core:2.0.20250304", ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{ "/bin/bash", "-c", }, Args: []string{ `while true; do dd if=/dev/zero of="` + mountPath + `/filler-$(date +%s).dat" bs=10K count=1 2>/dev/null || true sleep 1 done`, }, VolumeMounts: []corev1.VolumeMount{ { Name: "metrics", MountPath: mountPath, }, }, } } type KustoTableSchema struct{} func (k *KustoTableSchema) TableName() string { return "Collector" } func (k *KustoTableSchema) CslColumns() []string { return []string{ "msg:string", "lvl:string", "ts:datetime", "namespace:string", "container:string", "pod:string", "host:string", } } func makeIngestorFunction() *adxmonv1.Function { return &adxmonv1.Function{ TypeMeta: metav1.TypeMeta{ APIVersion: "adx-mon.azure.com/v1", Kind: "Function", }, ObjectMeta: metav1.ObjectMeta{ Name: "ingestor", Namespace: "adx-mon", }, Spec: adxmonv1.FunctionSpec{ Database: "Logs", Body: `.create-or-alter function with (view=true, folder='views') Ingestor () { table('Ingestor') | extend msg = tostring(Body.msg), lvl = tostring(Body.lvl), ts = todatetime(Body.ts), namespace = tostring(Resource.namespace), container = tostring(Resource.container), pod = tostring(Resource.pod), host = tostring(Resource.host) | project-away Timestamp, ObservedTimestamp, TraceId, SpanId, SeverityText, SeverityNumber, Body, Resource, Attributes }`, }, } }