pkg/controller/controller.go (135 lines of code) (raw):
// -------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// --------------------------------------------------------------------------------------------
package controller
import (
"context"
"strconv"
"time"
"github.com/Azure/go-autorest/autorest/to"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/azure"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/environment"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/events"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/metricstore"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/worker"
)
// AppGwIngressController configures the application gateway based on the ingress rules defined.
type AppGwIngressController struct {
azClient azure.AzClient
appGwIdentifier appgw.Identifier
ipAddressMap map[string]k8scontext.IPAddress
cniReconciler CniReconciler
k8sContext *k8scontext.Context
worker *worker.Worker
hostedOnUnderlay bool
configCache *[]byte
recorder record.EventRecorder
agicPod *v1.Pod
MetricStore metricstore.MetricStore
stopChannel chan struct{}
}
type CniReconciler interface {
Reconcile(ctx context.Context) error
}
// NewAppGwIngressController constructs a controller object.
func NewAppGwIngressController(azClient azure.AzClient, appGwIdentifier appgw.Identifier, k8sContext *k8scontext.Context, recorder record.EventRecorder, metricStore metricstore.MetricStore, cniReconciler CniReconciler, agicPod *v1.Pod, hostedOnUnderlay bool) *AppGwIngressController {
controller := &AppGwIngressController{
azClient: azClient,
appGwIdentifier: appGwIdentifier,
k8sContext: k8sContext,
recorder: recorder,
cniReconciler: cniReconciler,
configCache: to.ByteSlicePtr([]byte{}),
ipAddressMap: map[string]k8scontext.IPAddress{},
stopChannel: make(chan struct{}),
agicPod: agicPod,
MetricStore: metricStore,
hostedOnUnderlay: hostedOnUnderlay,
}
controller.worker = &worker.Worker{
EventProcessor: controller,
}
return controller
}
// Start function runs the k8scontext and continues to listen to the
// event channel and enqueue events before stopChannel is closed
func (c *AppGwIngressController) Start(envVariables environment.EnvVariables) error {
// Starts k8scontext which contains all the informers
// This will start individual go routines for informers
if err := c.k8sContext.Run(c.stopChannel, false, envVariables); err != nil {
klog.Error("Could not start Kubernetes Context: ", err)
return err
}
// initilize reconcilerTickerTask
if envVariables.ReconcilePeriodSeconds != "" {
go reconcilerTickerTask(c.k8sContext.Work, c.stopChannel, envVariables.ReconcilePeriodSeconds)
}
// Starts Worker processing events from k8sContext
go c.worker.Run(c.k8sContext.Work, c.stopChannel)
return nil
}
// Stop function terminates the k8scontext and signal the stopchannel
func (c *AppGwIngressController) Stop() {
c.MetricStore.Stop()
close(c.stopChannel)
}
// Liveness fulfills the health.HealthProbe interface; It is evaluated when K8s liveness-checks the AGIC pod.
func (c *AppGwIngressController) Liveness() bool {
// TODO(draychev): implement
return true
}
// Readiness fulfills the health.HealthProbe interface; It is evaluated when K8s readiness-checks the AGIC pod.
func (c *AppGwIngressController) Readiness() bool {
if !c.hostedOnUnderlay {
// When the channel is CLOSED we have synced cache and are READY!
_, isOpen := <-c.k8sContext.CacheSynced
return !isOpen
}
return true
}
// ProcessEvent is the handler for K8 cluster events which are listened by informers.
func (c *AppGwIngressController) ProcessEvent(event events.Event) error {
processEventStart := time.Now()
err := c.cniReconciler.Reconcile(context.Background())
if err != nil {
// Not treated as fatal errors, but we log them and emit a warning event.
if c.agicPod != nil {
c.recorder.Event(c.agicPod, v1.EventTypeWarning, events.ReasonFailedCNIConfiguration, err.Error())
}
klog.Warning(err)
}
appGw, cbCtx, err := c.GetAppGw()
if err != nil {
klog.Error("Error Retrieving AppGw for k8s event. ", err)
return err
}
// Reset all ingress Ips and ignore mutating appgw if gateway is in stopped state
if !c.isApplicationGatewayMutable(appGw) {
klog.Info("Reset all ingress ip")
c.ResetAllIngress(appGw, cbCtx)
klog.Info("Ignore mutating App Gateway as it is not mutable")
return nil
}
if err := c.MutateAllIngress(appGw, cbCtx); err != nil {
klog.Error("Error mutating AKS from k8s event. ", err)
}
if err := c.MutateAppGateway(event, appGw, cbCtx); err != nil {
klogIt := klog.Errorf
if cbCtx.EnvVariables.EnablePanicOnPutError {
klogIt = klog.Fatalf
}
if c.agicPod != nil {
c.recorder.Event(c.agicPod, v1.EventTypeWarning, events.ReasonFailedApplyingAppGwConfig, err.Error())
}
klogIt(err.Error())
c.MetricStore.IncArmAPIUpdateCallFailureCounter()
return err
}
c.MetricStore.IncArmAPIUpdateCallSuccessCounter()
duration := time.Since(processEventStart)
c.MetricStore.SetUpdateLatencySec(duration)
// We keep this at log level 1 to show some heartbeat in the logs. Without this it is way too quiet.
klog.V(1).Infof("Completed last event loop run in: %+v", duration)
return nil
}
func reconcilerTickerTask(work chan events.Event, stopChannel chan struct{}, reconcilePeriodSecondsStr string) {
klog.V(3).Info("Reconciler Ticker task started with period: ", reconcilePeriodSecondsStr)
reconcilePeriodSeconds, _ := strconv.Atoi(reconcilePeriodSecondsStr)
reconcileTicker := time.NewTicker(time.Duration(reconcilePeriodSeconds) * time.Second)
for {
select {
case tickedTime := <-reconcileTicker.C:
klog.V(9).Info("Reconciling ticker ticked at ", tickedTime)
work <- events.Event{
Type: events.PeriodicReconcile,
}
case <-stopChannel:
return
}
}
}