pkg/controller/controller.go (210 lines of code) (raw):
/*
Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/clients"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/config"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/controller/liveness"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/controller/metrics"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/controller/sslcertificatemanager"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/controller/state"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/controller/sync"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/flags"
ingressutils "github.com/GoogleCloudPlatform/gke-managed-certs/pkg/utils/ingress"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/utils/queue"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/utils/random"
"github.com/GoogleCloudPlatform/gke-managed-certs/pkg/utils/types"
)
type params struct {
clients *clients.Clients
config *config.Config
metrics metrics.Interface
healthCheck *liveness.HealthCheck
resyncInterval time.Duration
state state.Interface
sync sync.Interface
}
type controller struct {
clients *clients.Clients
ingressQueue workqueue.RateLimitingInterface
ingressResyncQueue workqueue.RateLimitingInterface
managedCertificateQueue workqueue.RateLimitingInterface
managedCertificateResyncQueue workqueue.RateLimitingInterface
metrics metrics.Interface
healthCheck *liveness.HealthCheck
resyncInterval time.Duration
state state.Interface
sync sync.Interface
}
func NewParams(ctx context.Context, clients *clients.Clients, config *config.Config,
healthCheck *liveness.HealthCheck) *params {
metrics := metrics.New(config)
state := state.New(ctx, clients.ConfigMap)
ssl := sslcertificatemanager.New(clients.Event, metrics, clients.Ssl, state)
random := random.New(config.SslCertificateNamePrefix)
return ¶ms{
clients: clients,
config: config,
metrics: metrics,
healthCheck: healthCheck,
resyncInterval: flags.F.ResyncInterval,
state: state,
sync: sync.New(config, clients.Event, clients.Ingress,
clients.ManagedCertificate, metrics, random, ssl, state),
}
}
func New(ctx context.Context, p *params) *controller {
return &controller{
clients: p.clients,
metrics: p.metrics,
healthCheck: p.healthCheck,
ingressQueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "ingressQueue"),
ingressResyncQueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "ingressResyncQueue"),
managedCertificateQueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "managedCertificateQueue"),
managedCertificateResyncQueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "managedCertificateResyncQueue"),
resyncInterval: p.resyncInterval,
state: p.state,
sync: p.sync,
}
}
func (c *controller) Run(ctx context.Context) error {
defer runtime.HandleCrash()
defer c.ingressQueue.ShutDown()
defer c.ingressResyncQueue.ShutDown()
defer c.managedCertificateQueue.ShutDown()
defer c.managedCertificateResyncQueue.ShutDown()
defer c.healthCheck.Stop()
klog.Info("Controller.Run()")
klog.Info("Start reporting metrics")
go c.metrics.Start(flags.F.PrometheusAddress)
klog.Info("Start liveness probe health checks")
c.healthCheck.StartMonitoring()
c.clients.Run(ctx, c.ingressQueue, c.managedCertificateQueue)
klog.Info("Waiting for cache sync")
cacheCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if !cache.WaitForCacheSync(cacheCtx.Done(), c.clients.HasSynced) {
return fmt.Errorf("Timed out waiting for cache sync")
}
klog.Info("Cache synced")
go wait.Until(
func() {
c.processNext(ctx, c.ingressQueue, liveness.Undefined, c.sync.Ingress)
},
time.Second, ctx.Done())
go wait.Until(
func() {
c.processNext(ctx, c.ingressResyncQueue, liveness.IngressResyncProcess,
c.sync.Ingress)
},
time.Second, ctx.Done())
go wait.Until(
func() {
c.processNext(ctx, c.managedCertificateQueue, liveness.Undefined,
c.sync.ManagedCertificate)
},
time.Second, ctx.Done())
go wait.Until(
func() {
c.processNext(ctx, c.managedCertificateResyncQueue,
liveness.McrtResyncProcess, c.sync.ManagedCertificate)
},
time.Second, ctx.Done())
go wait.Until(func() { c.synchronizeAll(ctx) }, c.resyncInterval, ctx.Done())
go wait.Until(func() { c.reportMetrics() }, time.Minute, ctx.Done())
klog.Info("Waiting for stop signal or error")
<-ctx.Done()
klog.Info("Received stop signal, shutting down")
return nil
}
func (c *controller) synchronizeAll(ctx context.Context) {
// TODO(b/204546048): Add a metric to measure how long syncAll takes.
// loopStart := time.Now()
// metrics.UpdateLastTime(metrics.Main, loopStart)
c.healthCheck.UpdateLastActivity(liveness.SynchronizeAll, time.Now())
ingressScheduled := false
mcrtScheduled := false
if ingresses, err := c.clients.Ingress.List(); err != nil {
runtime.HandleError(err)
} else {
for _, ingress := range ingresses {
if !ingressutils.IsGKE(ingress) {
klog.Infof("Skipping non-GKE Ingress %s/%s: %v",
ingress.Namespace, ingress.Name, *ingress)
} else {
queue.Add(c.ingressResyncQueue, ingress)
ingressScheduled = true
}
}
}
if managedCertificates, err := c.clients.ManagedCertificate.List(); err != nil {
runtime.HandleError(err)
} else {
for _, managedCertificate := range managedCertificates {
queue.Add(c.managedCertificateResyncQueue, managedCertificate)
mcrtScheduled = true
}
}
for id := range c.state.List() {
queue.AddId(c.managedCertificateResyncQueue, id)
mcrtScheduled = true
}
c.healthCheck.UpdateLastSuccessSync(time.Now(), ingressScheduled, mcrtScheduled)
// TODO(b/204546048): Add a metric to measure how long syncAll takes.
// metrics.UpdateDurationFromStart(metrics.Main, loopStart)
}
func (c *controller) reportMetrics() {
managedCertificates, err := c.clients.ManagedCertificate.List()
if err != nil {
runtime.HandleError(err)
return
}
statuses := make(map[string]int, 0)
for _, mcrt := range managedCertificates {
statuses[mcrt.Status.CertificateStatus]++
}
c.metrics.ObserveManagedCertificatesStatuses(statuses)
c.metrics.ObserveIngressHighPriorityQueueLength(c.ingressQueue.Len())
c.metrics.ObserveIngressLowPriorityQueueLength(c.ingressResyncQueue.Len())
c.metrics.ObserveManagedCertificateHighPriorityQueueLength(c.managedCertificateQueue.Len())
c.metrics.ObserveManagedCertificateLowPriorityQueueLength(
c.managedCertificateResyncQueue.Len())
}
func (c *controller) processNext(ctx context.Context, queue workqueue.RateLimitingInterface,
activityName liveness.ActivityName, handle func(ctx context.Context, id types.Id) error) {
obj, shutdown := queue.Get()
if shutdown {
return
}
go func() {
defer queue.Done(obj)
key, ok := obj.(string)
if !ok {
queue.Forget(obj)
runtime.HandleError(fmt.Errorf("Expected string in queue but got %T", obj))
return
}
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return
}
err = handle(ctx, types.NewId(namespace, name))
if err == nil {
if activityName != liveness.Undefined {
c.healthCheck.UpdateLastActivity(activityName, time.Now())
}
queue.Forget(obj)
return
}
queue.AddRateLimited(obj)
runtime.HandleError(err)
}()
}