pkg/testutils/collector/collector.go

//go:build !disableDocker

package collector

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"slices"
	"strings"
	"time"

	adxmonv1 "github.com/Azure/adx-mon/api/v1"
	"github.com/Azure/adx-mon/pkg/testutils"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/k3s"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	kwait "k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/util/yaml"
	"k8s.io/client-go/kubernetes"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
	DefaultImage = "collector"
	DefaultTag   = "latest"
)

type CollectorContainer struct {
	testcontainers.Container
}

func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*CollectorContainer, error) {
	var relative string
	for iter := range 4 {
		relative = strings.Repeat("../", iter)
		if _, err := os.Stat(filepath.Join(relative, "build/images/Dockerfile.ingestor")); err == nil {
			break
		}
	}

	var req testcontainers.ContainerRequest
	if img == "" {
		req = testcontainers.ContainerRequest{
			FromDockerfile: testcontainers.FromDockerfile{
				Repo:       DefaultImage,
				Tag:        DefaultTag,
				Context:    relative, // repo base
				Dockerfile: "build/images/Dockerfile.collector",
				KeepImage:  true,
			},
		}
	} else {
		req = testcontainers.ContainerRequest{
			Image: img,
		}
	}

	genericContainerReq := testcontainers.GenericContainerRequest{
		ContainerRequest: req,
	}

	for _, opt := range opts {
		if err := opt.Customize(&genericContainerReq); err != nil {
			return nil, err
		}
	}

	container, err := testcontainers.GenericContainer(ctx, genericContainerReq)
	var c *CollectorContainer
	if container != nil {
		c = &CollectorContainer{Container: container}
	}
	if err != nil {
		return c, fmt.Errorf("generic container: %w", err)
	}

	return c, nil
}
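
// A minimal usage sketch (not compiled as part of this package): a test would
// typically start a k3s testcontainer first, then let WithCluster build and
// load the collector image into it. The k3s image tag and the k3s module's Run
// helper are assumptions about the testcontainers-go version in use.
//
//	ctx := context.Background()
//	k3sContainer, err := k3s.Run(ctx, "docker.io/rancher/k3s:v1.27.1-k3s1")
//	if err != nil {
//		// handle error
//	}
//	// WithStarted is intentionally omitted: the collector runs inside the
//	// cluster as a DaemonSet rather than as a standalone container.
//	c, err := Run(ctx, "", WithCluster(ctx, k3sContainer))
//	if err != nil {
//		// handle error
//	}
//	defer c.Terminate(ctx)
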

// WithStarted will start the container when it is created.
// You don't want to do this if you want to load the container into a k8s cluster.
func WithStarted() testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		req.Started = true
		return nil
	}
}

func WithCluster(ctx context.Context, k *k3s.K3sContainer) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		isolateKinds := []string{"DaemonSet", "ConfigMap", "Deployment"}

		writeTo, err := os.MkdirTemp("", "collector")
		if err != nil {
			return fmt.Errorf("failed to create temp dir: %w", err)
		}

		var img string
		if req.FromDockerfile.Context != "" {
			img = req.FromDockerfile.Repo + ":" + req.FromDockerfile.Tag
		} else {
			img = req.Image
		}

		req.LifecycleHooks = append(req.LifecycleHooks, testcontainers.ContainerLifecycleHooks{
			PreCreates: []testcontainers.ContainerRequestHook{
				func(ctx context.Context, req testcontainers.ContainerRequest) error {
					if err := k.LoadImages(ctx, img); err != nil {
						return fmt.Errorf("failed to load image: %w", err)
					}

					if err := testutils.ExtractManifests(writeTo, "build/k8s/collector.yaml", isolateKinds); err != nil {
						return fmt.Errorf("failed to extract manifests: %w", err)
					}
					manifestsPath := filepath.Join(writeTo, "manifests.yaml")
					containerPath := filepath.Join(testutils.K3sManifests, "collector-manifests.yaml")
					if err := k.CopyFileToContainer(ctx, manifestsPath, containerPath, 0644); err != nil {
						return fmt.Errorf("failed to copy file to container: %w", err)
					}

					return testutils.InstallCrds(ctx, k)
				},
			},
			PostCreates: []testcontainers.ContainerHook{
				func(ctx context.Context, c testcontainers.Container) error {
					// Deserialize our daemonset manifest and customize it to our needs
					dsPath := filepath.Join(writeTo, "DaemonSet.yaml")
					dsData, err := os.ReadFile(dsPath)
					if err != nil {
						return fmt.Errorf("failed to read file: %w", err)
					}
					var daemonset appsv1.DaemonSet
					if err := yaml.Unmarshal(dsData, &daemonset); err != nil {
						return fmt.Errorf("failed to unmarshal daemonset: %w", err)
					}
					daemonset.Spec.Template.Spec.Tolerations = nil
					daemonset.Spec.Template.Spec.Containers[0].Image = img
					daemonset.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullNever
					daemonset.Spec.Template.Spec.Volumes = slices.DeleteFunc(daemonset.Spec.Template.Spec.Volumes, func(v corev1.Volume) bool {
						return v.Name == "etcmachineid"
					})
					daemonset.Spec.Template.Spec.Containers[0].VolumeMounts = slices.DeleteFunc(daemonset.Spec.Template.Spec.Containers[0].VolumeMounts, func(v corev1.VolumeMount) bool {
						return v.Name == "etcmachineid"
					})

					// Wait for our manifests to be applied
					restConfig, _, err := testutils.GetKubeConfig(ctx, k)
					if err != nil {
						return fmt.Errorf("failed to get kube config: %w", err)
					}
					clientset, err := kubernetes.NewForConfig(restConfig)
					if err != nil {
						return fmt.Errorf("failed to create clientset: %w", err)
					}
					ctrlCli, err := ctrlclient.New(restConfig, ctrlclient.Options{})
					if err != nil {
						return fmt.Errorf("failed to create controller client: %w", err)
					}

					err = kwait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
						ns, err := clientset.CoreV1().Namespaces().Get(ctx, daemonset.GetNamespace(), metav1.GetOptions{})
						return ns != nil && err == nil, nil
					})
					if err != nil {
						return fmt.Errorf("failed to wait for namespace: %w", err)
					}

					collectorConfigMap := makeCollectorConfigMap()
					patchBytes, err := json.Marshal(collectorConfigMap)
					if err != nil {
						return fmt.Errorf("failed to marshal configmap: %w", err)
					}
					_, err = clientset.CoreV1().ConfigMaps(collectorConfigMap.Namespace).Patch(ctx, collectorConfigMap.Name,
						types.ApplyPatchType, patchBytes, metav1.PatchOptions{
							FieldManager: "testcontainers",
						})
					if err != nil {
						return fmt.Errorf("failed to patch configmap: %w", err)
					}

					patchBytes, err = json.Marshal(daemonset)
					if err != nil {
						return fmt.Errorf("failed to marshal daemonset: %w", err)
					}
					_, err = clientset.AppsV1().DaemonSets(daemonset.Namespace).Patch(ctx, daemonset.Name, types.ApplyPatchType, patchBytes, metav1.PatchOptions{
						FieldManager: "testcontainers",
					})
					if err != nil {
						return fmt.Errorf("failed to patch daemonset: %w", err)
					}

					// create new function instance since Create will modify the passed-in object
					collectorFunction := makeCollectorFunction()
					if err := ctrlCli.Get(ctx, types.NamespacedName{Namespace: collectorFunction.Namespace, Name: collectorFunction.Name}, collectorFunction); err != nil {
						if err := ctrlCli.Create(ctx, collectorFunction); err != nil {
							return fmt.Errorf("failed to create function: %w", err)
						}
					}

					return nil
				},
			},
		})

		return nil
	}
}
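
// A verification sketch (not compiled here), assuming WithCluster has already
// run against the cluster: the Function CR it creates can be fetched back with
// the same controller-runtime client pattern used above. The polling interval
// and timeout are illustrative values, not ones this package defines.
//
//	fn := makeCollectorFunction()
//	err := kwait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
//		getErr := ctrlCli.Get(ctx, types.NamespacedName{Namespace: fn.Namespace, Name: fn.Name}, fn)
//		return getErr == nil, nil
//	})
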

type KustoTableSchema struct{}

func (k *KustoTableSchema) TableName() string {
	return "Collector"
}

func (k *KustoTableSchema) CslColumns() []string {
	return []string{
		"msg:string",
		"lvl:string",
		"ts:datetime",
		"namespace:string",
		"container:string",
		"pod:string",
		"host:string",
	}
}

func makeCollectorFunction() *adxmonv1.Function {
	return &adxmonv1.Function{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "adx-mon.azure.com/v1",
			Kind:       "Function",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "collector",
			Namespace: "adx-mon",
		},
		Spec: adxmonv1.FunctionSpec{
			Database: "Logs",
			Body: `.create-or-alter function with (view=true, folder='views') Collector () {
  table('Collector')
  | extend msg = tostring(Body.msg),
    lvl = tostring(Body.lvl),
    ts = todatetime(Body.ts),
    namespace = tostring(Resource.namespace),
    container = tostring(Resource.container),
    pod = tostring(Resource.pod),
    host = tostring(Resource.host)
  | project-away Timestamp, ObservedTimestamp, TraceId, SpanId, SeverityText, SeverityNumber, Body, Resource, Attributes
}`,
		},
	}
}

// makeCollectorConfigMap returns a minimal config suitable for use by kustainer
func makeCollectorConfigMap() *corev1.ConfigMap {
	return &corev1.ConfigMap{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "ConfigMap",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "collector-config",
			Namespace: "adx-mon",
		},
		Data: map[string]string{
			"config.toml": `
# We have to keep our scrape targets very light due to the capacity
# constraints of kustainer

# Ingestor URL to send collected telemetry.
endpoint = 'https://ingestor.adx-mon.svc.cluster.local'

# Region is a location identifier
region = '$REGION'

# Skip TLS verification.
insecure-skip-verify = true

# Address to listen on for endpoints.
listen-addr = ':8080'

# Maximum number of connections to accept.
max-connections = 100

# Maximum number of samples to send in a single batch.
max-batch-size = 10000

# Storage directory for the WAL.
storage-dir = '/mnt/data'

# Regexes of metrics to drop from all sources.
drop-metrics = []

keep-metrics = ['^adxmon.*']

# Disable metrics forwarding to endpoints.
disable-metrics-forwarding = false

# Key/value pairs of labels to add to all metrics and logs.
[add-labels]
host = '$(HOSTNAME)'
cluster = '$CLUSTER'

# Defines a prometheus scrape endpoint.
[prometheus-scrape]

# Database to store metrics in.
database = 'Metrics'

default-drop-metrics = false

# Defines a static scrape target.
static-scrape-target = [
  # Scrape our own metrics
  { host-regex = '.*', url = 'http://$(HOSTNAME):3100/metrics', namespace = 'adx-mon', pod = 'collector', container = 'collector' },
]

# Scrape interval in seconds.
scrape-interval = 30

# Scrape timeout in seconds.
scrape-timeout = 25

# Disable metrics forwarding to endpoints.
disable-metrics-forwarding = false

# Regexes of metrics to keep from scraping source.
keep-metrics = ['^adxmon.*', '^sample.*']

# Regexes of metrics to drop from scraping source.
drop-metrics = ['^go.*', '^process.*', '^promhttp.*']

# Defines a prometheus remote write endpoint.
[[prometheus-remote-write]]

# Database to store metrics in.
database = 'Metrics'

# The path to listen on for prometheus remote write requests. Defaults to /receive.
path = '/receive'

# Regexes of metrics to drop.
drop-metrics = []

# Disable metrics forwarding to endpoints.
disable-metrics-forwarding = false

# Key/value pairs of labels to add to this source.
[prometheus-remote-write.add-labels]

# Defines an OpenTelemetry log endpoint.
[otel-log]

# Attributes lifted from the Body and added to Attributes.
lift-attributes = ['kusto.database', 'kusto.table']

[[host-log]]
parsers = ['json']
journal-target = [
  # matches are optional and are parsed like MATCHES in journalctl.
  # If different fields are matched, only entries matching all terms are included.
  # If the same fields are matched, entries matching any term are included.
  # + can be added between to include a disjunction of terms.
  # See examples under man 1 journalctl
  { matches = [ '_SYSTEMD_UNIT=kubelet.service' ], database = 'Logs', table = 'Kubelet' }
]
kernel-target = [
  { database = 'Logs', table = 'Kernel', priority = 'warning' }
]
`,
		},
	}
}