controllers/daemon/podendpoint_controller.go (348 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT license. package daemon import ( "context" "fmt" "net" "os" "strings" "github.com/containernetworking/plugins/pkg/ns" "github.com/vishvananda/netlink" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" egressgatewayv1alpha1 "github.com/Azure/kube-egress-gateway/api/v1alpha1" "github.com/Azure/kube-egress-gateway/pkg/consts" "github.com/Azure/kube-egress-gateway/pkg/netlinkwrapper" "github.com/Azure/kube-egress-gateway/pkg/netnswrapper" "github.com/Azure/kube-egress-gateway/pkg/wgctrlwrapper" ) var _ reconcile.Reconciler = &PodEndpointReconciler{} // PodEndpointReconciler reconciles gateway node network according to a PodEndpoint object type PodEndpointReconciler struct { client.Client TickerEvents chan event.GenericEvent Netlink netlinkwrapper.Interface NetNS netnswrapper.Interface WgCtrl wgctrlwrapper.Interface } //+kubebuilder:rbac:groups=egressgateway.kubernetes.azure.com,resources=podendpoints,verbs=get;list;watch; //+kubebuilder:rbac:groups=egressgateway.kubernetes.azure.com,resources=podendpoints/status,verbs=get;update;patch //+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch //+kubebuilder:rbac:groups=egressgateway.kubernetes.azure.com,resources=staticgatewayconfigurations,verbs=get;list;watch //+kubebuilder:rbac:groups=egressgateway.kubernetes.azure.com,resources=gatewaystatuses,verbs=get;list;watch;create;update;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // TODO(user): Modify the Reconcile function to compare the state specified by // the StaticGatewayConfiguration object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile func (r *PodEndpointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := log.FromContext(ctx) // Got an event from cleanup ticker if req.NamespacedName.Namespace == "" && req.NamespacedName.Name == "" { if err := r.cleanUp(ctx); err != nil { return ctrl.Result{}, fmt.Errorf("failed to clean up orphaned wireguard peers: %w", err) } } podEndpoint := &egressgatewayv1alpha1.PodEndpoint{} if err := r.Get(ctx, req.NamespacedName, podEndpoint); err != nil { if apierrors.IsNotFound(err) { // Object not found, return. return ctrl.Result{}, nil } log.Error(err, "unable to fetch PodEndpoint instance") return ctrl.Result{}, err } gwConfigKey := types.NamespacedName{ Namespace: podEndpoint.Namespace, Name: podEndpoint.Spec.StaticGatewayConfiguration, } // Fetch the StaticGatewayConfiguration instance. gwConfig := &egressgatewayv1alpha1.StaticGatewayConfiguration{} if err := r.Get(ctx, gwConfigKey, gwConfig); err != nil { return ctrl.Result{}, fmt.Errorf("failed to fetch StaticGatewayConfiguration(%s/%s): %w", gwConfigKey.Namespace, gwConfigKey.Name, err) } if !applyToNode(gwConfig) { // gwConfig does not apply to this node return ctrl.Result{}, nil } // Reconcile wireguard peer return r.reconcile(ctx, gwConfig, podEndpoint) } // SetupWithManager sets up the controller with the Manager. func (r *PodEndpointReconciler) SetupWithManager(mgr ctrl.Manager) error { r.Netlink = netlinkwrapper.NewNetLink() r.NetNS = netnswrapper.NewNetNS() r.WgCtrl = wgctrlwrapper.NewWgCtrl() controller, err := ctrl.NewControllerManagedBy(mgr).For(&egressgatewayv1alpha1.PodEndpoint{}).Build(r) if err != nil { return err } return controller.Watch(source.Channel(r.TickerEvents, &handler.EnqueueRequestForObject{})) } func (r *PodEndpointReconciler) reconcile( ctx context.Context, gwConfig *egressgatewayv1alpha1.StaticGatewayConfiguration, podEndpoint *egressgatewayv1alpha1.PodEndpoint, ) (ctrl.Result, error) { log := log.FromContext(ctx) log.Info("Reconciling PodEndpoint") nsName := consts.GatewayNetnsName gwns, err := r.NetNS.GetNS(nsName) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to get gateway network namespace %s: %w", nsName, err) } defer gwns.Close() if err := gwns.Do(func(nn ns.NetNS) error { wgClient, err := r.WgCtrl.New() if err != nil { return fmt.Errorf("failed to create wgctrl client: %w", err) } defer func() { _ = wgClient.Close() }() podPublicKey, err := wgtypes.ParseKey(podEndpoint.Spec.PodPublicKey) if err != nil { return fmt.Errorf("failed to parse pod wireguard public key: %w", err) } _, podIPNet, err := net.ParseCIDR(podEndpoint.Spec.PodIpAddress) if err != nil { return fmt.Errorf("failed to parse pod IPv4 address: %w", err) } wgConfig := wgtypes.Config{ Peers: []wgtypes.PeerConfig{ { PublicKey: podPublicKey, ReplaceAllowedIPs: true, AllowedIPs: []net.IPNet{ *podIPNet, }, }, }, } if err := wgClient.ConfigureDevice(getWireguardInterfaceName(gwConfig), wgConfig); err != nil { return fmt.Errorf("failed to add peer to wireguard device: %w", err) } if err := r.addWireguardPeerRoutes(gwConfig, podEndpoint); err != nil { return fmt.Errorf("failed to add pod route: %w", err) } return nil }); err != nil { return ctrl.Result{}, err } peerConfigs := []egressgatewayv1alpha1.PeerConfiguration{ { PodEndpoint: fmt.Sprintf("%s/%s", podEndpoint.Namespace, podEndpoint.Name), InterfaceName: getWireguardInterfaceName(gwConfig), PublicKey: podEndpoint.Spec.PodPublicKey, }, } if err := r.updateGatewayNodeStatus(ctx, peerConfigs, true /* add */); err != nil { return ctrl.Result{}, err } log.Info("Pod wireguard endpoint reconciled") return ctrl.Result{}, nil } func (r *PodEndpointReconciler) cleanUp(ctx context.Context) error { log := log.FromContext(ctx) log.Info("Cleaning up orphaned wireguard peers") podEndpointList := &egressgatewayv1alpha1.PodEndpointList{} if err := r.List(ctx, podEndpointList); err != nil { return fmt.Errorf("failed to list PodEndpoints: %w", err) } gwConfigList := &egressgatewayv1alpha1.StaticGatewayConfigurationList{} if err := r.List(ctx, gwConfigList); err != nil { return fmt.Errorf("failed to list staticGatewayConfigurations: %w", err) } gwConfigMap := make(map[string]string) for _, gwConfig := range gwConfigList.Items { gwConfig := gwConfig // skip deleting gwConfig, as the wglink will be deleted in staticGatewayConfiguration controller if applyToNode(&gwConfig) && gwConfig.ObjectMeta.DeletionTimestamp.IsZero() { gwConfigMap[strings.ToLower(fmt.Sprintf("%s/%s", gwConfig.Namespace, gwConfig.Name))] = getWireguardInterfaceName(&gwConfig) } } // map: gw-namespace-name -> set of peer public keys peerMap := make(map[string]map[string]struct{}) for _, podEndpoint := range podEndpointList.Items { if wglinkName, ok := gwConfigMap[strings.ToLower(fmt.Sprintf("%s/%s", podEndpoint.Namespace, podEndpoint.Spec.StaticGatewayConfiguration))]; ok { if _, exists := peerMap[wglinkName]; !exists { peerMap[wglinkName] = make(map[string]struct{}) } peerMap[wglinkName][podEndpoint.Spec.PodPublicKey] = struct{}{} } } var peersToDelete []egressgatewayv1alpha1.PeerConfiguration for _, wglinkName := range gwConfigMap { peers, err := r.cleanUpWgLink(ctx, wglinkName, peerMap) if err != nil { // do not block cleaning up rest namespaces log.Error(err, fmt.Sprintf("failed to clean up peers for wgLink %s", wglinkName)) } peersToDelete = append(peersToDelete, peers...) } if err := r.updateGatewayNodeStatus(ctx, peersToDelete, false /* add */); err != nil { return fmt.Errorf("failed to update gateway node status: %w", err) } log.Info("Wireguard peer cleanup completed") return nil } func (r *PodEndpointReconciler) cleanUpWgLink( ctx context.Context, wglinkName string, peerMap map[string]map[string]struct{}, ) ([]egressgatewayv1alpha1.PeerConfiguration, error) { log := log.FromContext(ctx) peersToDelete := make([]egressgatewayv1alpha1.PeerConfiguration, 0) gwns, err := r.NetNS.GetNS(consts.GatewayNetnsName) if err != nil { return nil, fmt.Errorf("failed to get gateway network namespace %s: %w", consts.GatewayNetnsName, err) } defer gwns.Close() if err := gwns.Do(func(nn ns.NetNS) error { wgClient, err := r.WgCtrl.New() if err != nil { return fmt.Errorf("failed to create wgctrl client: %w", err) } defer func() { _ = wgClient.Close() }() device, err := wgClient.Device(wglinkName) if err != nil { return fmt.Errorf("failed to get wireguard link configuration: %w", err) } wgConfig := wgtypes.Config{} podIPToDel := make(map[string]bool) for i := range device.Peers { if _, ok := peerMap[wglinkName][device.Peers[i].PublicKey.String()]; !ok { wgConfig.Peers = append(wgConfig.Peers, wgtypes.PeerConfig{ PublicKey: device.Peers[i].PublicKey, Remove: true, }) for _, ipNet := range device.Peers[i].AllowedIPs { podIPToDel[ipNet.IP.String()] = true } log.Info(fmt.Sprintf("Removing peer %s from wgLink %s", device.Peers[i].PublicKey.String(), wglinkName)) } } if len(wgConfig.Peers) > 0 { if err := r.deleteWireguardPeerRoutes(wglinkName, podIPToDel); err != nil { return fmt.Errorf("failed to delete pod route on wglink %s: %w", wglinkName, err) } if err := wgClient.ConfigureDevice(wglinkName, wgConfig); err != nil { return fmt.Errorf("failed to remove peers from wireguard device %s: %w", wglinkName, err) } for _, peer := range wgConfig.Peers { peersToDelete = append(peersToDelete, egressgatewayv1alpha1.PeerConfiguration{PublicKey: peer.PublicKey.String()}) } } return nil }); err != nil { return nil, err } return peersToDelete, nil } func (r *PodEndpointReconciler) addWireguardPeerRoutes( gwConfig *egressgatewayv1alpha1.StaticGatewayConfiguration, podEndpoint *egressgatewayv1alpha1.PodEndpoint, ) error { wgLink, err := r.Netlink.LinkByName(getWireguardInterfaceName(gwConfig)) if err != nil { return fmt.Errorf("failed to retrieve wireguard device: %w", err) } _, dst, err := net.ParseCIDR(podEndpoint.Spec.PodIpAddress) if err != nil { return fmt.Errorf("failed to parse pod ip net %s: %w", podEndpoint.Spec.PodIpAddress, err) } route := &netlink.Route{ LinkIndex: wgLink.Attrs().Index, Scope: netlink.SCOPE_LINK, Dst: dst, } if err := r.Netlink.RouteReplace(route); err != nil { return fmt.Errorf("failed to add route %s: %w", route, err) } return nil } func (r *PodEndpointReconciler) deleteWireguardPeerRoutes( wglinkName string, podIPToDel map[string]bool, ) error { wgLink, err := r.Netlink.LinkByName(wglinkName) if err != nil { return fmt.Errorf("failed to get wglink %s: %w", wglinkName, err) } routes, err := r.Netlink.RouteList(wgLink, netlink.FAMILY_ALL) if err != nil { return fmt.Errorf("failed to list routes on wglink %s: %w", wglinkName, err) } for _, route := range routes { route := route if _, ok := podIPToDel[route.Dst.IP.String()]; ok { if err := r.Netlink.RouteDel(&route); err != nil { return fmt.Errorf("failed to delete route %s: %w", route, err) } } } return nil } func (r *PodEndpointReconciler) updateGatewayNodeStatus( ctx context.Context, peerConfigs []egressgatewayv1alpha1.PeerConfiguration, add bool, ) error { log := log.FromContext(ctx) gwStatusKey := types.NamespacedName{ Namespace: os.Getenv(consts.PodNamespaceEnvKey), Name: os.Getenv(consts.NodeNameEnvKey), } gwStatus := &egressgatewayv1alpha1.GatewayStatus{} if err := r.Get(ctx, gwStatusKey, gwStatus); err != nil { if !apierrors.IsNotFound(err) { log.Error(err, "failed to get existing gateway status object %s/%s", gwStatusKey.Namespace, gwStatusKey.Name) return err } else { if !add { // ignore creating object during cleanup return nil } // gwStatus does not exist, create a new one log.Info(fmt.Sprintf("Creating new gateway status(%s/%s)", gwStatusKey.Namespace, gwStatusKey.Name)) node := &corev1.Node{} if err := r.Get(ctx, types.NamespacedName{Name: os.Getenv(consts.NodeNameEnvKey)}, node); err != nil { return fmt.Errorf("failed to get current node: %w", err) } gwStatus := &egressgatewayv1alpha1.GatewayStatus{ ObjectMeta: metav1.ObjectMeta{ Name: gwStatusKey.Name, Namespace: gwStatusKey.Namespace, }, Spec: egressgatewayv1alpha1.GatewayStatusSpec{ ReadyPeerConfigurations: peerConfigs, }, } if err := controllerutil.SetOwnerReference(node, gwStatus, r.Client.Scheme()); err != nil { return fmt.Errorf("failed to set gwStatus owner reference to node: %w", err) } log.Info("Creating new gateway status object") if err := r.Create(ctx, gwStatus); err != nil { return fmt.Errorf("failed to create gwStatus object: %w", err) } } } else { changed := false peerMap := make(map[string]*egressgatewayv1alpha1.PeerConfiguration) for _, peerConfig := range gwStatus.Spec.ReadyPeerConfigurations { peerConfig := peerConfig peerMap[peerConfig.PublicKey] = &peerConfig } for i, peerConfig := range peerConfigs { if _, ok := peerMap[peerConfig.PublicKey]; ok { if !add { delete(peerMap, peerConfig.PublicKey) changed = true } } else { if add { peerMap[peerConfig.PublicKey] = &peerConfigs[i] changed = true } } } if changed { var peers []egressgatewayv1alpha1.PeerConfiguration for _, peerConfig := range peerMap { peers = append(peers, *peerConfig) } gwStatus.Spec.ReadyPeerConfigurations = peers log.Info("Updating gateway status object") if err := r.Update(ctx, gwStatus); err != nil { return fmt.Errorf("failed to update gwStatus object: %w", err) } } } return nil }