internal/resources/fetching/fetchers/k8s/kube_provider.go (107 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package fetchers
import (
"reflect"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/elastic/cloudbeat/internal/infra/clog"
"github.com/elastic/cloudbeat/internal/resources/fetching"
"github.com/elastic/cloudbeat/internal/resources/fetching/cycle"
)
type K8sResource struct {
log *clog.Logger
Data any
}
const (
k8sObjMetadataField = "ObjectMeta"
k8sTypeMetadataField = "TypeMeta"
K8sObjType = "k8s_object"
ecsResourceTypeField = "orchestrator.resource.type"
ecsResourceIdField = "orchestrator.resource.id"
ecsResourceNameField = "orchestrator.resource.name"
)
func getKubeData(log *clog.Logger, watchers []kubernetes.Watcher, resCh chan fetching.ResourceInfo, cycleMetadata cycle.Metadata) {
log.Debug("Starting getKubeData")
for _, watcher := range watchers {
rs := watcher.Store().List()
for _, r := range rs {
nullifyManagedFields(r)
resource, ok := r.(kubernetes.Resource)
if !ok {
log.Errorf("Bad resource: %#v does not implement kubernetes.Resource", r)
continue
}
err := addTypeInformationToKubeResource(resource)
if err != nil {
log.Errorf("Bad resource: %v", err)
continue
} // See https://github.com/kubernetes/kubernetes/issues/3030
resCh <- fetching.ResourceInfo{Resource: K8sResource{log, resource}, CycleMetadata: cycleMetadata}
}
}
}
func (r K8sResource) GetData() any {
return r.Data
}
func (r K8sResource) GetIds() []string {
return nil
}
func (r K8sResource) GetMetadata() (fetching.ResourceMetadata, error) {
k8sObj := reflect.Indirect(reflect.ValueOf(r.Data))
k8sObjMeta := getK8sObjectMeta(r.log, k8sObj)
resourceID := k8sObjMeta.UID
resourceName := k8sObjMeta.Name
return fetching.ResourceMetadata{
ID: string(resourceID),
Type: K8sObjType,
SubType: getK8sSubType(r.log, k8sObj),
Name: resourceName,
}, nil
}
func (r K8sResource) GetElasticCommonData() (map[string]any, error) {
metadata, err := r.GetMetadata()
if err != nil {
return nil, err
}
fields := map[string]any{
ecsResourceTypeField: metadata.SubType,
ecsResourceNameField: metadata.Name,
ecsResourceIdField: metadata.ID,
}
return fields, nil
}
func getK8sObjectMeta(log *clog.Logger, k8sObj reflect.Value) metav1.ObjectMeta {
metadata, ok := k8sObj.FieldByName(k8sObjMetadataField).Interface().(metav1.ObjectMeta)
if !ok {
log.Errorf("Failed to retrieve object metadata, Resource: %#v", k8sObj)
return metav1.ObjectMeta{}
}
return metadata
}
func getK8sSubType(log *clog.Logger, k8sObj reflect.Value) string {
typeMeta, ok := k8sObj.FieldByName(k8sTypeMetadataField).Interface().(metav1.TypeMeta)
if !ok {
log.Errorf("Failed to retrieve type metadata, Resource: %#v", k8sObj)
return ""
}
return typeMeta.Kind
}
// nullifyManagedFields ManagedFields field contains fields with dot that prevent from elasticsearch to index
// the events.
func nullifyManagedFields(resource any) {
switch val := resource.(type) {
case *kubernetes.Pod:
val.ManagedFields = nil
case *kubernetes.Role:
val.ManagedFields = nil
case *kubernetes.RoleBinding:
val.ManagedFields = nil
case *kubernetes.ClusterRole:
val.ManagedFields = nil
case *kubernetes.ClusterRoleBinding:
val.ManagedFields = nil
case *kubernetes.ServiceAccount:
val.ManagedFields = nil
case *kubernetes.NetworkPolicy:
val.ManagedFields = nil
case *kubernetes.Node:
val.ManagedFields = nil
}
}