v2/internal/controllers/controller_resources.go (344 lines of code) (raw):
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
package controllers
import (
"context"
"fmt"
"reflect"
"regexp"
"strings"
"time"
. "github.com/Azure/azure-service-operator/v2/internal/logging"
"github.com/go-logr/logr"
"github.com/rotisserie/eris"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
mysqlv1 "github.com/Azure/azure-service-operator/v2/api/dbformysql/v1"
mysqlv1webhook "github.com/Azure/azure-service-operator/v2/api/dbformysql/v1/webhook"
postgresqlv1 "github.com/Azure/azure-service-operator/v2/api/dbforpostgresql/v1"
postgresqlv1webhook "github.com/Azure/azure-service-operator/v2/api/dbforpostgresql/v1/webhook"
azuresqlv1 "github.com/Azure/azure-service-operator/v2/api/sql/v1"
azuresqlv1webhook "github.com/Azure/azure-service-operator/v2/api/sql/v1/webhook"
"github.com/Azure/azure-service-operator/v2/internal/identity"
"github.com/Azure/azure-service-operator/v2/internal/reconcilers"
"github.com/Azure/azure-service-operator/v2/internal/reconcilers/arm"
azuresqlreconciler "github.com/Azure/azure-service-operator/v2/internal/reconcilers/azuresql"
"github.com/Azure/azure-service-operator/v2/internal/reconcilers/generic"
mysqlreconciler "github.com/Azure/azure-service-operator/v2/internal/reconcilers/mysql"
postgresqlreconciler "github.com/Azure/azure-service-operator/v2/internal/reconcilers/postgresql"
"github.com/Azure/azure-service-operator/v2/internal/reflecthelpers"
"github.com/Azure/azure-service-operator/v2/internal/resolver"
asocel "github.com/Azure/azure-service-operator/v2/internal/util/cel"
"github.com/Azure/azure-service-operator/v2/internal/util/kubeclient"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime/conditions"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime/registration"
)
type Schemer interface {
GetScheme() *runtime.Scheme
}
func GetKnownStorageTypes(
schemer Schemer,
armConnectionFactory arm.ARMConnectionFactory,
credentialProvider identity.CredentialProvider,
kubeClient kubeclient.Client,
positiveConditions *conditions.PositiveConditionBuilder,
expressionEvaluator asocel.ExpressionEvaluator,
options generic.Options,
) ([]*registration.StorageType, error) {
resourceResolver := resolver.NewResolver(kubeClient)
knownStorageTypes, err := getGeneratedStorageTypes(
schemer,
armConnectionFactory,
kubeClient,
resourceResolver,
positiveConditions,
expressionEvaluator,
options)
if err != nil {
return nil, err
}
for _, t := range knownStorageTypes {
err := augmentWithControllerName(t)
if err != nil {
return nil, err
}
augmentWithPredicate(t)
}
knownStorageTypes = append(
knownStorageTypes,
®istration.StorageType{
Obj: &mysqlv1.User{},
Name: "mysql_user",
Reconciler: mysqlreconciler.NewMySQLUserReconciler(
kubeClient,
resourceResolver,
positiveConditions,
credentialProvider,
options.Config),
Predicate: makeStandardPredicate(),
Indexes: []registration.Index{
{
Key: ".spec.localUser.password",
Func: indexMySQLUserPassword,
},
},
Watches: []registration.Watch{
{
Type: &corev1.Secret{},
MakeEventHandler: watchSecretsFactory([]string{".spec.localUser.password"}, &mysqlv1.UserList{}),
},
},
})
knownStorageTypes = append(
knownStorageTypes,
®istration.StorageType{
Obj: &postgresqlv1.User{},
Name: "postgresql_user",
Reconciler: postgresqlreconciler.NewPostgreSQLUserReconciler(
kubeClient,
resourceResolver,
positiveConditions,
options.Config),
Predicate: makeStandardPredicate(),
Indexes: []registration.Index{
{
Key: ".spec.localUser.password",
Func: indexPostgreSQLUserPassword,
},
},
Watches: []registration.Watch{
{
Type: &corev1.Secret{},
MakeEventHandler: watchSecretsFactory([]string{".spec.localUser.password"}, &postgresqlv1.UserList{}),
},
},
})
knownStorageTypes = append(
knownStorageTypes,
®istration.StorageType{
Obj: &azuresqlv1.User{},
Name: "sql_user",
Reconciler: azuresqlreconciler.NewAzureSQLUserReconciler(
kubeClient,
resourceResolver,
positiveConditions,
credentialProvider,
options.Config),
Predicate: makeStandardPredicate(),
Indexes: []registration.Index{
{
Key: ".spec.localUser.password",
Func: indexAzureSQLUserPassword,
},
},
Watches: []registration.Watch{
{
Type: &corev1.Secret{},
MakeEventHandler: watchSecretsFactory([]string{".spec.localUser.password"}, &azuresqlv1.UserList{}),
},
},
})
return knownStorageTypes, nil
}
func getGeneratedStorageTypes(
schemer Schemer,
armConnectionFactory arm.ARMConnectionFactory,
kubeClient kubeclient.Client,
resourceResolver *resolver.Resolver,
positiveConditions *conditions.PositiveConditionBuilder,
expressionEvaluator asocel.ExpressionEvaluator,
options generic.Options,
) ([]*registration.StorageType, error) {
knownStorageTypes := getKnownStorageTypes()
err := resourceResolver.IndexStorageTypes(schemer.GetScheme(), knownStorageTypes)
if err != nil {
return nil, eris.Wrap(err, "failed add storage types to resource resolver")
}
var extensions map[schema.GroupVersionKind]genruntime.ResourceExtension
extensions, err = GetResourceExtensions(schemer.GetScheme())
if err != nil {
return nil, eris.Wrap(err, "failed getting extensions")
}
for _, t := range knownStorageTypes {
// Use the provided GVK to construct a new runtime object of the desired concrete type.
var gvk schema.GroupVersionKind
gvk, err = apiutil.GVKForObject(t.Obj, schemer.GetScheme())
if err != nil {
return nil, eris.Wrapf(err, "creating GVK for obj %T", t.Obj)
}
extension := extensions[gvk]
augmentWithARMReconciler(
armConnectionFactory,
kubeClient,
resourceResolver,
positiveConditions,
expressionEvaluator,
options,
extension,
t)
}
return knownStorageTypes, nil
}
func augmentWithARMReconciler(
armConnectionFactory arm.ARMConnectionFactory,
kubeClient kubeclient.Client,
resourceResolver *resolver.Resolver,
positiveConditions *conditions.PositiveConditionBuilder,
expressionEvaluator asocel.ExpressionEvaluator,
options generic.Options,
extension genruntime.ResourceExtension,
t *registration.StorageType,
) {
t.Reconciler = arm.NewAzureDeploymentReconciler(
armConnectionFactory,
kubeClient,
resourceResolver,
positiveConditions,
expressionEvaluator,
options.Config,
extension)
}
func augmentWithPredicate(t *registration.StorageType) {
t.Predicate = makeStandardPredicate()
}
func makeStandardPredicate() predicate.Predicate {
// Note: These predicates prevent status updates from triggering a reconcile.
// to learn more look at https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/predicate#GenerationChangedPredicate
return predicate.Or(
predicate.GenerationChangedPredicate{},
reconcilers.ARMReconcilerAnnotationChangedPredicate(),
reconcilers.ARMPerResourceSecretAnnotationChangedPredicate())
}
func augmentWithControllerName(t *registration.StorageType) error {
controllerName, err := getControllerName(t.Obj)
if err != nil {
return eris.Wrapf(err, "failed to get controller name for obj %T", t.Obj)
}
t.Name = controllerName
return nil
}
var groupRegex = regexp.MustCompile(`.*/v2/api/([a-zA-Z0-9.]+)/`)
func getControllerName(obj client.Object) (string, error) {
v, err := conversion.EnforcePtr(obj)
if err != nil {
return "", eris.Wrap(err, "t.Obj was expected to be ptr but was not")
}
typ := v.Type()
pkgPath := typ.PkgPath()
matches := groupRegex.FindStringSubmatch(pkgPath)
if len(matches) == 0 {
return "", eris.Errorf("couldn't parse package path %s", pkgPath)
}
group := strings.ReplaceAll(matches[1], ".", "") // elide . for groups like network.frontdoor
name := fmt.Sprintf("%s_%s", group, strings.ToLower(typ.Name()))
return name, nil
}
func GetKnownTypes() []*registration.KnownType {
knownTypes := getKnownTypes()
knownTypes = append(
knownTypes,
®istration.KnownType{
Obj: &mysqlv1.User{},
Defaulter: &mysqlv1webhook.User_Webhook{},
Validator: &mysqlv1webhook.User_Webhook{},
})
knownTypes = append(
knownTypes,
®istration.KnownType{
Obj: &postgresqlv1.User{},
Defaulter: &postgresqlv1webhook.User_Webhook{},
Validator: &postgresqlv1webhook.User_Webhook{},
})
knownTypes = append(
knownTypes,
®istration.KnownType{
Obj: &azuresqlv1.User{},
Defaulter: &azuresqlv1webhook.User_Webhook{},
Validator: &azuresqlv1webhook.User_Webhook{},
})
return knownTypes
}
func CreateScheme() *runtime.Scheme {
scheme := createScheme()
_ = mysqlv1.AddToScheme(scheme)
_ = postgresqlv1.AddToScheme(scheme)
_ = azuresqlv1.AddToScheme(scheme)
return scheme
}
// GetResourceExtensions returns a map between resource and resource extension
func GetResourceExtensions(scheme *runtime.Scheme) (map[schema.GroupVersionKind]genruntime.ResourceExtension, error) {
extensionMapping := make(map[schema.GroupVersionKind]genruntime.ResourceExtension)
for _, extension := range getResourceExtensions() {
for _, resource := range extension.GetExtendedResources() {
// Make sure the type casting goes well, and we can extract the GVK successfully.
resourceObj, ok := resource.(runtime.Object)
if !ok {
err := eris.Errorf("unexpected resource type for resource '%s', found '%T'", resource.AzureName(), resource)
return nil, err
}
gvk, err := apiutil.GVKForObject(resourceObj, scheme)
if err != nil {
return nil, err
}
extensionMapping[gvk] = extension
}
}
return extensionMapping, nil
}
// watchSecretsFactory is used to register an EventHandlerFactory for watching secret mutations.
func watchSecretsFactory(keys []string, objList client.ObjectList) registration.EventHandlerFactory {
return func(client client.Client, log logr.Logger) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(watchEntity(client, log, keys, objList, &corev1.Secret{}))
}
}
// watchSecretsFactory is used to register an EventHandlerFactory for watching secret mutations.
func watchConfigMapsFactory(keys []string, objList client.ObjectList) registration.EventHandlerFactory {
return func(client client.Client, log logr.Logger) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(watchEntity(client, log, keys, objList, &corev1.ConfigMap{}))
}
}
// TODO: It may be possible where we construct the ctrl.Manager to limit what secrets we watch with
// TODO: this feature: https://github.com/kubernetes-sigs/controller-runtime/blob/master/designs/use-selectors-at-cache.md,
// TODO: likely scoped by a label selector?
func watchEntity(c client.Client, log logr.Logger, keys []string, objList client.ObjectList, entity client.Object) handler.MapFunc {
listType := reflect.TypeOf(objList).Elem()
return func(ctx context.Context, o client.Object) []reconcile.Request {
// Safety check that we're looking at the right kind of entity
if reflect.TypeOf(o) != reflect.TypeOf(entity) {
log.V(Status).Info(
"Unexpected watch entity type",
"namespace", o.GetNamespace(),
"name", o.GetName(),
"actual", fmt.Sprintf("%T", o),
"expected", fmt.Sprintf("%T", entity))
return nil
}
// This should be fast since the list of items it's going through should always be cached
// locally with the shared informer.
// Unfortunately we don't have a ctx we can use here, see https://github.com/kubernetes-sigs/controller-runtime/issues/1628
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var allMatches []client.Object
for _, key := range keys {
matchingResources, castOk := reflect.New(listType).Interface().(client.ObjectList)
if !castOk {
log.V(Status).Info("Somehow reflect.New() returned non client.ObjectList", "actual", fmt.Sprintf("%T", o))
continue
}
err := c.List(ctx, matchingResources, client.MatchingFields{key: o.GetName()}, client.InNamespace(o.GetNamespace()))
if err != nil {
// According to https://github.com/kubernetes/kubernetes/issues/51046, APIServer itself
// doesn't actually support this. In our envtest tests, since we use a real client that
// goes directly to APIServer, we'll never actually detect these changes
log.Error(err, "couldn't list resources using secret", "kind", "TODO", "field", key, "value", o.GetName())
continue
}
items, err := reflecthelpers.GetObjectListItems(matchingResources)
if err != nil {
log.Error(err, "failed to get list items")
continue
}
allMatches = append(allMatches, items...)
}
var result []reconcile.Request
for _, matchingResource := range allMatches {
result = append(result, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: matchingResource.GetNamespace(),
Name: matchingResource.GetName(),
},
})
}
return result
}
}