pkg/controller/nginxingress/nginx_ingress_controller.go (518 lines of code) (raw):
package nginxingress
import (
"context"
"errors"
"fmt"
"net/url"
"strconv"
"time"
"github.com/Azure/aks-app-routing-operator/pkg/controller/keyvault"
approutingv1alpha1 "github.com/Azure/aks-app-routing-operator/api/v1alpha1"
"github.com/Azure/aks-app-routing-operator/pkg/config"
"github.com/Azure/aks-app-routing-operator/pkg/controller/controllername"
"github.com/Azure/aks-app-routing-operator/pkg/controller/metrics"
"github.com/Azure/aks-app-routing-operator/pkg/manifests"
"github.com/Azure/aks-app-routing-operator/pkg/util"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/record"
"k8s.io/utils/keymutex"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
controllerClassMaxLen = 250
)
const (
defaultMinReplicas = 2
defaultMaxReplicas = 100
)
const (
rapidTargetCPUUtilization = 55
balancedTargetCPUUtilization = 70
steadyTargetCPUUtilization = 80
defaultTargetCPUUtilization = balancedTargetCPUUtilization
)
var (
icCollisionErr = errors.New("collision on the IngressClass")
maxCollisionsErr = errors.New("max collisions reached")
)
var (
nginxIngressControllerReconcilerName = controllername.New("nginx", "ingress", "controller", "reconciler")
// collisionCountMu is used to prevent multiple nginxIngressController resources from determining their collisionCount at the same time. We use
// a hashed key mutex because collisions can only occur when the nginxIngressController resources have the same spec.ControllerNamePrefix field.
// This is the field used to key into this mutex. Using this mutex prevents a race condition of multiple nginxIngressController resources attempting
// to determine their collisionCount at the same time then both attempting to create and take ownership of the same resources.
collisionCountMu = keymutex.NewHashed(6) // 6 is the number of "buckets". It's not too big, not too small
)
// nginxIngressControllerReconciler reconciles a NginxIngressController object
type nginxIngressControllerReconciler struct {
client client.Client
conf *config.Config
events record.EventRecorder
defaultNicControllerClass string
}
// NewReconciler sets up the controller with the Manager.
func NewReconciler(conf *config.Config, mgr ctrl.Manager, defaultIngressClassControllerClass string) error {
metrics.InitControllerMetrics(nginxIngressControllerReconcilerName)
reconciler := &nginxIngressControllerReconciler{
client: mgr.GetClient(),
conf: conf,
events: mgr.GetEventRecorderFor("aks-app-routing-operator"),
defaultNicControllerClass: defaultIngressClassControllerClass,
}
if err := nginxIngressControllerReconcilerName.AddToController(
ctrl.NewControllerManagedBy(mgr).
For(&approutingv1alpha1.NginxIngressController{}).
Owns(&appsv1.Deployment{}),
mgr.GetLogger(),
).Complete(reconciler); err != nil {
return err
}
return nil
}
func (n *nginxIngressControllerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
start := time.Now()
lgr := log.FromContext(ctx, "nginxIngressController", req.NamespacedName)
ctx = log.IntoContext(ctx, lgr)
lgr.Info("reconciling NginxIngressController")
defer lgr.Info("finished reconciling resource", "latencySec", time.Since(start).String())
defer func() {
metrics.HandleControllerReconcileMetrics(nginxIngressControllerReconcilerName, res, err)
}()
var nginxIngressController approutingv1alpha1.NginxIngressController
if err := n.client.Get(ctx, req.NamespacedName, &nginxIngressController); err != nil {
if apierrors.IsNotFound(err) { // object was deleted
lgr.Info("NginxIngressController not found")
return ctrl.Result{}, nil
}
lgr.Error(err, "unable to fetch NginxIngressController")
return ctrl.Result{}, err
}
lgr = lgr.WithValues("generation", nginxIngressController.Generation)
ctx = log.IntoContext(ctx, lgr)
var managedRes []approutingv1alpha1.ManagedObjectReference = nil
var controllerDeployment *appsv1.Deployment = nil
var ingressClass *netv1.IngressClass = nil
lockKey := nginxIngressController.Spec.ControllerNamePrefix
collisionCountMu.LockKey(lockKey)
defer collisionCountMu.UnlockKey(lockKey)
lgr.Info("determining collision count")
startingCollisionCount := nginxIngressController.Status.CollisionCount
newCollisionCount, collisionCountErr := n.GetCollisionCount(ctx, &nginxIngressController)
if collisionCountErr == nil && newCollisionCount != startingCollisionCount {
nginxIngressController.Status.CollisionCount = newCollisionCount
if err := n.client.Status().Update(ctx, &nginxIngressController); err != nil {
lgr.Error(err, "unable to update collision count status")
return ctrl.Result{}, fmt.Errorf("updating status: %w", err)
}
return ctrl.Result{Requeue: true}, nil
}
defer func() { // defer is before checking err so that we can update status even if there is an error
lgr.Info("updating status")
n.updateStatus(&nginxIngressController, controllerDeployment, ingressClass, managedRes, collisionCountErr)
if statusErr := n.client.Status().Update(ctx, &nginxIngressController); statusErr != nil {
if apierrors.IsConflict(statusErr) {
lgr.Info("conflict updating status, requeuing")
res = ctrl.Result{Requeue: true}
err = nil
return
}
if err == nil {
lgr.Error(statusErr, "unable to update NginxIngressController status")
err = statusErr
}
}
}()
if collisionCountErr != nil {
if isUnreconcilableError(collisionCountErr) {
lgr.Info("unreconcilable collision count")
return ctrl.Result{RequeueAfter: time.Minute}, nil // requeue in case cx fixes the unreconcilable reason
}
lgr.Error(collisionCountErr, "unable to get determine collision count")
return ctrl.Result{}, fmt.Errorf("determining collision count: %w", collisionCountErr)
}
lgr.Info("calculating managed resources")
resources := n.ManagedResources(&nginxIngressController)
if resources == nil {
return ctrl.Result{}, fmt.Errorf("unable to get managed resources")
}
controllerDeployment = resources.Deployment
ingressClass = resources.IngressClass
lgr.Info("reconciling managed resources")
managedRes, err = n.ReconcileResource(ctx, &nginxIngressController, resources)
if err != nil {
lgr.Error(err, "unable to reconcile resource")
return ctrl.Result{}, fmt.Errorf("reconciling resource: %w", err)
}
if replicas := resources.Deployment.Spec.Replicas; replicas != nil {
lgr.Info(fmt.Sprintf("nginx deployment targets %d replicas", *replicas), "replicas", *replicas)
}
return ctrl.Result{}, nil
}
// ReconcileResource reconciles the NginxIngressController resources returning a list of managed resources.
func (n *nginxIngressControllerReconciler) ReconcileResource(ctx context.Context, nic *approutingv1alpha1.NginxIngressController, res *manifests.NginxResources) ([]approutingv1alpha1.ManagedObjectReference, error) {
if nic == nil {
return nil, errors.New("nginxIngressController cannot be nil")
}
if res == nil {
return nil, errors.New("resources cannot be nil")
}
lgr := log.FromContext(ctx)
var managedResourceRefs []approutingv1alpha1.ManagedObjectReference
for _, obj := range res.Objects() {
// TODO: upsert works fine for now but we want to set annotations exactly on the nginx service, we should use an alternative strategy for that in the future
if err := util.Upsert(ctx, n.client, obj); err != nil {
// publish an event so cx can see things like their policy is preventing us from creating a resource
n.events.Eventf(nic, corev1.EventTypeWarning, "EnsuringManagedResourcesFailed", "Failed to ensure managed resource %s %s/%s: %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetNamespace(), obj.GetName(), err.Error())
lgr.Error(err, "unable to upsert object", "name", obj.GetName(), "kind", obj.GetObjectKind().GroupVersionKind().Kind, "namespace", obj.GetNamespace())
return nil, fmt.Errorf("upserting object: %w", err)
}
if manifests.HasTopLevelLabels(obj.GetLabels()) {
managedResourceRefs = append(managedResourceRefs, approutingv1alpha1.ManagedObjectReference{
Name: obj.GetName(),
Namespace: obj.GetNamespace(),
Kind: obj.GetObjectKind().GroupVersionKind().Kind,
APIGroup: obj.GetObjectKind().GroupVersionKind().Group,
})
}
}
return managedResourceRefs, nil
}
func (n *nginxIngressControllerReconciler) ManagedResources(nic *approutingv1alpha1.NginxIngressController) *manifests.NginxResources {
if nic == nil {
return nil
}
nginxIngressCfg := ToNginxIngressConfig(nic, n.defaultNicControllerClass)
if nginxIngressCfg == nil {
return nil
}
res := manifests.GetNginxResources(n.conf, nginxIngressCfg)
owner := manifests.GetOwnerRefs(nic, true)
for _, obj := range res.Objects() {
if manifests.HasTopLevelLabels(obj.GetLabels()) {
obj.SetOwnerReferences(owner)
}
}
return res
}
func (n *nginxIngressControllerReconciler) GetCollisionCount(ctx context.Context, nic *approutingv1alpha1.NginxIngressController) (int32, error) {
lgr := log.FromContext(ctx)
// it's not this fn's job to overwrite the collision count, so we revert any changes we make
startingCollisionCount := nic.Status.CollisionCount
defer func() {
nic.Status.CollisionCount = startingCollisionCount
}()
for {
lgr = lgr.WithValues("collisionCount", nic.Status.CollisionCount)
if nic.Status.CollisionCount == approutingv1alpha1.MaxCollisions {
lgr.Info("max collisions reached")
return 0, maxCollisionsErr
}
collision, err := n.collides(ctx, nic)
if err != nil {
lgr.Error(err, "unable to determine collision")
return 0, fmt.Errorf("determining collision: %w", err)
}
if collision == collisionIngressClass {
// rare since our webhook guards against it, but it's possible
lgr.Info("ingress class collision")
return 0, icCollisionErr
}
if collision == collisionNone {
break
}
lgr.Info("reconcilable collision detected, incrementing")
nic.Status.CollisionCount++
}
return nic.Status.CollisionCount, nil
}
func (n *nginxIngressControllerReconciler) collides(ctx context.Context, nic *approutingv1alpha1.NginxIngressController) (collision, error) {
lgr := log.FromContext(ctx)
// ignore any collisions on default nic for migration story. Need to acquire ownership of old app routing resources
if IsDefaultNic(nic) {
return collisionNone, nil
}
res := n.ManagedResources(nic)
if res == nil {
return collisionNone, fmt.Errorf("getting managed objects")
}
for _, obj := range res.Objects() {
lgr := lgr.WithValues("kind", obj.GetObjectKind().GroupVersionKind().Kind, "name", obj.GetName(), "namespace", obj.GetNamespace())
// if we won't own the resource, we don't care about collisions.
// this is most commonly used for namespaces since we shouldn't own
// namespaces
if !manifests.HasTopLevelLabels(obj.GetLabels()) {
lgr.Info("skipping collision check because we don't own the resource")
continue
}
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
err := n.client.Get(ctx, client.ObjectKeyFromObject(obj), u)
if err != nil {
if apierrors.IsNotFound(err) {
continue
}
return collisionNone, fmt.Errorf("getting object: %w", err)
}
if owner := util.FindOwnerKind(u.GetOwnerReferences(), nic.Kind); owner == nic.Name {
lgr.Info("the nginxIngressController owns this resource")
continue
}
lgr.Info("collision detected")
if obj.GetObjectKind().GroupVersionKind().Kind == "IngressClass" {
lgr.Info("collision is with an IngressClass")
return collisionIngressClass, nil
}
return collisionOther, nil
}
lgr.Info("no collisions detected")
return collisionNone, nil
}
// updateStatus updates the status of the NginxIngressController resource. If a nil controller Deployment or IngressClass is passed, the status is defaulted for those fields if they are not already set.
func (n *nginxIngressControllerReconciler) updateStatus(nic *approutingv1alpha1.NginxIngressController, controllerDeployment *appsv1.Deployment, ic *netv1.IngressClass, managedResourceRefs []approutingv1alpha1.ManagedObjectReference, err error) {
n.updateStatusManagedResourceRefs(nic, managedResourceRefs)
n.updateStatusIngressClass(nic, ic)
// default conditions
if controllerDeployment == nil || controllerDeployment.CreationTimestamp.IsZero() {
n.updateStatusNilDeployment(nic)
} else {
for _, cond := range controllerDeployment.Status.Conditions {
switch cond.Type {
case appsv1.DeploymentProgressing:
n.updateStatusControllerProgressing(nic, cond)
case appsv1.DeploymentAvailable:
n.updateStatusControllerAvailable(nic, cond)
}
}
}
n.updateStatusControllerReplicas(nic, controllerDeployment)
n.updateStatusAvailable(nic)
// error checking at end to take precedence over other conditions
n.updateStatusFromError(nic, err)
}
func (n *nginxIngressControllerReconciler) updateStatusManagedResourceRefs(nic *approutingv1alpha1.NginxIngressController, managedResourceRefs []approutingv1alpha1.ManagedObjectReference) {
if managedResourceRefs == nil {
return
}
nic.Status.ManagedResourceRefs = managedResourceRefs
}
func (n *nginxIngressControllerReconciler) updateStatusIngressClass(nic *approutingv1alpha1.NginxIngressController, ic *netv1.IngressClass) {
if ic == nil || ic.CreationTimestamp.IsZero() {
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeIngressClassReady,
Status: metav1.ConditionUnknown,
Reason: "IngressClassUnknown",
Message: "IngressClass is unknown",
})
} else {
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeIngressClassReady,
Status: metav1.ConditionTrue,
Reason: "IngressClassIsReady",
Message: "Ingress Class is up-to-date ",
})
}
}
func (n *nginxIngressControllerReconciler) updateStatusNilDeployment(nic *approutingv1alpha1.NginxIngressController) {
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeControllerAvailable,
Status: metav1.ConditionUnknown,
Reason: "ControllerDeploymentUnknown",
Message: "Controller deployment is unknown",
})
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeProgressing,
Status: metav1.ConditionUnknown,
Reason: "ControllerDeploymentUnknown",
Message: "Controller deployment is unknown",
})
}
func (n *nginxIngressControllerReconciler) updateStatusControllerReplicas(nic *approutingv1alpha1.NginxIngressController, deployment *appsv1.Deployment) {
if deployment == nil {
return
}
nic.Status.ControllerReadyReplicas = deployment.Status.ReadyReplicas
nic.Status.ControllerAvailableReplicas = deployment.Status.AvailableReplicas
nic.Status.ControllerUnavailableReplicas = deployment.Status.UnavailableReplicas
nic.Status.ControllerReplicas = deployment.Status.Replicas
}
func (n *nginxIngressControllerReconciler) updateStatusAvailable(nic *approutingv1alpha1.NginxIngressController) {
controllerAvailable := nic.GetCondition(approutingv1alpha1.ConditionTypeControllerAvailable)
icAvailable := nic.GetCondition(approutingv1alpha1.ConditionTypeIngressClassReady)
if controllerAvailable != nil && icAvailable != nil && controllerAvailable.Status == metav1.ConditionTrue && icAvailable.Status == metav1.ConditionTrue {
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeAvailable,
Status: metav1.ConditionTrue,
Reason: "ControllerIsAvailable",
Message: "Controller Deployment has minimum availability and IngressClass is up-to-date",
})
} else {
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "ControllerIsNotAvailable",
Message: "Controller Deployment does not have minimum availability or IngressClass is not up-to-date",
})
}
}
func (n *nginxIngressControllerReconciler) updateStatusFromError(nic *approutingv1alpha1.NginxIngressController, err error) {
if errors.Is(err, icCollisionErr) {
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeProgressing,
Status: metav1.ConditionFalse,
Reason: "IngressClassCollision",
Message: "IngressClass already exists and is not owned by this controller",
})
n.events.Event(nic, corev1.EventTypeWarning, "IngressClassCollision", "IngressClass already exists and is not owned by this controller. Change the spec.IngressClassName to an unused IngressClass name in a new NginxIngressController.")
}
if errors.Is(err, maxCollisionsErr) {
nic.SetCondition(metav1.Condition{
Type: approutingv1alpha1.ConditionTypeProgressing,
Status: metav1.ConditionFalse,
Reason: "TooManyCollisions",
Message: "Too many collisions with existing resources",
})
n.events.Event(nic, corev1.EventTypeWarning, "TooManyCollisions", "Too many collisions with existing resources. Change the spec.ControllerNamePrefix to something more unique in a new NginxIngressController.")
}
}
func (n *nginxIngressControllerReconciler) updateStatusControllerAvailable(nic *approutingv1alpha1.NginxIngressController, availableCondition appsv1.DeploymentCondition) {
if availableCondition.Type != appsv1.DeploymentAvailable {
return
}
var cond metav1.Condition
switch availableCondition.Status {
case corev1.ConditionTrue:
cond = metav1.Condition{
Type: approutingv1alpha1.ConditionTypeControllerAvailable,
Status: metav1.ConditionTrue,
Reason: "ControllerDeploymentAvailable",
Message: "Controller Deployment is available",
}
case corev1.ConditionFalse:
cond = metav1.Condition{
Type: approutingv1alpha1.ConditionTypeControllerAvailable,
Status: metav1.ConditionFalse,
Reason: "ControllerDeploymentNotAvailable",
Message: "Controller Deployment is not available",
}
default:
cond = metav1.Condition{
Type: approutingv1alpha1.ConditionTypeControllerAvailable,
Status: metav1.ConditionUnknown,
Reason: "ControllerDeploymentUnknown",
Message: "Controller Deployment is in an unknown state",
}
}
nic.SetCondition(cond)
}
func (n *nginxIngressControllerReconciler) updateStatusControllerProgressing(nic *approutingv1alpha1.NginxIngressController, progressingCondition appsv1.DeploymentCondition) {
if progressingCondition.Type != appsv1.DeploymentProgressing {
return
}
var cond metav1.Condition
switch progressingCondition.Status {
case corev1.ConditionTrue:
cond = metav1.Condition{
Type: approutingv1alpha1.ConditionTypeProgressing,
Status: metav1.ConditionTrue,
Reason: "ControllerDeploymentProgressing",
Message: "Controller Deployment has successfully progressed",
}
case corev1.ConditionFalse:
cond = metav1.Condition{
Type: approutingv1alpha1.ConditionTypeProgressing,
Status: metav1.ConditionFalse,
Reason: "ControllerDeploymentNotProgressing",
Message: "Controller Deployment has failed to progress",
}
default:
cond = metav1.Condition{
Type: approutingv1alpha1.ConditionTypeProgressing,
Status: metav1.ConditionUnknown,
Reason: "ControllerDeploymentProgressingUnknown",
Message: "Controller Deployment progress is unknown",
}
}
nic.SetCondition(cond)
}
func isUnreconcilableError(err error) bool {
return errors.Is(err, icCollisionErr) || errors.Is(err, maxCollisionsErr)
}
func ToNginxIngressConfig(nic *approutingv1alpha1.NginxIngressController, defaultNicControllerClass string) *manifests.NginxIngressConfig {
if nic == nil {
return nil
}
cc := "approuting.kubernetes.azure.com/" + url.PathEscape(nic.Name)
if len(cc) > controllerClassMaxLen {
cc = cc[:controllerClassMaxLen]
}
resourceName := fmt.Sprintf("%s-%d", nic.Spec.ControllerNamePrefix, nic.Status.CollisionCount)
if IsDefaultNic(nic) {
cc = defaultNicControllerClass
resourceName = DefaultNicResourceName
}
scaling := nic.Spec.Scaling
var minReplicas int32 = defaultMinReplicas
if scaling != nil && scaling.MinReplicas != nil {
minReplicas = *scaling.MinReplicas
}
var maxReplicas int32 = defaultMaxReplicas
if scaling != nil && scaling.MaxReplicas != nil {
maxReplicas = *scaling.MaxReplicas
}
// we use CEL validation on crd to enforce min <= max if it's defined. There's an edge case where they define max to 1 but don't define min which defaults to 2. The opposite is true too
if minReplicas > maxReplicas {
if scaling == nil || scaling.MinReplicas == nil {
minReplicas = maxReplicas
} else {
maxReplicas = minReplicas
}
}
nginxIng := &manifests.NginxIngressConfig{
ControllerClass: cc,
ResourceName: resourceName,
IcName: nic.Spec.IngressClassName,
ServiceConfig: &manifests.ServiceConfig{
Annotations: nic.Spec.LoadBalancerAnnotations,
},
HTTPDisabled: nic.Spec.HTTPDisabled,
MinReplicas: minReplicas,
MaxReplicas: maxReplicas,
TargetCPUUtilizationPercentage: getTargetCPUUtilizationPercentage(nic),
}
if cert := nic.Spec.DefaultSSLCertificate; cert != nil {
if cert.Secret != nil && cert.Secret.Name != "" && cert.Secret.Namespace != "" {
nginxIng.DefaultSSLCertificate = cert.Secret.Namespace + "/" + cert.Secret.Name
}
if cert.Secret == nil && cert.KeyVaultURI != nil {
nginxIng.DefaultSSLCertificate = config.DefaultNs + "/" + keyvault.DefaultNginxCertName(nic)
}
if cert.ForceSSLRedirect {
nginxIng.ForceSSLRedirect = true
}
}
if nic.Spec.DefaultBackendService != nil && nic.Spec.DefaultBackendService.Name != "" && nic.Spec.DefaultBackendService.Namespace != "" {
nginxIng.DefaultBackendService = nic.Spec.DefaultBackendService.Namespace + "/" + nic.Spec.DefaultBackendService.Name
}
if len(nic.Spec.CustomHTTPErrors) != 0 {
errStr := ""
for i, errCode := range nic.Spec.CustomHTTPErrors {
errStr += strconv.Itoa(int(errCode))
if i+1 < len(nic.Spec.CustomHTTPErrors) {
errStr += ","
}
}
nginxIng.CustomHTTPErrors = errStr
}
return nginxIng
}
func getTargetCPUUtilizationPercentage(nic *approutingv1alpha1.NginxIngressController) int32 {
if nic == nil {
return defaultTargetCPUUtilization
}
scaling := nic.Spec.Scaling
if scaling == nil {
return defaultTargetCPUUtilization
}
thresh := scaling.Threshold
if thresh == nil {
return defaultTargetCPUUtilization
}
switch *thresh {
case approutingv1alpha1.RapidThreshold:
return rapidTargetCPUUtilization
case approutingv1alpha1.BalancedThreshold:
return balancedTargetCPUUtilization
case approutingv1alpha1.SteadyThreshold:
return steadyTargetCPUUtilization
default:
return defaultTargetCPUUtilization
}
}