func WithCluster()

in pkg/testutils/ingestor/ingestor.go [112:333]


// WithCluster returns a request option that deploys the ingestor into the
// given k3s cluster rather than running it as a standalone container.
//
// The option appends one set of lifecycle hooks to the request. The PreCreate
// hook loads the ingestor image into the cluster, extracts the manifests from
// build/k8s/ingestor.yaml into a temp dir, copies manifests.yaml into the k3s
// manifests directory and installs the CRDs. The PostCreate hook customizes
// the extracted StatefulSet for the test environment, (re)creates it through
// the API server and makes sure the ingestor Function resource exists.
//
// When the request's Env contains __TMPFS_SIZE_BYTES__ with a valid base-10
// integer, the "metrics" volume is replaced by a memory-backed emptyDir of
// that size and a filler container is added to the pod.
//
// The ctx parameter is not used; every hook runs with the context handed to
// it by testcontainers.
func WithCluster(ctx context.Context, k *k3s.K3sContainer) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		isolateKinds := []string{"StatefulSet", "CustomResourceDefinition"}

		// NOTE(review): this directory is never removed. Cleaning it up after
		// the PostCreate hook looks safe, but confirm nothing else reads it.
		writeTo, err := os.MkdirTemp("", "ingestor")
		if err != nil {
			return fmt.Errorf("failed to create temp dir: %w", err)
		}

		// When the image is built from a Dockerfile it is tagged Repo:Tag;
		// that is the name that must be loaded into the k3s cluster.
		img := req.Image
		if req.FromDockerfile.Context != "" {
			img = req.FromDockerfile.Repo + ":" + req.FromDockerfile.Tag
		}

		req.LifecycleHooks = append(req.LifecycleHooks, testcontainers.ContainerLifecycleHooks{
			PreCreates: []testcontainers.ContainerRequestHook{
				func(ctx context.Context, _ testcontainers.ContainerRequest) error {
					return stageIngestorManifests(ctx, k, img, writeTo, isolateKinds)
				},
			},
			PostCreates: []testcontainers.ContainerHook{
				func(ctx context.Context, _ testcontainers.Container) error {
					ssPath := filepath.Join(writeTo, isolateKinds[0]+".yaml")
					statefulSet, err := loadIngestorStatefulSet(ssPath)
					if err != nil {
						return err
					}

					// req.Env is read here, at hook time, so that options
					// applied after WithCluster are still honoured.
					if err := customizeIngestorStatefulSet(statefulSet, img, req.Env); err != nil {
						return err
					}

					return deployIngestor(ctx, k, statefulSet)
				},
			},
		})

		return nil
	}
}

// stageIngestorManifests loads img into the cluster, extracts the ingestor
// manifests into writeTo, copies manifests.yaml into the k3s manifests
// directory and installs the CRDs.
func stageIngestorManifests(ctx context.Context, k *k3s.K3sContainer, img, writeTo string, isolateKinds []string) error {
	if err := k.LoadImages(ctx, img); err != nil {
		return fmt.Errorf("failed to load image: %w", err)
	}

	// Our quick-start builds the necessary manifests to install ingestor. We
	// need to extract the actual StatefulSet that runs the ingestor instances
	// so that we can customize the launch arguments. In addition, these
	// manifests are installed as static pod manifests, which means they're
	// immutable, so we install the statefulset separately so that we can
	// mutate the state at runtime.
	if err := testutils.ExtractManifests(writeTo, "build/k8s/ingestor.yaml", isolateKinds); err != nil {
		return fmt.Errorf("failed to extract manifests: %w", err)
	}

	manifestsPath := filepath.Join(writeTo, "manifests.yaml")
	containerPath := filepath.Join(testutils.K3sManifests, "ingestor-manifests.yaml")
	if err := k.CopyFileToContainer(ctx, manifestsPath, containerPath, 0644); err != nil {
		return fmt.Errorf("failed to copy file to container: %w", err)
	}

	return testutils.InstallCrds(ctx, k)
}

// loadIngestorStatefulSet reads the YAML file at path and decodes it into a
// StatefulSet.
func loadIngestorStatefulSet(path string) (*appsv1.StatefulSet, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}

	statefulSet := &appsv1.StatefulSet{}
	if err := yaml.Unmarshal(data, statefulSet); err != nil {
		return nil, fmt.Errorf("failed to unmarshal statefulset: %w", err)
	}
	return statefulSet, nil
}

// customizeIngestorStatefulSet rewrites the pod template of statefulSet for the
// test environment: it points the first container at img, replaces its args,
// drops the readiness probe, affinity and tolerations, and, when env carries a
// valid __TMPFS_SIZE_BYTES__, swaps the "metrics" volume for a memory-backed
// emptyDir and adds the filler container.
func customizeIngestorStatefulSet(statefulSet *appsv1.StatefulSet, img string, env map[string]string) error {
	const storageDir = "/mnt/data"

	podSpec := &statefulSet.Spec.Template.Spec
	if len(podSpec.Containers) == 0 {
		return fmt.Errorf("statefulset %q has no containers", statefulSet.Name)
	}

	mountSizeBytes := int64(20 << 30) // default: 20 GiB
	hasVolumeMount := false
	if sizeStr, ok := env["__TMPFS_SIZE_BYTES__"]; ok {
		// An unparsable value is ignored and the defaults are kept.
		if size, err := strconv.ParseInt(sizeStr, 10, 64); err == nil {
			mountSizeBytes = size
			hasVolumeMount = true
		}
	}

	// All edits through this pointer must happen before Containers is
	// appended to below, since append may move the backing array.
	ingestor := &podSpec.Containers[0]

	// (jesthom) The readiness probe is https, but the cert is self-signed in
	// our test env. Our container doesn't have curl, so we can't give the
	// kubelet an exec probe such as `curl -k https://127.0.0.1:443/readyz`
	// that ignores the cert. We have to disable the readiness probe instead.
	ingestor.ReadinessProbe = nil
	ingestor.Image = img
	ingestor.ImagePullPolicy = corev1.PullNever
	ingestor.Args = []string{
		fmt.Sprintf("--storage-dir=%s", storageDir),
		"--max-segment-age=5s",
		fmt.Sprintf("--max-disk-usage=%d", mountSizeBytes),
		"--max-transfer-size=10485760",
		"--max-connections=1000",
		"--insecure-skip-verify",
		"--metrics-kusto-endpoints=Metrics=http://kustainer.default.svc.cluster.local:8080",
		"--logs-kusto-endpoints=Logs=http://kustainer.default.svc.cluster.local:8080",
	}
	podSpec.Affinity = nil
	podSpec.Tolerations = nil

	if !hasVolumeMount {
		return nil
	}

	// Drop the existing "metrics" volume, if any, so it can be replaced.
	for i, vol := range podSpec.Volumes {
		if vol.Name == "metrics" {
			podSpec.Volumes = append(podSpec.Volumes[:i], podSpec.Volumes[i+1:]...)
			break
		}
	}

	// Add a memory-backed emptyDir under the same name so the existing
	// volumeMount (already pointing at /mnt/data) keeps working. The limit is
	// expressed in whole MiB, rounding the requested byte count down.
	sizeLimit := resource.MustParse(fmt.Sprintf("%dMi", mountSizeBytes/1024/1024))
	podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
		Name: "metrics",
		VolumeSource: corev1.VolumeSource{
			EmptyDir: &corev1.EmptyDirVolumeSource{
				Medium:    corev1.StorageMediumMemory,
				SizeLimit: &sizeLimit,
			},
		},
	})

	// Add the filler container to the pod spec.
	podSpec.Containers = append(podSpec.Containers, createFillerContainer(storageDir))

	return nil
}

// deployIngestor connects to the cluster, waits for the StatefulSet's
// namespace to exist, replaces the StatefulSet and ensures the ingestor
// Function resource is present.
func deployIngestor(ctx context.Context, k *k3s.K3sContainer, statefulSet *appsv1.StatefulSet) error {
	restConfig, _, err := testutils.GetKubeConfig(ctx, k)
	if err != nil {
		return fmt.Errorf("failed to get kube config: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return fmt.Errorf("failed to create clientset: %w", err)
	}

	ctrlCli, err := ctrlclient.New(restConfig, ctrlclient.Options{})
	if err != nil {
		return fmt.Errorf("failed to create controller client: %w", err)
	}

	// Lookup errors are treated as "not there yet" and retried until the
	// poll times out.
	err = kwait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
		ns, err := clientset.CoreV1().Namespaces().Get(ctx, statefulSet.GetNamespace(), metav1.GetOptions{})
		return err == nil && ns != nil, nil
	})
	if err != nil {
		return fmt.Errorf("failed to wait for namespace: %w", err)
	}

	if err := replaceIngestorStatefulSet(ctx, ctrlCli, statefulSet); err != nil {
		return err
	}

	// Use a fresh Function object for the lookup; it is only created when the
	// cluster does not already have one.
	ingestorFunction := makeIngestorFunction()
	fnKey := types.NamespacedName{Namespace: ingestorFunction.Namespace, Name: ingestorFunction.Name}
	if err := ctrlCli.Get(ctx, fnKey, ingestorFunction); err != nil {
		if !apimacherrors.IsNotFound(err) {
			return fmt.Errorf("failed to get function: %w", err)
		}
		if err := ctrlCli.Create(ctx, ingestorFunction); err != nil {
			return fmt.Errorf("failed to create function: %w", err)
		}
	}

	return nil
}

// replaceIngestorStatefulSet creates statefulSet, first deleting any existing
// object with the same namespace and name and waiting for it to disappear.
func replaceIngestorStatefulSet(ctx context.Context, cli ctrlclient.Client, statefulSet *appsv1.StatefulSet) error {
	key := types.NamespacedName{
		Namespace: statefulSet.Namespace,
		Name:      statefulSet.Name,
	}

	existing := &appsv1.StatefulSet{}
	err := cli.Get(ctx, key, existing)
	switch {
	case err == nil:
		if err := cli.Delete(ctx, existing); err != nil {
			return fmt.Errorf("failed to delete existing statefulset: %w", err)
		}

		// Wait for deletion to complete; any error other than NotFound is
		// retried until the poll times out.
		err = kwait.PollUntilContextTimeout(ctx, 1*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
			err := cli.Get(ctx, key, &appsv1.StatefulSet{})
			return apimacherrors.IsNotFound(err), nil
		})
		if err != nil {
			return fmt.Errorf("failed to wait for statefulset deletion: %w", err)
		}
	case !apimacherrors.IsNotFound(err):
		// Surface real lookup failures instead of masking them behind a
		// subsequent Create error.
		return fmt.Errorf("failed to get existing statefulset: %w", err)
	}

	if err := cli.Create(ctx, statefulSet); err != nil {
		return fmt.Errorf("failed to create statefulset: %w", err)
	}
	return nil
}