in pkg/testutils/ingestor/ingestor.go [112:333]
// WithCluster returns a request option that deploys the ingestor into the given
// k3s cluster by registering lifecycle hooks on the container request.
//
// When the option is applied it creates a temporary working directory and
// resolves the image name: "<Repo>:<Tag>" when the request builds from a
// Dockerfile, otherwise req.Image.
//
// The PreCreates hook loads that image into the cluster, extracts the manifests
// from build/k8s/ingestor.yaml (isolating the StatefulSet and
// CustomResourceDefinition kinds), copies the remaining manifests into the k3s
// manifests directory and installs the CRDs.
//
// The PostCreates hook customizes the extracted StatefulSet for the test
// environment, waits for its namespace to exist, (re)creates the StatefulSet
// and creates the ingestor Function if it is not already present.
//
// The ctx parameter is not used; each hook receives its own context.
//
// NOTE(review): the temporary directory is never removed. It is left in place
// because the hooks may run more than once if the request is reused.
func WithCluster(ctx context.Context, k *k3s.K3sContainer) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		isolateKinds := []string{"StatefulSet", "CustomResourceDefinition"}

		writeTo, err := os.MkdirTemp("", "ingestor")
		if err != nil {
			return fmt.Errorf("failed to create temp dir: %w", err)
		}

		// When building from a Dockerfile we build to "<repo>:<tag>" and load
		// that image into the k3s cluster; otherwise use the requested image.
		img := req.Image
		if req.FromDockerfile.Context != "" {
			img = req.FromDockerfile.Repo + ":" + req.FromDockerfile.Tag
		}

		req.LifecycleHooks = append(req.LifecycleHooks, testcontainers.ContainerLifecycleHooks{
			PreCreates: []testcontainers.ContainerRequestHook{
				func(ctx context.Context, _ testcontainers.ContainerRequest) error {
					if err := k.LoadImages(ctx, img); err != nil {
						return fmt.Errorf("failed to load image: %w", err)
					}

					// Our quick-start builds the necessary manifests to install ingestor. We need
					// to extract the actual StatefulSet that runs the ingestor instances so that we
					// can customize the launch arguments. In addition, these manifests are installed
					// as static pod manifests, which means they're immutable, so we install the
					// StatefulSet separately so that we can mutate the state at runtime.
					if err := testutils.ExtractManifests(writeTo, "build/k8s/ingestor.yaml", isolateKinds); err != nil {
						return fmt.Errorf("failed to extract manifests: %w", err)
					}

					manifestsPath := filepath.Join(writeTo, "manifests.yaml")
					containerPath := filepath.Join(testutils.K3sManifests, "ingestor-manifests.yaml")
					if err := k.CopyFileToContainer(ctx, manifestsPath, containerPath, 0644); err != nil {
						return fmt.Errorf("failed to copy file to container: %w", err)
					}

					return testutils.InstallCrds(ctx, k)
				},
			},
			PostCreates: []testcontainers.ContainerHook{
				func(ctx context.Context, _ testcontainers.Container) error {
					// Deserialize our StatefulSet manifest and customize it to our needs.
					ssPath := filepath.Join(writeTo, isolateKinds[0]+".yaml")
					ssData, err := os.ReadFile(ssPath)
					if err != nil {
						return fmt.Errorf("failed to read file: %w", err)
					}

					var statefulSet appsv1.StatefulSet
					if err := yaml.Unmarshal(ssData, &statefulSet); err != nil {
						return fmt.Errorf("failed to unmarshal statefulset: %w", err)
					}

					// req.Env is read here, at hook time, so that values set on the
					// request after this option was applied are still honoured.
					if err := customizeIngestorStatefulSet(&statefulSet, img, req.Env); err != nil {
						return fmt.Errorf("failed to customize statefulset: %w", err)
					}

					restConfig, _, err := testutils.GetKubeConfig(ctx, k)
					if err != nil {
						return fmt.Errorf("failed to get kube config: %w", err)
					}

					clientset, err := kubernetes.NewForConfig(restConfig)
					if err != nil {
						return fmt.Errorf("failed to create clientset: %w", err)
					}

					ctrlCli, err := ctrlclient.New(restConfig, ctrlclient.Options{})
					if err != nil {
						return fmt.Errorf("failed to create controller client: %w", err)
					}

					// The namespace comes from the manifests copied in PreCreates; poll
					// until it shows up. Lookup errors are treated as "not yet".
					err = kwait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
						ns, err := clientset.CoreV1().Namespaces().Get(ctx, statefulSet.GetNamespace(), metav1.GetOptions{})
						return ns != nil && err == nil, nil
					})
					if err != nil {
						return fmt.Errorf("failed to wait for namespace: %w", err)
					}

					if err := recreateStatefulSet(ctx, ctrlCli, &statefulSet); err != nil {
						return err
					}

					// Build a fresh Function object to look up; create it only when it
					// does not exist yet.
					ingestorFunction := makeIngestorFunction()
					fnKey := types.NamespacedName{Namespace: ingestorFunction.Namespace, Name: ingestorFunction.Name}
					if err := ctrlCli.Get(ctx, fnKey, ingestorFunction); err != nil {
						if !apimacherrors.IsNotFound(err) {
							return fmt.Errorf("failed to get function: %w", err)
						}
						if err := ctrlCli.Create(ctx, ingestorFunction); err != nil {
							return fmt.Errorf("failed to create function: %w", err)
						}
					}

					return nil
				},
			},
		})

		return nil
	}
}

// customizeIngestorStatefulSet rewrites the ingestor StatefulSet in place for the
// test cluster: it points the first container at img (never pulled), replaces its
// arguments, drops the readiness probe, affinity and tolerations, and, when env
// contains "__TMPFS_SIZE_BYTES__", swaps the "metrics" volume for a memory-backed
// emptyDir of that size and adds the filler container.
//
// It returns an error if the StatefulSet has no containers or the size cannot be
// parsed as a base-10 int64.
func customizeIngestorStatefulSet(statefulSet *appsv1.StatefulSet, img string, env map[string]string) error {
	const storageDir = "/mnt/data" // Default path

	podSpec := &statefulSet.Spec.Template.Spec
	if len(podSpec.Containers) == 0 {
		return fmt.Errorf("statefulset %s/%s has no containers", statefulSet.Namespace, statefulSet.Name)
	}

	var (
		mountSizeBytes int64 = 21474836480 // Default 20GB
		useTmpfs             = false
	)
	// A tmpfs is only mounted when a size was explicitly specified.
	if sizeStr, ok := env["__TMPFS_SIZE_BYTES__"]; ok {
		size, err := strconv.ParseInt(sizeStr, 10, 64)
		if err != nil {
			return fmt.Errorf("failed to parse __TMPFS_SIZE_BYTES__ %q: %w", sizeStr, err)
		}
		mountSizeBytes = size
		useTmpfs = true
	}

	ingestor := &podSpec.Containers[0]

	// (jesthom) The readiness probe is https, but the cert is self-signed in our
	// test env. Our container doesn't have curl, so we can't give the kubelet an
	// exec probe such as `curl -k https://127.0.0.1:443/readyz` that ignores the
	// cert. We have to disable the readiness probe instead.
	ingestor.ReadinessProbe = nil

	ingestor.Image = img
	ingestor.ImagePullPolicy = corev1.PullNever
	ingestor.Args = []string{
		fmt.Sprintf("--storage-dir=%s", storageDir),
		"--max-segment-age=5s",
		fmt.Sprintf("--max-disk-usage=%d", mountSizeBytes),
		"--max-transfer-size=10485760",
		"--max-connections=1000",
		"--insecure-skip-verify",
		"--metrics-kusto-endpoints=Metrics=http://kustainer.default.svc.cluster.local:8080",
		"--logs-kusto-endpoints=Logs=http://kustainer.default.svc.cluster.local:8080",
	}
	podSpec.Affinity = nil
	podSpec.Tolerations = nil

	if !useTmpfs {
		return nil
	}

	// Replace the existing "metrics" volume, if any, with a memory-backed emptyDir.
	for i, vol := range podSpec.Volumes {
		if vol.Name == "metrics" {
			podSpec.Volumes = append(podSpec.Volumes[:i], podSpec.Volumes[i+1:]...)
			break
		}
	}
	podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
		Name: "metrics", // Use "metrics" to match the existing volumeMount
		VolumeSource: corev1.VolumeSource{
			EmptyDir: &corev1.EmptyDirVolumeSource{
				Medium: corev1.StorageMediumMemory,
				// Use the exact byte count so the limit matches --max-disk-usage
				// instead of being rounded down to whole MiB.
				SizeLimit: resource.NewQuantity(mountSizeBytes, resource.BinarySI),
			},
		},
	})

	// Add the filler container to the pod spec. The existing volumeMount is kept
	// since it already points at the storage directory.
	podSpec.Containers = append(podSpec.Containers, createFillerContainer(storageDir))

	return nil
}

// recreateStatefulSet creates statefulSet in the cluster. If a StatefulSet with
// the same namespace and name already exists it is deleted first, and the call
// waits up to two minutes for the deletion to complete before creating the new one.
func recreateStatefulSet(ctx context.Context, cli ctrlclient.Client, statefulSet *appsv1.StatefulSet) error {
	key := types.NamespacedName{
		Namespace: statefulSet.Namespace,
		Name:      statefulSet.Name,
	}

	existing := &appsv1.StatefulSet{}
	err := cli.Get(ctx, key, existing)
	switch {
	case err == nil:
		// StatefulSet exists, delete it and wait for it to be gone.
		if err := cli.Delete(ctx, existing); err != nil {
			return fmt.Errorf("failed to delete existing statefulset: %w", err)
		}
		err = kwait.PollUntilContextTimeout(ctx, 1*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
			err := cli.Get(ctx, key, &appsv1.StatefulSet{})
			return apimacherrors.IsNotFound(err), nil
		})
		if err != nil {
			return fmt.Errorf("failed to wait for statefulset deletion: %w", err)
		}
	case !apimacherrors.IsNotFound(err):
		// Anything other than "not found" means we could not determine whether
		// the StatefulSet exists; surface it rather than attempting a create.
		return fmt.Errorf("failed to get existing statefulset: %w", err)
	}

	if err := cli.Create(ctx, statefulSet); err != nil {
		return fmt.Errorf("failed to create statefulset: %w", err)
	}
	return nil
}