tools/gke-autopsc-controller/controllers/gateway_controller.go (234 lines of code) (raw):
/*
Copyright 2023 Google LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"reflect"
"strings"
"github.com/go-logr/logr"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)
// ServiceReconciler reconciles Gateway objects. Despite its name, both
// Reconcile and SetupWithManager operate on gatewayv1beta1.Gateway resources
// and manage a GCP ServiceAttachment derived from the Gateway's annotations.
type ServiceReconciler struct {
// Client is the Kubernetes API client used to read and update Gateways.
client.Client
// PscController carries the GCP project, region and compute service handle
// (see NewPscController).
*PscController
// Recorder emits Kubernetes events attached to the reconciled Gateway.
Recorder record.EventRecorder
// Log is the base logger; Reconcile derives a per-request logger from it.
Log logr.Logger
}
const (
// Status strings for long-running compute operations. They are not
// referenced in this part of the file; presumably the operation trackers
// (trackLop*) compare them against the operation status — TODO confirm.
computeOperationStatusDone = "DONE"
computeOperationStatusRunning = "RUNNING"
computeOperationStatusPending = "PENDING"
// finalizerName is the finalizer added to and removed from Gateways by
// addFinalizerToGateway and removeFinalizerFromGateway.
finalizerName = "controller.autonpsc.dev/finalizer"
)
// NewPscController returns a PscController bound to the given GCP project and
// region that talks to the Compute API through s.
func NewPscController(project string, region string, s *compute.Service) *PscController {
	c := new(PscController)
	c.project = project
	c.region = region
	c.s = s
	return c
}
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update

// Reconcile brings the GCP ServiceAttachment named by the Gateway's
// "controller.autonpsc.dev/psc-serviceattachment" annotation in line with the
// Gateway.
//
// Gateways without that annotation are ignored. Gateways that do not yet carry
// the "networking.gke.io/forwarding-rules" annotation yield an error so the
// request is retried. Otherwise the current ServiceAttachment is looked up and
// the request is dispatched to handleReconciliation (Gateway alive) or
// handleDeletion (Gateway has a deletion timestamp).
//
// Any lookup failure other than "404 not found" is returned to the caller:
// treating it as "attachment does not exist" would trigger a spurious create
// or silently skip the cleanup of an attachment that is still there.
func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := r.Log.WithValues("gateway", req.NamespacedName)

	gw := &gatewayv1beta1.Gateway{}
	if err := r.Get(ctx, req.NamespacedName, gw); err != nil {
		if apierrors.IsNotFound(err) {
			// Object not found, nothing to do.
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	name := gw.Annotations["controller.autonpsc.dev/psc-serviceattachment"]
	forwardingRules := gw.Annotations["networking.gke.io/forwarding-rules"]

	if name == "" {
		// TODO if only the annotation was removed, there might be a stale forwarding rule
		logger.Info("not reconciling, no 'controller.autonpsc.dev/psc-serviceattachment' annotation", "gateway", gw, "forwarding-rules", forwardingRules)
		return reconcile.Result{}, nil
	}
	if forwardingRules == "" {
		logger.Info("not reconciling, no 'networking.gke.io/forwarding-rules' annotation", "gateway", gw, "forwarding-rules", forwardingRules)
		return reconcile.Result{}, fmt.Errorf("no_forwarding_rule_yet")
	}
	logger.Info("reconciling", "gateway", gw, "forwarding-rules", forwardingRules)

	service := r.getServiceAttachmentService()
	attachment, err := service.Get(r.project, r.region, name).Context(ctx).Do()
	if err != nil {
		apiErr, isAPIErr := err.(*googleapi.Error)
		if !isAPIErr || apiErr.Code != 404 {
			// The state of the attachment is unknown; retry later instead of
			// acting as if it did not exist.
			logger.Error(err, "Failed retrieving ServiceAttachment")
			return reconcile.Result{}, err
		}
		logger.Info("ServiceAttachment doesn't exist yet")
		attachment = nil
	}

	if gw.ObjectMeta.DeletionTimestamp.IsZero() {
		return reconcile.Result{}, handleReconciliation(ctx, req, gw, attachment, name, service, r, logger)
	}
	return reconcile.Result{}, handleDeletion(ctx, req, attachment, service, r, name, logger, gw)
}

// handleDeletion cleans up after a Gateway that is being deleted.
//
// When the ServiceAttachment still exists, its deletion is started and the
// resulting operation is handed to trackLopDeletion in a background goroutine.
// When the attachment is already gone (attachment == nil) there is nothing
// left to delete, so the controller's finalizer is removed right away;
// otherwise the Gateway would stay blocked by the finalizer in this path.
//
// It returns an error when the delete call or the finalizer update fails.
func handleDeletion(ctx context.Context, req ctrl.Request, attachment *compute.ServiceAttachment, service *compute.ServiceAttachmentsService, r *ServiceReconciler, name string, logger logr.Logger, gw *gatewayv1beta1.Gateway) error {
	if attachment == nil {
		logger.Info("ServiceAttachment already deleted, releasing finalizer")
		return r.removeFinalizerFromGateway(ctx, gw)
	}

	logger.Info("Deleting Service Attachment")
	lop, err := service.Delete(r.project, r.region, name).Context(ctx).Do()
	if err != nil {
		logger.Error(err, "Failed deleting ServiceAttachment")
		return fmt.Errorf("failed_deleting")
	}
	logger.Info("Waiting for ServiceAttachment deletion")
	go r.trackLopDeletion(ctx, req, gw, lop, logger)
	return nil
}
// handleReconciliation creates or updates the ServiceAttachment for a live
// Gateway.
//
// With no existing attachment, one built from the Gateway's annotations is
// inserted. With an existing attachment, the connection preference, NAT
// subnets, domain names and consumer accept list are compared with the desired
// state and a patch is issued if any of them differ. A differing target
// service is only reported through an event; it is never patched. The
// resulting compute operation is followed by a tracker goroutine.
//
// It returns an error when the insert or patch call fails.
func handleReconciliation(ctx context.Context, req ctrl.Request, gw *gatewayv1beta1.Gateway, attachment *compute.ServiceAttachment, name string, service *compute.ServiceAttachmentsService, r *ServiceReconciler, logger logr.Logger) error {
	desired := createServieAttachmentFromGateway(gw)

	// Nothing there yet: create it.
	if attachment == nil {
		op, err := service.Insert(r.project, r.region, desired).Context(ctx).Do()
		if err != nil {
			logger.Error(err, "Failed creating ServiceAttachment")
			return fmt.Errorf("failed_creating")
		}
		logger.Info("Creating ServiceAttachment")
		go r.trackLopCreation(ctx, req, gw, op, logger)
		return nil
	}

	if attachment.TargetService != desired.TargetService {
		r.Recorder.Eventf(gw, "Normal", "Sync",
			"Cannot change target service of ServiceAttachment, TODO implement ServiceAttachment recreation")
	}

	// Collect every field that drifted so each one is logged.
	needsPatch := false
	if attachment.ConnectionPreference != desired.ConnectionPreference {
		logger.Info("change on `ConnectionPreference` detected")
		needsPatch = true
	}
	if !NatSubnetIsEqual(attachment.NatSubnets, desired.NatSubnets) {
		logger.Info("change on `NatSubnets` detected", "old", attachment.NatSubnets, "new", desired.NatSubnets)
		needsPatch = true
	}
	if !reflect.DeepEqual(attachment.DomainNames, desired.DomainNames) {
		logger.Info("change on `DomainNames` detected", "old", attachment.DomainNames, "new", desired.DomainNames)
		needsPatch = true
	}
	if !reflect.DeepEqual(attachment.ConsumerAcceptLists, desired.ConsumerAcceptLists) {
		logger.Info("change on `ConsumerAcceptLists` detected", "old", attachment.ConsumerAcceptLists, "new", desired.ConsumerAcceptLists)
		needsPatch = true
	}
	if !needsPatch {
		logger.Info("No Change detected, no update necessary")
		return nil
	}

	// The patch carries the current fingerprint and leaves out the name and
	// target service.
	desired.Fingerprint = attachment.Fingerprint
	desired.TargetService = ""
	desired.Name = ""
	op, err := service.Patch(r.project, r.region, name, desired).Context(ctx).Do()
	if err != nil {
		logger.Error(err, "Failed updating ServiceAttachment")
		return fmt.Errorf("failed_updating")
	}
	logger.Info("Updating ServiceAttachment")
	go r.trackLopUpdate(ctx, req, gw, op, logger)
	return nil
}
// createServieAttachmentFromGateway builds the desired ServiceAttachment from
// the Gateway's annotations:
//
//   - controller.autonpsc.dev/psc-serviceattachment             -> Name
//   - networking.gke.io/forwarding-rules                        -> TargetService
//   - controller.autonpsc.dev/psc-serviceattachment-natsubnets  -> NatSubnets
//   - controller.autonpsc.dev/psc-serviceattachment-domainNames -> DomainNames
//   - controller.autonpsc.dev/psc-serviceattachment-allowed     -> ConsumerAcceptLists
//     and ConnectionPreference (ACCEPT_AUTOMATIC unless the annotation is set)
//
// List fields that end up with no entries are set to nil rather than to an
// empty slice. handleReconciliation compares them with reflect.DeepEqual,
// which treats nil and an empty slice as different, so an annotation that
// holds only separators or whitespace would otherwise be reported as a change
// on every reconcile.
func createServieAttachmentFromGateway(gw *gatewayv1beta1.Gateway) *compute.ServiceAttachment {
	consumerAcceptLists, connectionPreference := extractConnectionPreferenceAndConsumerAcceptList(gw, "ACCEPT_AUTOMATIC")
	natSubnets := convertCommaSeparatedStringToArray(gw, "controller.autonpsc.dev/psc-serviceattachment-natsubnets")
	domainNames := convertCommaSeparatedStringToArray(gw, "controller.autonpsc.dev/psc-serviceattachment-domainNames")

	if len(consumerAcceptLists) == 0 {
		consumerAcceptLists = nil
	}
	if len(natSubnets) == 0 {
		natSubnets = nil
	}
	if len(domainNames) == 0 {
		domainNames = nil
	}

	return &compute.ServiceAttachment{
		ConnectionPreference: connectionPreference,
		ConsumerAcceptLists:  consumerAcceptLists,
		Name:                 gw.Annotations["controller.autonpsc.dev/psc-serviceattachment"],
		//EnableProxyProtocol: false,
		NatSubnets:    natSubnets,
		DomainNames:   domainNames,
		TargetService: gw.Annotations["networking.gke.io/forwarding-rules"],
	}
}
// convertCommaSeparatedStringToArray splits the Gateway annotation named
// propertyName into its entries. Commas and any Unicode whitespace act as
// separators, and runs of separators never produce empty entries. A missing or
// separator-only annotation yields an empty (non-nil) slice.
func convertCommaSeparatedStringToArray(gw *gatewayv1beta1.Gateway, propertyName string) []string {
	// strings.Fields splits on unicode.IsSpace, so turning commas into spaces
	// first gives "comma or whitespace" splitting using only the strings
	// package, which this file already imports.
	return strings.Fields(strings.ReplaceAll(gw.Annotations[propertyName], ",", " "))
}
// extractConnectionPreferenceAndConsumerAcceptList reads the comma-separated
// "controller.autonpsc.dev/psc-serviceattachment-allowed" annotation.
//
// When the annotation is absent or empty it returns an empty accept list and
// the connectionPreference passed in. Otherwise it returns one accept-list
// entry per project in the annotation and the preference "ACCEPT_MANUAL".
// Entries are trimmed of surrounding whitespace, and blank entries (for
// example from a trailing or doubled comma) are skipped so that no entry with
// an empty project ID is produced.
func extractConnectionPreferenceAndConsumerAcceptList(gw *gatewayv1beta1.Gateway, connectionPreference string) ([]*compute.ServiceAttachmentConsumerProjectLimit, string) {
	// Connection limit assigned to every allowed consumer project.
	const connectionLimit = 10

	consumerAcceptLists := make([]*compute.ServiceAttachmentConsumerProjectLimit, 0)

	allowed := gw.Annotations["controller.autonpsc.dev/psc-serviceattachment-allowed"]
	if allowed == "" {
		return consumerAcceptLists, connectionPreference
	}

	for _, entry := range strings.Split(allowed, ",") {
		projectID := strings.TrimSpace(entry)
		if projectID == "" {
			continue
		}
		consumerAcceptLists = append(consumerAcceptLists, &compute.ServiceAttachmentConsumerProjectLimit{
			ProjectIdOrNum:  projectID,
			ConnectionLimit: connectionLimit,
		})
	}
	return consumerAcceptLists, "ACCEPT_MANUAL"
}
// getServiceAttachmentService returns a ServiceAttachments client built on the
// reconciler's compute service.
func (r *ServiceReconciler) getServiceAttachmentService() *compute.ServiceAttachmentsService {
	attachments := compute.NewServiceAttachmentsService(r.s)
	return attachments
}
// SetupWithManager registers the reconciler with mgr as the controller for
// Gateway resources.
func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	watched := &gatewayv1beta1.Gateway{}
	bldr := ctrl.NewControllerManagedBy(mgr).For(watched)
	return bldr.Complete(r)
}
// NatSubnetIsEqual reports whether the subnets in new describe the same
// subnets as original, ignoring order.
//
// The two lists must have the same length, and every entry of new must match
// some entry of original. An entry matches when it is identical, or when it is
// a trailing path of the original entry that starts at a path-segment
// boundary. This lets a short name ("my-subnet") or a partial resource path
// ("projects/p/regions/r/subnetworks/my-subnet", with or without a leading
// slash) match a full resource URL.
//
// Matching whole trailing segments rather than arbitrary substrings keeps
// "subnet-1" from being treated as equal to ".../subnetworks/subnet-10".
func NatSubnetIsEqual(original, new []string) bool {
	if len(original) != len(new) {
		return false
	}
	for _, want := range new {
		// Tolerate a leading slash on partial resource paths.
		suffix := "/" + strings.TrimPrefix(want, "/")
		found := false
		for _, have := range original {
			if have == want || strings.HasSuffix(have, suffix) {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}
// addFinalizerToGateway puts the controller's finalizer on gw and persists the
// change. It does nothing when the finalizer is already present, and returns
// the error from the update otherwise.
func (r *ServiceReconciler) addFinalizerToGateway(ctx context.Context, gw *gatewayv1beta1.Gateway) error {
	if controllerutil.ContainsFinalizer(gw, finalizerName) {
		return nil
	}
	controllerutil.AddFinalizer(gw, finalizerName)
	return r.Update(ctx, gw)
}
// removeFinalizerFromGateway takes the controller's finalizer off gw and
// persists the change. It does nothing when the finalizer is not present, and
// returns the error from the update otherwise.
func (r *ServiceReconciler) removeFinalizerFromGateway(ctx context.Context, gw *gatewayv1beta1.Gateway) error {
	if !controllerutil.ContainsFinalizer(gw, finalizerName) {
		return nil
	}
	controllerutil.RemoveFinalizer(gw, finalizerName)
	return r.Update(ctx, gw)
}