data/data.go (198 lines of code) (raw):
// Package data provides data types for readers. All data types for readers are
// packaged inside an Entry. This allows for a single channel to be used for all
// data types.
package data
import (
"errors"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
)
// ErrInvalidType is the sentinel error returned by NewEntry when the object it
// is given is not one of the supported types.
var ErrInvalidType = errors.New("invalid type")
// SourceType identifies the data source an Entry came from.
//
//go:generate stringer -type=SourceType -linecomment
type SourceType uint8

// The trailing line comments are the names stringer emits (-linecomment).
// Values are assigned by iota in declaration order, so new sources must be
// appended at the end to leave the existing values unchanged.
const (
	// STUnknown is the zero value of SourceType.
	STUnknown SourceType = iota // Unknown
	// STInformer marks the Informer source.
	STInformer // Informer
	// STWatchList marks the WatchList source.
	STWatchList // WatchList
	// STEtcd marks the Etcd source.
	STEtcd // Etcd
)
// ObjectType is the type of the object held in an Entry.
//
//go:generate stringer -type=ObjectType -linecomment
type ObjectType uint8

// The trailing line comments are the names stringer emits (-linecomment).
// Values are assigned by iota in declaration order, so new object types must
// be appended at the end to leave the existing values unchanged.
const (
	// OTUnknown indicates a bug in the code.
	OTUnknown ObjectType = iota // Unknown
	// OTNode indicates the data is a node.
	OTNode // Node
	// OTPod indicates the data is a pod.
	OTPod // Pod
	// OTNamespace indicates the data is a namespace.
	OTNamespace // Namespace
	// OTPersistentVolume indicates the data is a persistent volume.
	OTPersistentVolume // PersistentVolume
	// OTClusterRole indicates the data is a cluster role.
	OTClusterRole // ClusterRole
	// OTClusterRoleBinding indicates the data is a cluster role binding.
	OTClusterRoleBinding // ClusterRoleBinding
	// OTRole indicates the data is a role.
	OTRole // Role
	// OTRoleBinding indicates the data is a role binding.
	OTRoleBinding // RoleBinding
	// OTService indicates the data is a service.
	OTService // Service
	// OTDeployment indicates the data is a deployment.
	OTDeployment // Deployment
	// OTIngressController indicates the data is an ingress controller; it is
	// the type assigned to *networkingv1.Ingress objects.
	OTIngressController // IngressController
	// OTEndpoint indicates the data is an endpoint; it is the type assigned to
	// *corev1.Endpoints objects.
	OTEndpoint // Endpoint
)
// ChangeType is the type of change an Entry represents.
//
//go:generate stringer -type=ChangeType -linecomment
type ChangeType uint8

// The trailing line comments are the names stringer emits (-linecomment).
// Values are assigned by iota in declaration order, so new change types must
// be appended at the end to leave the existing values unchanged.
const (
	// CTUnknown indicates a bug in the code.
	CTUnknown ChangeType = iota // Unknown
	// CTAdd indicates the data was added.
	CTAdd // Add
	// CTUpdate indicates the data was updated.
	CTUpdate // Update
	// CTDelete indicates the data was deleted.
	CTDelete // Delete
	// CTSnapshot indicates the data is a snapshot. A snapshot is taken when
	// the same data object is relisted in order to make sure our data is up
	// to date.
	CTSnapshot // Snapshot
)
// ingestObj is the type constraint used by newEntry. It admits exactly the
// Kubernetes object pointer types that can be wrapped in an Entry; each member
// must also be a runtime.Object and expose its UID through GetUID.
type ingestObj interface {
	*corev1.Node |
		*corev1.Pod |
		*corev1.Namespace |
		*corev1.PersistentVolume |
		*rbacv1.ClusterRole |
		*rbacv1.ClusterRoleBinding |
		*rbacv1.Role |
		*rbacv1.RoleBinding |
		*corev1.Service |
		*appsv1.Deployment |
		*networkingv1.Ingress |
		*corev1.Endpoints

	runtime.Object
	GetUID() types.UID
}
// Entry is a data entry. It carries one object together with the source it
// came from, the kind of change it represents and the kind of object it is.
// This is field aligned for better performance; keep the field order when
// editing.
type Entry struct {
	// data holds the object itself.
	data runtime.Object
	// uid is the UID of the object, read from the object when the Entry is
	// created.
	uid types.UID
	// sourceType is the data source of the entry.
	sourceType SourceType
	// changeType is the type of change.
	changeType ChangeType
	// objectType is the type of the object held in data.
	objectType ObjectType
}
// NewEntry wraps obj in an Entry tagged with the given source and change type.
//
// The object must be a pointer to one of the supported types: corev1.Node,
// corev1.Pod, corev1.Namespace, corev1.PersistentVolume, rbacv1.ClusterRole,
// rbacv1.ClusterRoleBinding, rbacv1.Role, rbacv1.RoleBinding, corev1.Service,
// appsv1.Deployment, networkingv1.Ingress or corev1.Endpoints. Any other
// type, including a nil interface value, yields ErrInvalidType. A nil pointer
// of a supported type is rejected by newEntry with its own error.
func NewEntry(obj runtime.Object, st SourceType, ct ChangeType) (Entry, error) {
	// Each case names exactly one type so that o has the concrete pointer
	// type required to instantiate the generic newEntry.
	switch o := obj.(type) {
	case *corev1.Node:
		return newEntry(o, st, ct)
	case *corev1.Pod:
		return newEntry(o, st, ct)
	case *corev1.Namespace:
		return newEntry(o, st, ct)
	case *corev1.PersistentVolume:
		return newEntry(o, st, ct)
	case *rbacv1.ClusterRole:
		return newEntry(o, st, ct)
	case *rbacv1.ClusterRoleBinding:
		return newEntry(o, st, ct)
	case *rbacv1.Role:
		return newEntry(o, st, ct)
	case *rbacv1.RoleBinding:
		return newEntry(o, st, ct)
	case *corev1.Service:
		return newEntry(o, st, ct)
	case *appsv1.Deployment:
		return newEntry(o, st, ct)
	case *networkingv1.Ingress:
		return newEntry(o, st, ct)
	case *corev1.Endpoints:
		return newEntry(o, st, ct)
	default:
		return Entry{}, ErrInvalidType
	}
}
// newEntry builds the Entry for an object whose static type satisfies
// ingestObj. It returns an error if obj is a nil pointer. The object type is
// derived from obj's concrete type and the UID is read from the object through
// GetUID.
func newEntry[O ingestObj](obj O, st SourceType, ct ChangeType) (Entry, error) {
	if obj == nil {
		return Entry{}, errors.New("new object is nil")
	}

	entry := Entry{
		data:       obj,
		uid:        obj.GetUID(),
		sourceType: st,
		changeType: ct,
	}

	// Convert to any first: a type switch cannot be applied directly to a
	// value of type-parameter type.
	switch any(obj).(type) {
	case *corev1.Node:
		entry.objectType = OTNode
	case *corev1.Pod:
		entry.objectType = OTPod
	case *corev1.Namespace:
		entry.objectType = OTNamespace
	case *corev1.PersistentVolume:
		entry.objectType = OTPersistentVolume
	case *rbacv1.ClusterRole:
		entry.objectType = OTClusterRole
	case *rbacv1.ClusterRoleBinding:
		entry.objectType = OTClusterRoleBinding
	case *rbacv1.Role:
		entry.objectType = OTRole
	case *rbacv1.RoleBinding:
		entry.objectType = OTRoleBinding
	case *corev1.Service:
		entry.objectType = OTService
	case *appsv1.Deployment:
		entry.objectType = OTDeployment
	case *networkingv1.Ingress:
		entry.objectType = OTIngressController
	case *corev1.Endpoints:
		entry.objectType = OTEndpoint
	default:
		// Every member of ingestObj is handled above, so this branch is only
		// hit if the constraint grows without this switch being updated.
		return Entry{}, fmt.Errorf("unknown object type(%T)", obj)
	}
	return entry, nil
}
// MustNewEntry is like NewEntry, but panics with the error instead of
// returning it.
func MustNewEntry[T runtime.Object](obj T, st SourceType, ct ChangeType) Entry {
	entry, err := NewEntry(obj, st, ct)
	if err == nil {
		return entry
	}
	panic(err)
}
// UID returns the UID of the underlying object, as stored in the Entry.
func (e Entry) UID() types.UID {
	return e.uid
}
// ObjectType returns the type of the underlying object. Use it to pick the
// matching typed accessor, such as Node, Pod or Namespace.
func (e Entry) ObjectType() ObjectType {
	return e.objectType
}
// ChangeType returns the type of change the entry represents: add, update,
// delete or snapshot.
func (e Entry) ChangeType() ChangeType {
	return e.changeType
}
// SourceType returns the data source of the entry (one of the ST* constants).
func (e Entry) SourceType() SourceType {
	return e.sourceType
}
// Object returns the data as a runtime.Object, without asserting its concrete
// type. The result is nil for a zero-value Entry.
func (e Entry) Object() runtime.Object {
	return e.data
}
// Node returns the data as a *corev1.Node. An error is returned if the Entry
// holds no object or the object is not a *corev1.Node.
func (e Entry) Node() (*corev1.Node, error) {
	return assert[*corev1.Node](e.data)
}
// Pod returns the data as a *corev1.Pod. An error is returned if the Entry
// holds no object or the object is not a *corev1.Pod.
func (e Entry) Pod() (*corev1.Pod, error) {
	return assert[*corev1.Pod](e.data)
}
// Namespace returns the data as a *corev1.Namespace. An error is returned if
// the Entry holds no object or the object is not a *corev1.Namespace.
func (e Entry) Namespace() (*corev1.Namespace, error) {
	return assert[*corev1.Namespace](e.data)
}
// PersistentVolume returns the data as a *corev1.PersistentVolume. An error is
// returned if the Entry holds no object or the object is not a
// *corev1.PersistentVolume.
func (e Entry) PersistentVolume() (*corev1.PersistentVolume, error) {
	return assert[*corev1.PersistentVolume](e.data)
}
// ClusterRole returns the data as a *rbacv1.ClusterRole. An error is returned
// if the Entry holds no object or the object is not a *rbacv1.ClusterRole.
func (e Entry) ClusterRole() (*rbacv1.ClusterRole, error) {
	return assert[*rbacv1.ClusterRole](e.data)
}
// ClusterRoleBinding returns the data as a *rbacv1.ClusterRoleBinding. An
// error is returned if the Entry holds no object or the object is not a
// *rbacv1.ClusterRoleBinding.
func (e Entry) ClusterRoleBinding() (*rbacv1.ClusterRoleBinding, error) {
	return assert[*rbacv1.ClusterRoleBinding](e.data)
}
// Role returns the data as a *rbacv1.Role. An error is returned if the Entry
// holds no object or the object is not a *rbacv1.Role.
func (e Entry) Role() (*rbacv1.Role, error) {
	return assert[*rbacv1.Role](e.data)
}
// RoleBinding returns the data as a *rbacv1.RoleBinding. An error is returned
// if the Entry holds no object or the object is not a *rbacv1.RoleBinding.
func (e Entry) RoleBinding() (*rbacv1.RoleBinding, error) {
	return assert[*rbacv1.RoleBinding](e.data)
}
// Service returns the data as a *corev1.Service. An error is returned if the
// Entry holds no object or the object is not a *corev1.Service.
func (e Entry) Service() (*corev1.Service, error) {
	return assert[*corev1.Service](e.data)
}
// Deployment returns the data as a *appsv1.Deployment. An error is returned if
// the Entry holds no object or the object is not a *appsv1.Deployment.
func (e Entry) Deployment() (*appsv1.Deployment, error) {
	return assert[*appsv1.Deployment](e.data)
}
// IngressController returns the data as a *networkingv1.Ingress. An error is
// returned if the Entry holds no object or the object is not a
// *networkingv1.Ingress.
func (e Entry) IngressController() (*networkingv1.Ingress, error) {
	return assert[*networkingv1.Ingress](e.data)
}
// Endpoint returns the data as a *corev1.Endpoints. An error is returned if
// the Entry holds no object or the object is not a *corev1.Endpoints.
func (e Entry) Endpoint() (*corev1.Endpoints, error) {
	return assert[*corev1.Endpoints](e.data)
}
// assert converts obj to the type specified by the AssertTo type parameter.
// It returns the zero value of AssertTo and an error when obj is a nil
// interface value or when its dynamic type is not AssertTo.
func assert[AssertTo runtime.Object](obj runtime.Object) (AssertTo, error) {
	// zero doubles as the failure return value and as the carrier of the
	// wanted type for the %T verb below.
	var zero AssertTo
	if obj == nil {
		return zero, fmt.Errorf("object is nil")
	}
	if typed, ok := obj.(AssertTo); ok {
		return typed, nil
	}
	return zero, fmt.Errorf("object is not of type %T", zero)
}