kubernetes/informer.go (281 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kubernetes
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
func nodeSelector(options *metav1.ListOptions, opt WatchOptions) {
if opt.Node != "" {
options.FieldSelector = "spec.nodeName=" + opt.Node
}
}
func nameSelector(options *metav1.ListOptions, name string) {
if name != "" {
options.FieldSelector = "metadata.name=" + name
}
}
// NewInformer creates an informer for a given resource
func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (cache.SharedInformer, string, error) {
var objType string
var listwatch *cache.ListWatch
ctx := context.TODO()
switch resource.(type) {
case *Pod:
p := client.CoreV1().Pods(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
nodeSelector(&options, opts)
return p.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
nodeSelector(&options, opts)
return p.Watch(ctx, options)
},
}
objType = "pod"
case *Event:
e := client.CoreV1().Events(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return e.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return e.Watch(ctx, options)
},
}
objType = "event"
case *Node:
n := client.CoreV1().Nodes()
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
nameSelector(&options, opts.Node)
return n.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
nameSelector(&options, opts.Node)
return n.Watch(ctx, options)
},
}
objType = "node"
case *Namespace:
ns := client.CoreV1().Namespaces()
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
nameSelector(&options, opts.Namespace)
return ns.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
nameSelector(&options, opts.Namespace)
return ns.Watch(ctx, options)
},
}
objType = "namespace"
case *Deployment:
d := client.AppsV1().Deployments(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.Watch(ctx, options)
},
}
objType = "deployment"
case *ReplicaSet:
rs := client.AppsV1().ReplicaSets(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return rs.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return rs.Watch(ctx, options)
},
}
objType = "replicaset"
case *StatefulSet:
ss := client.AppsV1().StatefulSets(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}
objType = "statefulset"
case *DaemonSet:
ss := client.AppsV1().DaemonSets(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}
objType = "daemonset"
case *Service:
svc := client.CoreV1().Services(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return svc.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return svc.Watch(ctx, options)
},
}
objType = "service"
case *ServiceAccount:
sa := client.CoreV1().ServiceAccounts(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return sa.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return sa.Watch(ctx, options)
},
}
objType = "serviceAccount"
case *CronJob:
cronjob := client.BatchV1().CronJobs(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cronjob.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cronjob.Watch(ctx, options)
},
}
objType = "cronjob"
case *Job:
job := client.BatchV1().Jobs(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return job.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return job.Watch(ctx, options)
},
}
objType = "job"
case *PersistentVolume:
ss := client.CoreV1().PersistentVolumes()
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}
objType = "persistentvolume"
case *PersistentVolumeClaim:
ss := client.CoreV1().PersistentVolumeClaims(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}
objType = "persistentvolumeclaim"
case *StorageClass:
sc := client.StorageV1().StorageClasses()
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return sc.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return sc.Watch(ctx, options)
},
}
objType = "storageclass"
case *Role:
r := client.RbacV1().Roles(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return r.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return r.Watch(ctx, options)
},
}
objType = "role"
case *RoleBinding:
rb := client.RbacV1().RoleBindings(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return rb.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return rb.Watch(ctx, options)
},
}
objType = "rolebinding"
case *ClusterRole:
cr := client.RbacV1().ClusterRoles()
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cr.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cr.Watch(ctx, options)
},
}
objType = "clusterrole"
case *ClusterRoleBinding:
crb := client.RbacV1().ClusterRoleBindings()
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return crb.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return crb.Watch(ctx, options)
},
}
objType = "clusterrolebinding"
case *NetworkPolicy:
np := client.ExtensionsV1beta1().NetworkPolicies(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return np.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return np.Watch(ctx, options)
},
}
objType = "networkpolicy"
default:
return nil, "", fmt.Errorf("unsupported resource type for watching %T", resource)
}
if indexers == nil {
indexers = cache.Indexers{}
}
return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil
}
// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata.
func NewMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer {
ctx := context.Background()
if indexers == nil {
indexers = cache.Indexers{}
}
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.Resource(gvr).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Resource(gvr).Watch(ctx, options)
},
},
&metav1.PartialObjectMetadata{},
opts.SyncTimeout,
indexers,
)
return informer
}