pkg/controllers/gateway_controller.go (370 lines of code) (raw):
/*
Copyright 2021.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
"github.com/aws/aws-application-networking-k8s/pkg/controllers/eventhandlers"
"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
"github.com/aws/aws-application-networking-k8s/pkg/utils"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
deploy "github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
const (
// gatewayFinalizer is the finalizer name this controller passes to the
// FinalizerManager when adding finalizers to (upsert) and removing them from
// (delete) Gateway objects.
gatewayFinalizer = "gateway.k8s.aws/resources"
// defaultNamespace is not referenced in this file.
// NOTE(review): presumably the Kubernetes "default" namespace used by other
// files in this package - confirm before removing.
defaultNamespace = "default"
)
// gatewayReconciler reconciles Gateway API Gateway objects: it manages the
// gateway finalizer and keeps the Accepted / Programmed conditions and the
// listener statuses of the Gateway up to date.
type gatewayReconciler struct {
// log is the logger used for reconcile traces and informational messages.
log gwlog.Logger
// client reads Gateways and patches their status subresource.
client client.Client
// scheme is the manager's runtime scheme; it is stored at registration and
// not otherwise used in this file.
scheme *runtime.Scheme
// finalizerManager adds and removes gatewayFinalizer on Gateway objects.
finalizerManager k8s.FinalizerManager
// eventRecorder emits Kubernetes events (e.g. a warning when adding the
// finalizer fails).
eventRecorder record.EventRecorder
// cloud gives access to the Lattice API used to look up the service network
// named after the Gateway.
cloud aws.Cloud
}
// RegisterGatewayController wires a gatewayReconciler into the given manager.
//
// When config.DefaultServiceNetwork is set, it first makes a best-effort
// attempt to create or update that service network; a failure is only logged.
// The controller reconciles Gateway objects on generation changes, watches
// GatewayClass objects, and additionally watches VpcAssociationPolicy objects
// when that kind is supported by the cluster.
//
// It returns an error if the VpcAssociationPolicy support check fails or if
// the controller cannot be completed.
func RegisterGatewayController(
	log gwlog.Logger,
	cloud aws.Cloud,
	finalizerManager k8s.FinalizerManager,
	mgr ctrl.Manager,
) error {
	k8sClient := mgr.GetClient()
	reconciler := &gatewayReconciler{
		log:              log,
		client:           k8sClient,
		scheme:           mgr.GetScheme(),
		finalizerManager: finalizerManager,
		eventRecorder:    mgr.GetEventRecorderFor("gateway"),
		cloud:            cloud,
	}

	if config.DefaultServiceNetwork != "" {
		// Best effort only: registration continues even when the default
		// service network cannot be set up.
		defaultServiceNetwork := &model.ServiceNetwork{
			Spec: model.ServiceNetworkSpec{
				Name: config.DefaultServiceNetwork,
			},
		}
		manager := deploy.NewDefaultServiceNetworkManager(log, cloud)
		if _, err := manager.CreateOrUpdate(context.Background(), defaultServiceNetwork); err != nil {
			log.Infof(context.TODO(), "Could not setup default service network %s, proceeding without it - %s",
				config.DefaultServiceNetwork, err.Error())
		}
	}

	gatewayClassHandler := eventhandlers.NewEnqueueRequestsForGatewayClassEvent(log, k8sClient)
	vpcPolicyHandler := eventhandlers.NewVpcAssociationPolicyEventHandler(log, k8sClient)

	ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
		For(&gwv1.Gateway{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Watches(&gwv1.GatewayClass{}, gatewayClassHandler)

	// The VpcAssociationPolicy CRD is optional; only watch it when installed.
	policySupported, err := k8s.IsGVKSupported(mgr, anv1alpha1.GroupVersion.String(), anv1alpha1.VpcAssociationPolicyKind)
	if err != nil {
		return err
	}
	if !policySupported {
		log.Infof(context.TODO(), "VpcAssociationPolicy CRD is not installed, skipping watch")
	} else {
		ctrlBuilder.Watches(&anv1alpha1.VpcAssociationPolicy{}, vpcPolicyHandler.MapToGateway())
	}

	return ctrlBuilder.Complete(reconciler)
}
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/finalizers,verbs=update

// Reconcile is the controller-runtime entry point for a Gateway request. It
// wraps the actual work in a reconcile trace, logs any reconcile error, and
// returns whatever result and error lattice_runtime.HandleReconcileError
// derives from that error, logging whether the request is requeued or done.
func (r *gatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	ctx = gwlog.StartReconcileTrace(ctx, r.log, "gateway", req.Name, req.Namespace)
	defer gwlog.EndReconcileTrace(ctx, r.log)

	reconcileErr := r.reconcile(ctx, req)
	if reconcileErr != nil {
		r.log.Infow(ctx, "reconcile error", "name", req.Name, "message", reconcileErr.Error())
	}

	result, retryErr := lattice_runtime.HandleReconcileError(reconcileErr)
	switch {
	case result.RequeueAfter != 0:
		r.log.Infow(ctx, "requeue request", "name", req.Name, "requeueAfter", result.RequeueAfter)
	case result.Requeue:
		r.log.Infow(ctx, "requeue request", "name", req.Name)
	case retryErr == nil:
		r.log.Infow(ctx, "reconciled", "name", req.Name)
	}
	return result, retryErr
}
// reconcile loads the requested Gateway and dispatches to the delete or upsert
// path. A Gateway that no longer exists, or that is not controlled by this
// controller, is treated as nothing to do and yields a nil error.
func (r *gatewayReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
	gateway := new(gwv1.Gateway)
	if err := r.client.Get(ctx, req.NamespacedName, gateway); err != nil {
		return client.IgnoreNotFound(err)
	}

	if controlled := k8s.IsControlledByLatticeGatewayController(ctx, r.client, gateway); !controlled {
		r.log.Infow(ctx, "Gateway is not controlled by AWS Gateway API Controller", "name", req.Name)
		return nil
	}

	// A zero deletion timestamp means the object is live; otherwise it is
	// being deleted.
	if gateway.DeletionTimestamp.IsZero() {
		return r.reconcileUpsert(ctx, gateway)
	}
	return r.reconcileDelete(ctx, gateway)
}
// reconcileDelete handles a Gateway that is being deleted. Deletion is refused
// (an error is returned, leaving the finalizer in place) while any route still
// references this Gateway as a controlled parent; otherwise the gateway
// finalizer is removed.
//
// Only routes whose controlled parents include the Gateway under deletion
// (matched by namespace and name) block the deletion. Routes attached to other
// gateways do not.
func (r *gatewayReconciler) reconcileDelete(ctx context.Context, gw *gwv1.Gateway) error {
	routes, err := core.ListAllRoutes(ctx, r.client)
	if err != nil {
		return err
	}

	for _, route := range routes {
		// The lookup error is deliberately ignored: parent resolution is best
		// effort, and any parents that were resolved are still checked so a
		// partially failing lookup cannot unblock the deletion.
		parents, _ := k8s.FindControlledParents(ctx, r.client, route)
		for _, parent := range parents {
			if parent.Namespace == gw.Namespace && parent.Name == gw.Name {
				return fmt.Errorf("cannot delete gateway %s/%s - found referencing route %s/%s",
					gw.Namespace, gw.Name, route.Namespace(), route.Name())
			}
		}
	}

	return r.finalizerManager.RemoveFinalizers(ctx, gw, gatewayFinalizer)
}
// reconcileUpsert handles a live (non-deleted) Gateway:
//
//  1. ensures the gateway finalizer is present (emitting a warning event on
//     failure),
//  2. refreshes the listener statuses and records the Accepted condition,
//  3. looks up the VPC Lattice service network named after the Gateway and
//     records the Programmed condition accordingly.
//
// If the listener status update fails, the Gateway is marked Accepted=False
// and the listener error is returned; the Gateway is not subsequently marked
// Accepted=True or Programmed in the same pass.
//
// When the service network is missing or its name is ambiguous, the Programmed
// condition is set to false and nil is returned; if writing that condition
// fails, a retry error is returned instead.
func (r *gatewayReconciler) reconcileUpsert(ctx context.Context, gw *gwv1.Gateway) error {
	if err := r.finalizerManager.AddFinalizers(ctx, gw, gatewayFinalizer); err != nil {
		r.eventRecorder.Event(gw, corev1.EventTypeWarning,
			k8s.GatewayEventReasonFailedAddFinalizer, fmt.Sprintf("failed add finalizer: %s", err))
		return err
	}

	if err := UpdateGWListenerStatus(ctx, r.client, gw); err != nil {
		if statusErr := r.updateGatewayAcceptStatus(ctx, gw, false); statusErr != nil {
			return errors.Wrap(statusErr, err.Error())
		}
		// Stop here: continuing would immediately overwrite Accepted=False
		// with Accepted=True below.
		return err
	}

	if err := r.updateGatewayAcceptStatus(ctx, gw, true); err != nil {
		return err
	}

	snInfo, err := r.cloud.Lattice().FindServiceNetwork(ctx, gw.Name)
	if err != nil {
		if services.IsNotFoundError(err) {
			if err = r.updateGatewayProgrammedStatus(ctx, gw, gwv1.GatewayReasonPending, "VPC Lattice Service Network not found"); err != nil {
				return lattice_runtime.NewRetryError()
			}
			return nil
		}
		if errors.Is(err, services.ErrNameConflict) {
			if err = r.updateGatewayProgrammedStatus(ctx, gw, gwv1.GatewayReasonInvalid, "Found multiple VPC Lattice Service Networks matching Gateway name. Either ensure only one Service Network has a matching name, or use the Service Network's id as the Gateway name."); err != nil {
				return lattice_runtime.NewRetryError()
			}
			return nil
		}
		return err
	}

	return r.updateGatewayProgrammedStatus(ctx, gw, gwv1.GatewayReasonProgrammed,
		fmt.Sprintf("aws-service-network-arn: %s", *snInfo.SvcNetwork.Arn))
}
// updateGatewayProgrammedStatus sets the Programmed condition on gw and
// patches the Gateway status. The condition status is True only when reason is
// GatewayReasonProgrammed and False for every other reason; reason and message
// are recorded on the condition as given.
//
// It returns a wrapped error when the status patch fails.
func (r *gatewayReconciler) updateGatewayProgrammedStatus(
	ctx context.Context,
	gw *gwv1.Gateway,
	reason gwv1.GatewayConditionReason,
	message string,
) error {
	// Snapshot taken before mutation; used as the base of the merge patch.
	before := gw.DeepCopy()

	status := metav1.ConditionFalse
	if reason == gwv1.GatewayReasonProgrammed {
		status = metav1.ConditionTrue
	}

	programmed := metav1.Condition{
		Type:               string(gwv1.GatewayConditionProgrammed),
		Status:             status,
		ObservedGeneration: gw.Generation,
		Reason:             string(reason),
		Message:            message,
	}
	gw.Status.Conditions = utils.GetNewConditions(gw.Status.Conditions, programmed)

	err := r.client.Status().Patch(ctx, gw, client.MergeFrom(before))
	if err != nil {
		return fmt.Errorf("update gw status error, gw: %s, err: %w", gw.Name, err)
	}
	return nil
}
// updateGatewayAcceptStatus sets the Accepted condition on gw and patches the
// Gateway status. accepted=true records status True with reason Accepted;
// accepted=false records status False with reason Invalid. The condition
// message is always the controller name.
//
// It returns a wrapped error when the status patch fails.
func (r *gatewayReconciler) updateGatewayAcceptStatus(ctx context.Context, gw *gwv1.Gateway, accepted bool) error {
	// Snapshot taken before mutation; used as the base of the merge patch.
	before := gw.DeepCopy()

	acceptedCond := metav1.Condition{
		Type:               string(gwv1.GatewayConditionAccepted),
		ObservedGeneration: gw.Generation,
		Message:            config.LatticeGatewayControllerName,
		Status:             metav1.ConditionFalse,
		Reason:             string(gwv1.GatewayReasonInvalid),
	}
	if accepted {
		acceptedCond.Status = metav1.ConditionTrue
		acceptedCond.Reason = string(gwv1.GatewayReasonAccepted)
	}
	gw.Status.Conditions = utils.GetNewConditions(gw.Status.Conditions, acceptedCond)

	err := r.client.Status().Patch(ctx, gw, client.MergeFrom(before))
	if err != nil {
		return fmt.Errorf("update gateway status error, gw: %s, accepted: %t, err: %w", gw.Name, accepted, err)
	}
	return nil
}
// UpdateGWListenerStatus recomputes the status addresses and per-listener
// status of gw and patches the Gateway status.
//
// For every listener in the spec it records either an Accepted condition
// (route kinds supported) or a ResolvedRefs=False / InvalidRouteKinds
// condition, the supported route kinds, and the number of attached routes. A
// route counts as attached to a listener when one of its parentRefs names this
// Gateway, resolves to this Gateway's namespace, selects the listener by
// section name (the first listener when no section name is given), and does
// not specify a different port. A parentRef without a namespace refers to the
// route's own namespace. Routes being deleted are ignored.
//
// It returns an error when routes cannot be listed, when the Gateway has no
// listeners (in which case nothing is patched), when the status patch fails,
// or - after patching - when none of the listeners is valid.
func UpdateGWListenerStatus(ctx context.Context, k8sClient client.Client, gw *gwv1.Gateway) error {
	// Snapshot taken before mutation; used as the base of the merge patch.
	gwOld := gw.DeepCopy()

	routes, err := core.ListAllRoutes(ctx, k8sClient)
	if err != nil {
		return err
	}

	// Add one of lattice domains as GW address. This is supposed to be a single ingress endpoint (or a single pool of them)
	// but we have different endpoints for each service. This can represent incorrect value in some cases (e.g. cross-account)
	// Due to size limit, we cannot put all service addresses here.
	if len(routes) > 0 {
		gw.Status.Addresses = []gwv1.GatewayStatusAddress{}
		addressType := gwv1.HostnameAddressType
		for _, route := range routes {
			if !route.DeletionTimestamp().IsZero() {
				continue
			}
			// Indexing a nil annotation map is safe and simply reports "not found".
			if domain, exists := route.K8sObject().GetAnnotations()[LatticeAssignedDomainName]; exists {
				gw.Status.Addresses = append(gw.Status.Addresses, gwv1.GatewayStatusAddress{
					Type:  &addressType,
					Value: domain,
				})
				break
			}
		}
	}

	if len(gw.Spec.Listeners) == 0 {
		return fmt.Errorf("failed to find gateway listener")
	}
	// parentRefs without a section name are counted against the first listener.
	defaultListener := gw.Spec.Listeners[0]

	hasValidListener := false
	for _, listener := range gw.Spec.Listeners {
		listenerStatus := gwv1.ListenerStatus{
			Name:       listener.Name,
			Conditions: make([]metav1.Condition, 0),
		}

		// Check if RouteGroupKind in listener spec is supported.
		validListener, supportedKinds := listenerRouteGroupKindSupported(listener)
		if !validListener {
			condition := metav1.Condition{
				Type:               string(gwv1.ListenerConditionResolvedRefs),
				Status:             metav1.ConditionFalse,
				Reason:             string(gwv1.ListenerReasonInvalidRouteKinds),
				ObservedGeneration: gw.Generation,
				LastTransitionTime: metav1.Now(),
			}
			listenerStatus.SupportedKinds = supportedKinds
			listenerStatus.Conditions = append(listenerStatus.Conditions, condition)
		} else {
			hasValidListener = true
			condition := metav1.Condition{
				Type:               string(gwv1.ListenerConditionAccepted),
				Status:             metav1.ConditionTrue,
				Reason:             string(gwv1.ListenerReasonAccepted),
				ObservedGeneration: gw.Generation,
				LastTransitionTime: metav1.Now(),
			}

			for _, route := range routes {
				if !route.DeletionTimestamp().IsZero() {
					// Ignore the deleted route
					continue
				}
				for _, parentRef := range route.Spec().ParentRefs() {
					if parentRef.Name != gwv1.ObjectName(gw.Name) {
						continue
					}

					// An unset parentRef namespace means the route's own
					// namespace, so a same-named Gateway in another namespace
					// must not count this route.
					parentNamespace := route.Namespace()
					if parentRef.Namespace != nil {
						parentNamespace = string(*parentRef.Namespace)
					}
					if parentNamespace != gw.Namespace {
						continue
					}

					sectionName := string(defaultListener.Name)
					if parentRef.SectionName != nil {
						sectionName = string(*parentRef.SectionName)
					}
					if sectionName != string(listener.Name) {
						continue
					}

					if parentRef.Port != nil && *parentRef.Port != listener.Port {
						continue
					}
					listenerStatus.AttachedRoutes++
				}
			}

			if listener.Protocol == gwv1.HTTPSProtocolType {
				listenerStatus.SupportedKinds = append(listenerStatus.SupportedKinds, gwv1.RouteGroupKind{
					Kind: "GRPCRoute",
				})
			}
			listenerStatus.SupportedKinds = append(listenerStatus.SupportedKinds, gwv1.RouteGroupKind{
				Kind: "HTTPRoute",
			})
			listenerStatus.Conditions = append(listenerStatus.Conditions, condition)
		}

		// Update the existing status entry for this listener in place, or
		// append a new entry when there is none yet.
		found := false
		for i := range gw.Status.Listeners {
			if gw.Status.Listeners[i].Name != listenerStatus.Name {
				continue
			}
			gw.Status.Listeners[i].AttachedRoutes = listenerStatus.AttachedRoutes
			gw.Status.Listeners[i].SupportedKinds = listenerStatus.SupportedKinds
			// Exactly one condition is produced per listener above.
			gw.Status.Listeners[i].Conditions = utils.GetNewConditions(gw.Status.Listeners[i].Conditions, listenerStatus.Conditions[0])
			found = true
		}
		if !found {
			gw.Status.Listeners = append(gw.Status.Listeners, listenerStatus)
		}
	}

	if err := k8sClient.Status().Patch(ctx, gw, client.MergeFrom(gwOld)); err != nil {
		return errors.Wrapf(err, "listener update failed")
	}

	if !hasValidListener {
		return fmt.Errorf("no valid listeners for %s", gw.Name)
	}
	return nil
}
// listenerRouteGroupKindSupported reports whether every route kind listed in
// the listener's AllowedRoutes is supported, together with the subset of
// listed kinds that are supported.
//
// HTTPRoute is always supported; GRPCRoute is supported only on HTTPS
// listeners; any other kind makes the listener invalid. A listener with no
// AllowedRoutes, or with an empty kind list, is valid and yields an empty
// (non-nil) slice.
func listenerRouteGroupKindSupported(listener gwv1.Listener) (bool, []gwv1.RouteGroupKind) {
	supportedKinds := make([]gwv1.RouteGroupKind, 0)

	// AllowedRoutes is an optional pointer field; treat an unset value like an
	// empty kind list instead of dereferencing nil.
	if listener.AllowedRoutes == nil {
		return true, supportedKinds
	}

	valid := true
	for _, routeGroupKind := range listener.AllowedRoutes.Kinds {
		switch {
		case routeGroupKind.Kind == "HTTPRoute":
			supportedKinds = append(supportedKinds, gwv1.RouteGroupKind{
				Kind: "HTTPRoute",
			})
		case routeGroupKind.Kind == "GRPCRoute" && listener.Protocol == gwv1.HTTPSProtocolType:
			supportedKinds = append(supportedKinds, gwv1.RouteGroupKind{
				Kind: "GRPCRoute",
			})
		default:
			// Unknown kind, or GRPCRoute on a non-HTTPS listener.
			valid = false
		}
	}
	return valid, supportedKinds
}