pkg/k8s/k8s.go (139 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8s
import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
appsinformers "k8s.io/client-go/informers/apps/v1"
informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)
func ClientOrDie(kubeconfig string) *kubernetes.Clientset {
// try in-cluster config, and then default to kubeconfig
kConfig, err := rest.InClusterConfig()
if err != nil {
// use the current context in kubeconfig
kConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
}
clientset, err := kubernetes.NewForConfig(kConfig)
if err != nil {
panic(err.Error())
}
return clientset
}
type NodeWatcher interface {
GetPods() []*v1.Pod
GetNodes() []*v1.Node
StartNodeWatches(<-chan struct{})
}
type nodeWatcher struct {
NodeInformer cache.SharedInformer
PodInformer cache.SharedInformer
}
func NewNodeWatcher(cs *kubernetes.Clientset, nodeName string) NodeWatcher {
nodeInformer := informers.NewFilteredNodeInformer(cs, 0, cache.Indexers{}, func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeName).String()
})
podInformer := informers.NewFilteredPodInformer(cs, metav1.NamespaceSystem, 0, cache.Indexers{}, func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
})
return &nodeWatcher{
NodeInformer: nodeInformer,
PodInformer: podInformer,
}
}
func (w *nodeWatcher) GetPods() []*v1.Pod {
if !w.PodInformer.HasSynced() {
klog.V(1).Infoln("cache for pod informer has not fully synced")
return nil
}
pods := []*v1.Pod{}
for _, p := range w.PodInformer.GetStore().List() {
p := p.(*v1.Pod)
pods = append(pods, p)
}
return pods
}
func (w *nodeWatcher) GetNodes() []*v1.Node {
if !w.NodeInformer.HasSynced() {
klog.V(1).Infoln("cache for node informer has not fully synced")
return nil
}
nodes := []*v1.Node{}
for _, n := range w.NodeInformer.GetStore().List() {
n := n.(*v1.Node)
nodes = append(nodes, n)
}
return nodes
}
func (w *nodeWatcher) StartNodeWatches(stopCh <-chan struct{}) {
go w.NodeInformer.Run(stopCh)
go w.PodInformer.Run(stopCh)
}
type ClusterWatcher interface {
GetDaemonSets() []*appsv1.DaemonSet
GetDeployments() []*appsv1.Deployment
GetNodes() []*v1.Node
StartClusterWatches(<-chan struct{})
}
type clusterWatcher struct {
DaemonSetInformer cache.SharedInformer
DeploymentInformer cache.SharedInformer
NodeInformer cache.SharedInformer
}
func NewClusterWatcher(cs *kubernetes.Clientset) ClusterWatcher {
daemonSetInformer := appsinformers.NewDaemonSetInformer(cs, metav1.NamespaceSystem, 0, cache.Indexers{})
deploymentInformer := appsinformers.NewDeploymentInformer(cs, metav1.NamespaceSystem, 0, cache.Indexers{})
nodeInformer := informers.NewNodeInformer(cs, 0, cache.Indexers{})
return &clusterWatcher{
DaemonSetInformer: daemonSetInformer,
DeploymentInformer: deploymentInformer,
NodeInformer: nodeInformer,
}
}
func (w *clusterWatcher) StartClusterWatches(stopCh <-chan struct{}) {
// stop := make(chan struct{})
go w.DaemonSetInformer.Run(stopCh)
go w.DeploymentInformer.Run(stopCh)
go w.NodeInformer.Run(stopCh)
}
func (w *clusterWatcher) GetDaemonSets() []*appsv1.DaemonSet {
if !w.DaemonSetInformer.HasSynced() {
klog.V(1).Infoln("cache for Daemonsets informer has not fully synced")
return nil
}
daemonSets := []*appsv1.DaemonSet{}
for _, d := range w.DaemonSetInformer.GetStore().List() {
d := d.(*appsv1.DaemonSet)
daemonSets = append(daemonSets, d)
}
return daemonSets
}
func (w *clusterWatcher) GetDeployments() []*appsv1.Deployment {
if !w.DeploymentInformer.HasSynced() {
klog.V(1).Infoln("cache for deployment informer has not fully synced")
return nil
}
deployments := []*appsv1.Deployment{}
for _, d := range w.DeploymentInformer.GetStore().List() {
d := d.(*appsv1.Deployment)
deployments = append(deployments, d)
}
return deployments
}
func (w *clusterWatcher) GetNodes() []*v1.Node {
if !w.NodeInformer.HasSynced() {
klog.V(1).Infoln("cache for node informer has not fully synced")
return nil
}
nodes := []*v1.Node{}
for _, n := range w.NodeInformer.GetStore().List() {
n := n.(*v1.Node)
nodes = append(nodes, n)
}
return nodes
}