controllers/manager/gatewayvmconfiguration_controller.go (745 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package manager
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
compute "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v6"
network "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v6"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
egressgatewayv1alpha1 "github.com/Azure/kube-egress-gateway/api/v1alpha1"
"github.com/Azure/kube-egress-gateway/pkg/azmanager"
"github.com/Azure/kube-egress-gateway/pkg/consts"
"github.com/Azure/kube-egress-gateway/pkg/metrics"
"github.com/Azure/kube-egress-gateway/pkg/utils/to"
)
// GatewayVMConfigurationReconciler reconciles a GatewayVMConfiguration object
type GatewayVMConfigurationReconciler struct {
client.Client
*azmanager.AzureManager
Recorder record.EventRecorder
}
var (
publicIPPrefixRE = regexp.MustCompile(`(?i).*/subscriptions/(.+)/resourceGroups/(.+)/providers/Microsoft.Network/publicIPPrefixes/(.+)`)
)
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=egressgateway.kubernetes.azure.com,resources=gatewayvmconfigurations,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=egressgateway.kubernetes.azure.com,resources=gatewayvmconfigurations/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=egressgateway.kubernetes.azure.com,resources=gatewayvmconfigurations/finalizers,verbs=update
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the GatewayVMConfiguration object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile
func (r *GatewayVMConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
// handle node events and enqueue corresponding gatewayVMConfigurations if nodepool matches
if req.Namespace == "" && req.Name != "" {
log.Info(fmt.Sprintf("Reconciling node event %s", req.Name))
node := &corev1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
log.Error(err, "unable to fetch node instance")
return ctrl.Result{}, err
}
vmConfigList := &egressgatewayv1alpha1.GatewayVMConfigurationList{}
if err := r.List(ctx, vmConfigList); err != nil {
log.Error(err, "failed to list GatewayVMConfiguration")
return ctrl.Result{}, err
}
var aggregateError error
for _, vmConfig := range vmConfigList.Items {
// skip reconciling when
// 1. node has agentpool label
// 2. vmConfig has GatewayNodepoolName and node agentpool label does not match
// or vmConfig is deleting
if !vmConfig.ObjectMeta.DeletionTimestamp.IsZero() {
continue
}
if v, ok := node.Labels[consts.AKSNodepoolNameLabel]; ok {
if npName := vmConfig.Spec.GatewayNodepoolName; npName != "" && !strings.EqualFold(v, npName) {
continue
}
}
log.Info(fmt.Sprintf("reconcile vmConfig (%s/%s) upon node (%s) event", vmConfig.GetNamespace(), vmConfig.GetName(), req.Name))
if _, err := r.reconcile(ctx, &vmConfig); err != nil {
log.Error(err, "failed to reconcile GatewayVMConfiguration")
aggregateError = errors.Join(aggregateError, err)
continue // continue to reconcile other vmConfigs
}
}
return ctrl.Result{}, aggregateError
}
vmConfig := &egressgatewayv1alpha1.GatewayVMConfiguration{}
if err := r.Get(ctx, req.NamespacedName, vmConfig); err != nil {
if apierrors.IsNotFound(err) {
// Object not found, return.
return ctrl.Result{}, nil
}
log.Error(err, "unable to fetch GatewayVMConfiguration instance")
return ctrl.Result{}, err
}
gwConfig := &egressgatewayv1alpha1.StaticGatewayConfiguration{}
if err := r.Get(ctx, req.NamespacedName, gwConfig); err != nil {
log.Error(err, "failed to fetch StaticGatewayConfiguration instance")
return ctrl.Result{}, err
}
if !vmConfig.ObjectMeta.DeletionTimestamp.IsZero() {
// Clean up gatewayVMConfiguration
res, err := r.ensureDeleted(ctx, vmConfig)
if err != nil {
r.Recorder.Event(gwConfig, corev1.EventTypeWarning, "EnsureDeleteGatewayVMConfigurationError", err.Error())
}
return res, err
}
res, err := r.reconcile(ctx, vmConfig)
if err != nil {
r.Recorder.Event(gwConfig, corev1.EventTypeWarning, "ReconcileGatewayVMConfigurationError", err.Error())
} else {
r.Recorder.Event(gwConfig, corev1.EventTypeNormal, "ReconcileGatewayVMConfigurationSuccess", "GatewayVMConfiguration reconciled")
}
return res, err
}
// SetupWithManager sets up the controller with the Manager.
func (r *GatewayVMConfigurationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&egressgatewayv1alpha1.GatewayVMConfiguration{}).
// allow for node events to trigger reconciliation when either node label matches
Watches(&corev1.Node{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(resourceHasFilterLabel(
map[string]string{consts.AKSNodepoolModeLabel: consts.AKSNodepoolModeValue, consts.UpstreamNodepoolModeLabel: "true"}))).
Complete(r)
}
// resourceHasFilterLabel returns a predicate that returns true only if the provided resource contains a label
func resourceHasFilterLabel(m map[string]string) predicate.Funcs {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return false
},
CreateFunc: func(e event.CreateEvent) bool {
return ifLabelMatch(e.Object, m)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return ifLabelMatch(e.Object, m)
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
}
}
func ifLabelMatch(obj client.Object, m map[string]string) bool {
if len(m) == 0 {
return true
}
// return true if any label matches
for labelKey, labelValue := range m {
if labelKey == "" {
return true
}
labels := obj.GetLabels()
if v, ok := labels[labelKey]; ok {
// Return early if no labelValue was set.
if labelValue == "" {
return true
}
if strings.EqualFold(v, labelValue) {
return true
}
}
}
return false
}
func (r *GatewayVMConfigurationReconciler) reconcile(
ctx context.Context,
vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration,
) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info(fmt.Sprintf("Reconciling GatewayVMConfiguration %s/%s", vmConfig.Namespace, vmConfig.Name))
mc := metrics.NewMetricsContext(
os.Getenv(consts.PodNamespaceEnvKey),
"reconcile_gateway_vm_configuration",
r.SubscriptionID(),
r.ResourceGroup,
strings.ToLower(fmt.Sprintf("%s/%s", vmConfig.Namespace, vmConfig.Name)),
)
succeeded := false
defer func() { mc.ObserveControllerReconcileMetrics(succeeded) }()
if !controllerutil.ContainsFinalizer(vmConfig, consts.VMConfigFinalizerName) {
log.Info("Adding finalizer")
controllerutil.AddFinalizer(vmConfig, consts.VMConfigFinalizerName)
err := r.Update(ctx, vmConfig)
if err != nil {
log.Error(err, "failed to add finalizer")
return ctrl.Result{}, err
}
}
existing := &egressgatewayv1alpha1.GatewayVMConfiguration{}
vmConfig.DeepCopyInto(existing)
vmss, ipPrefixLength, err := r.getGatewayVMSS(ctx, vmConfig)
if err != nil {
log.Error(err, "failed to get vmss")
return ctrl.Result{}, err
}
ipPrefix, ipPrefixID, isManaged, err := r.ensurePublicIPPrefix(ctx, ipPrefixLength, vmConfig)
if err != nil {
log.Error(err, "failed to ensure public ip prefix")
return ctrl.Result{}, err
}
var privateIPs []string
if privateIPs, err = r.reconcileVMSS(ctx, vmConfig, vmss, ipPrefixID, true); err != nil {
log.Error(err, "failed to reconcile VMSS")
return ctrl.Result{}, err
}
if !isManaged {
if err := r.ensurePublicIPPrefixDeleted(ctx, vmConfig); err != nil {
log.Error(err, "failed to remove managed public ip prefix")
return ctrl.Result{}, err
}
}
if vmConfig.Status == nil {
vmConfig.Status = &egressgatewayv1alpha1.GatewayVMConfigurationStatus{}
}
if vmConfig.Spec.ProvisionPublicIps {
vmConfig.Status.EgressIpPrefix = ipPrefix
} else {
vmConfig.Status.EgressIpPrefix = strings.Join(privateIPs, ",")
}
if !equality.Semantic.DeepEqual(existing, vmConfig) {
log.Info(fmt.Sprintf("Updating GatewayVMConfiguration %s/%s", vmConfig.Namespace, vmConfig.Name))
if err := r.Status().Update(ctx, vmConfig); err != nil {
log.Error(err, "failed to update gateway vm configuration")
}
}
log.Info("GatewayVMConfiguration reconciled")
succeeded = true
return ctrl.Result{}, nil
}
func (r *GatewayVMConfigurationReconciler) ensureDeleted(
ctx context.Context,
vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration,
) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info(fmt.Sprintf("Reconciling gatewayVMConfiguration deletion %s/%s", vmConfig.Namespace, vmConfig.Name))
if !controllerutil.ContainsFinalizer(vmConfig, consts.VMConfigFinalizerName) {
log.Info("vmConfig does not have finalizer, no additional cleanup needed")
return ctrl.Result{}, nil
}
mc := metrics.NewMetricsContext(
os.Getenv(consts.PodNamespaceEnvKey),
"delete_gateway_vm_configuration",
r.SubscriptionID(),
r.ResourceGroup,
strings.ToLower(fmt.Sprintf("%s/%s", vmConfig.Namespace, vmConfig.Name)),
)
succeeded := false
defer func() { mc.ObserveControllerReconcileMetrics(succeeded) }()
vmss, _, err := r.getGatewayVMSS(ctx, vmConfig)
if err != nil {
log.Error(err, "failed to get vmss")
return ctrl.Result{}, err
}
if _, err := r.reconcileVMSS(ctx, vmConfig, vmss, "", false); err != nil {
log.Error(err, "failed to reconcile VMSS")
return ctrl.Result{}, err
}
if err := r.ensurePublicIPPrefixDeleted(ctx, vmConfig); err != nil {
log.Error(err, "failed to delete managed public ip prefix")
return ctrl.Result{}, err
}
log.Info("Removing finalizer")
controllerutil.RemoveFinalizer(vmConfig, consts.VMConfigFinalizerName)
if err := r.Update(ctx, vmConfig); err != nil {
log.Error(err, "failed to remove finalizer")
return ctrl.Result{}, err
}
log.Info("GatewayVMConfiguration deletion reconciled")
succeeded = true
return ctrl.Result{}, nil
}
func (r *GatewayVMConfigurationReconciler) getGatewayVMSS(
ctx context.Context,
vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration,
) (*compute.VirtualMachineScaleSet, int32, error) {
if vmConfig.Spec.GatewayNodepoolName != "" {
vmssList, err := r.ListVMSS(ctx)
if err != nil {
return nil, 0, err
}
for i := range vmssList {
vmss := vmssList[i]
if v, ok := vmss.Tags[consts.AKSNodepoolTagKey]; ok {
if strings.EqualFold(to.Val(v), vmConfig.Spec.GatewayNodepoolName) {
if prefixLenStr, ok := vmss.Tags[consts.AKSNodepoolIPPrefixSizeTagKey]; ok {
if prefixLen, err := strconv.Atoi(to.Val(prefixLenStr)); err == nil && prefixLen > 0 && prefixLen <= math.MaxInt32 {
return vmss, int32(prefixLen), nil
} else {
return nil, 0, fmt.Errorf("failed to parse nodepool IP prefix size: %s", to.Val(prefixLenStr))
}
} else {
return nil, 0, fmt.Errorf("nodepool does not have IP prefix size")
}
}
}
}
} else {
vmss, err := r.GetVMSS(ctx, vmConfig.Spec.VmssResourceGroup, vmConfig.Spec.VmssName)
if err != nil {
return nil, 0, err
}
return vmss, vmConfig.Spec.PublicIpPrefixSize, nil
}
return nil, 0, fmt.Errorf("gateway VMSS not found")
}
func managedSubresourceName(vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration) string {
return consts.ManagedResourcePrefix + string(vmConfig.GetUID())
}
func isErrorNotFound(err error) bool {
var respErr *azcore.ResponseError
return errors.As(err, &respErr) && respErr.StatusCode == http.StatusNotFound
}
func (r *GatewayVMConfigurationReconciler) ensurePublicIPPrefix(
ctx context.Context,
ipPrefixLength int32,
vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration,
) (string, string, bool, error) {
log := log.FromContext(ctx)
// no need to provision public ip prefix is only private egress is needed
if !vmConfig.Spec.ProvisionPublicIps {
// return isManaged as false so that previously created managed public ip prefix can be deleted
return "", "", false, nil
}
if vmConfig.Spec.PublicIpPrefixId != "" {
// if there is public prefix ip specified, prioritize this one
matches := publicIPPrefixRE.FindStringSubmatch(vmConfig.Spec.PublicIpPrefixId)
if len(matches) != 4 {
return "", "", false, fmt.Errorf("failed to parse public ip prefix id: %s", vmConfig.Spec.PublicIpPrefixId)
}
subscriptionID, resourceGroupName, publicIpPrefixName := matches[1], matches[2], matches[3]
if subscriptionID != r.SubscriptionID() {
return "", "", false, fmt.Errorf("public ip prefix subscription(%s) is not in the same subscription(%s)", subscriptionID, r.SubscriptionID())
}
ipPrefix, err := r.GetPublicIPPrefix(ctx, resourceGroupName, publicIpPrefixName)
if err != nil {
return "", "", false, fmt.Errorf("failed to get public ip prefix(%s): %w", vmConfig.Spec.PublicIpPrefixId, err)
}
if ipPrefix.Properties == nil {
return "", "", false, fmt.Errorf("public ip prefix(%s) has empty properties", vmConfig.Spec.PublicIpPrefixId)
}
if to.Val(ipPrefix.Properties.PrefixLength) != ipPrefixLength {
return "", "", false, fmt.Errorf("provided public ip prefix has invalid length(%d), required(%d)", to.Val(ipPrefix.Properties.PrefixLength), ipPrefixLength)
}
log.Info("Found existing unmanaged public ip prefix", "public ip prefix", to.Val(ipPrefix.Properties.IPPrefix))
return to.Val(ipPrefix.Properties.IPPrefix), to.Val(ipPrefix.ID), false, nil
} else {
// check if there's managed public prefix ip
publicIpPrefixName := managedSubresourceName(vmConfig)
ipPrefix, err := r.GetPublicIPPrefix(ctx, "", publicIpPrefixName)
if err == nil {
if ipPrefix.Properties == nil {
return "", "", false, fmt.Errorf("managed public ip prefix has empty properties")
} else {
log.Info("Found existing managed public ip prefix", "public ip prefix", to.Val(ipPrefix.Properties.IPPrefix))
return to.Val(ipPrefix.Properties.IPPrefix), to.Val(ipPrefix.ID), true, nil
}
} else {
if !isErrorNotFound(err) {
return "", "", false, fmt.Errorf("failed to get managed public ip prefix: %w", err)
}
// create new public ip prefix
newIPPrefix := network.PublicIPPrefix{
Name: to.Ptr(publicIpPrefixName),
Location: to.Ptr(r.Location()),
Properties: &network.PublicIPPrefixPropertiesFormat{
PrefixLength: to.Ptr(ipPrefixLength),
PublicIPAddressVersion: to.Ptr(network.IPVersionIPv4),
},
SKU: &network.PublicIPPrefixSKU{
Name: to.Ptr(network.PublicIPPrefixSKUNameStandard),
Tier: to.Ptr(network.PublicIPPrefixSKUTierRegional),
},
}
log.Info("Creating new managed public ip prefix")
ipPrefix, err := r.CreateOrUpdatePublicIPPrefix(ctx, "", publicIpPrefixName, newIPPrefix)
if err != nil {
return "", "", false, fmt.Errorf("failed to create managed public ip prefix: %w", err)
}
return to.Val(ipPrefix.Properties.IPPrefix), to.Val(ipPrefix.ID), true, nil
}
}
}
func (r *GatewayVMConfigurationReconciler) ensurePublicIPPrefixDeleted(
ctx context.Context,
vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration,
) error {
log := log.FromContext(ctx)
// only ensure managed public prefix ip is deleted
publicIpPrefixName := managedSubresourceName(vmConfig)
_, err := r.GetPublicIPPrefix(ctx, "", publicIpPrefixName)
if err != nil {
if isErrorNotFound(err) {
// resource does not exist, directly return
return nil
} else {
return fmt.Errorf("failed to get public ip prefix(%s): %w", publicIpPrefixName, err)
}
} else {
log.Info("Deleting managed public ip prefix", "public ip prefix name", publicIpPrefixName)
if err := r.DeletePublicIPPrefix(ctx, "", publicIpPrefixName); err != nil {
return fmt.Errorf("failed to delete public ip prefix(%s): %w", publicIpPrefixName, err)
}
return nil
}
}
func (r *GatewayVMConfigurationReconciler) reconcileVMSS(
ctx context.Context,
vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration,
vmss *compute.VirtualMachineScaleSet,
ipPrefixID string,
wantIPConfig bool,
) ([]string, error) {
log := log.FromContext(ctx)
ipConfigName := managedSubresourceName(vmConfig)
vmssRG := getVMSSResourceGroup(vmConfig)
needUpdate := false
if vmss.Properties == nil || vmss.Properties.VirtualMachineProfile == nil ||
vmss.Properties.VirtualMachineProfile.NetworkProfile == nil {
return nil, fmt.Errorf("vmss has empty network profile")
}
lbBackendpoolID := r.GetLBBackendAddressPoolID(to.Val(vmss.Properties.UniqueID))
interfaces := vmss.Properties.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
needUpdate, err := r.reconcileVMSSNetworkInterface(ctx, ipConfigName, ipPrefixID, to.Val(lbBackendpoolID), wantIPConfig, interfaces)
if err != nil {
return nil, fmt.Errorf("failed to reconcile vmss interface(%s): %w", to.Val(vmss.Name), err)
}
if needUpdate {
log.Info("Updating vmss", "vmssName", to.Val(vmss.Name))
newVmss := compute.VirtualMachineScaleSet{
Location: vmss.Location,
Properties: &compute.VirtualMachineScaleSetProperties{
VirtualMachineProfile: &compute.VirtualMachineScaleSetVMProfile{
NetworkProfile: vmss.Properties.VirtualMachineProfile.NetworkProfile,
},
},
}
if _, err := r.CreateOrUpdateVMSS(ctx, vmssRG, to.Val(vmss.Name), newVmss); err != nil {
return nil, fmt.Errorf("failed to update vmss(%s): %w", to.Val(vmss.Name), err)
}
}
// check and update VMSS instances
var privateIPs []string
instances, err := r.ListVMSSInstances(ctx, vmssRG, to.Val(vmss.Name))
if err != nil {
return nil, fmt.Errorf("failed to get vm instances from vmss(%s): %w", to.Val(vmss.Name), err)
}
for _, instance := range instances {
privateIP, err := r.reconcileVMSSVM(ctx, vmConfig, to.Val(vmss.Name), instance, ipPrefixID, to.Val(lbBackendpoolID), wantIPConfig)
if err != nil {
return nil, err
}
if wantIPConfig && ipPrefixID == "" {
privateIPs = append(privateIPs, privateIP)
}
}
// clean up VMProfiles for deleted nodes
var vmprofiles []egressgatewayv1alpha1.GatewayVMProfile
if vmConfig.Status != nil {
for i := range vmConfig.Status.GatewayVMProfiles {
profile := vmConfig.Status.GatewayVMProfiles[i]
for _, instance := range instances {
if profile.NodeName == to.Val(instance.Properties.OSProfile.ComputerName) {
vmprofiles = append(vmprofiles, profile)
break
}
}
}
vmConfig.Status.GatewayVMProfiles = vmprofiles
}
err = r.Status().Update(ctx, vmConfig)
if err != nil {
return nil, fmt.Errorf("failed to update vm config status: %w", err)
}
return privateIPs, nil
}
func (r *GatewayVMConfigurationReconciler) reconcileVMSSVM(
ctx context.Context,
vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration,
vmssName string,
vm *compute.VirtualMachineScaleSetVM,
ipPrefixID string,
lbBackendpoolID string,
wantIPConfig bool,
) (string, error) {
log := log.FromContext(ctx).WithValues("vmssInstance", to.Val(vm.ID), "wantIPConfig", wantIPConfig, "ipPrefixID", ipPrefixID)
ipConfigName := managedSubresourceName(vmConfig)
vmssRG := getVMSSResourceGroup(vmConfig)
if vm.Properties == nil || vm.Properties.NetworkProfileConfiguration == nil {
return "", fmt.Errorf("vmss vm(%s) has empty network profile", to.Val(vm.InstanceID))
}
if vm.Properties.OSProfile == nil {
return "", fmt.Errorf("vmss vm(%s) has empty os profile", to.Val(vm.InstanceID))
}
forceUpdate := false
// check ProvisioningState
if vm.Properties.ProvisioningState != nil && !strings.EqualFold(to.Val(vm.Properties.ProvisioningState), "Succeeded") {
log.Info(fmt.Sprintf("VMSS instance ProvisioningState %q", to.Val(vm.Properties.ProvisioningState)))
if strings.EqualFold(to.Val(vm.Properties.ProvisioningState), "Failed") {
forceUpdate = true
log.Info(fmt.Sprintf("Force update for unexpected VMSS instance ProvisioningState:%q", to.Val(vm.Properties.ProvisioningState)))
}
}
// check primary IP & secondary IP
var primaryIP, secondaryIP string
if !forceUpdate && wantIPConfig {
for _, nic := range vm.Properties.NetworkProfileConfiguration.NetworkInterfaceConfigurations {
if nic.Properties != nil && to.Val(nic.Properties.Primary) {
vmNic, err := r.GetVMSSInterface(ctx, vmssRG, vmssName, to.Val(vm.InstanceID), to.Val(nic.Name))
if err != nil || vmNic.Properties == nil || vmNic.Properties.IPConfigurations == nil {
if err != nil {
log.Info("Skip IP check for forceUpdate", "error", err.Error())
} else {
log.Info("Skip IP check for forceUpdate")
}
break
}
for _, ipConfig := range vmNic.Properties.IPConfigurations {
if ipConfig != nil && ipConfig.Properties != nil && strings.EqualFold(to.Val(ipConfig.Name), ipConfigName) {
secondaryIP = to.Val(ipConfig.Properties.PrivateIPAddress)
} else if ipConfig != nil && ipConfig.Properties != nil && to.Val(ipConfig.Properties.Primary) {
primaryIP = to.Val(ipConfig.Properties.PrivateIPAddress)
}
}
}
}
if primaryIP == "" || secondaryIP == "" {
forceUpdate = true
log.Info("Force update for missing primary IP and/or secondary IP", "primaryIP", primaryIP, "secondaryIP", secondaryIP)
}
}
interfaces := vm.Properties.NetworkProfileConfiguration.NetworkInterfaceConfigurations
needUpdate, err := r.reconcileVMSSNetworkInterface(ctx, ipConfigName, ipPrefixID, lbBackendpoolID, wantIPConfig, interfaces)
if err != nil {
return "", fmt.Errorf("failed to reconcile vm interface(%s): %w", to.Val(vm.InstanceID), err)
}
vmUpdated := false
if needUpdate || forceUpdate {
log.Info("Updating vmss instance")
if !needUpdate && forceUpdate {
log.Info("Updating vmss instance triggered by forceUpdate")
}
newVM := compute.VirtualMachineScaleSetVM{
Properties: &compute.VirtualMachineScaleSetVMProperties{
NetworkProfileConfiguration: &compute.VirtualMachineScaleSetVMNetworkProfileConfiguration{
NetworkInterfaceConfigurations: interfaces,
},
},
}
if _, err := r.UpdateVMSSInstance(ctx, vmssRG, vmssName, to.Val(vm.InstanceID), newVM); err != nil {
return "", fmt.Errorf("failed to update vmss instance(%s): %w", to.Val(vm.InstanceID), err)
}
vmUpdated = true
}
// return earlier if it's deleting event
if !wantIPConfig {
return "", nil
}
if vmUpdated || primaryIP == "" || secondaryIP == "" {
primaryIP, secondaryIP = "", ""
for _, nic := range interfaces {
if nic.Properties != nil && to.Val(nic.Properties.Primary) {
vmNic, err := r.GetVMSSInterface(ctx, vmssRG, vmssName, to.Val(vm.InstanceID), to.Val(nic.Name))
if err != nil {
return "", fmt.Errorf("failed to get vmss(%s) instance(%s) nic(%s): %w", vmssName, to.Val(vm.InstanceID), to.Val(nic.Name), err)
}
if vmNic.Properties == nil || vmNic.Properties.IPConfigurations == nil {
return "", fmt.Errorf("vmss(%s) instance(%s) nic(%s) has empty ip configurations", vmssName, to.Val(vm.InstanceID), to.Val(nic.Name))
}
for _, ipConfig := range vmNic.Properties.IPConfigurations {
if ipConfig != nil && ipConfig.Properties != nil && strings.EqualFold(to.Val(ipConfig.Name), ipConfigName) {
secondaryIP = to.Val(ipConfig.Properties.PrivateIPAddress)
} else if ipConfig != nil && ipConfig.Properties != nil && to.Val(ipConfig.Properties.Primary) {
primaryIP = to.Val(ipConfig.Properties.PrivateIPAddress)
}
}
}
}
}
if primaryIP == "" || secondaryIP == "" {
return "", fmt.Errorf("failed to find private IP from vmss(%s), instance(%s), ipConfig(%s)", vmssName, to.Val(vm.InstanceID), ipConfigName)
}
vmprofile := egressgatewayv1alpha1.GatewayVMProfile{
NodeName: to.Val(vm.Properties.OSProfile.ComputerName),
PrimaryIP: primaryIP,
SecondaryIP: secondaryIP,
}
if vmConfig.Status == nil {
vmConfig.Status = &egressgatewayv1alpha1.GatewayVMConfigurationStatus{}
}
for i, profile := range vmConfig.Status.GatewayVMProfiles {
if profile.NodeName == vmprofile.NodeName {
if profile.PrimaryIP != primaryIP || profile.SecondaryIP != secondaryIP {
vmConfig.Status.GatewayVMProfiles[i].PrimaryIP = primaryIP
vmConfig.Status.GatewayVMProfiles[i].SecondaryIP = secondaryIP
log.Info("GatewayVMConfiguration status updated", "primaryIP", primaryIP, "secondaryIP", secondaryIP)
return secondaryIP, nil
}
log.Info("GatewayVMConfiguration status not changed", "primaryIP", primaryIP, "secondaryIP", secondaryIP)
return secondaryIP, nil
}
}
log.Info("GatewayVMConfiguration status updated for new nodes", "nodeName", vmprofile.NodeName, "primaryIP", primaryIP, "secondaryIP", secondaryIP)
vmConfig.Status.GatewayVMProfiles = append(vmConfig.Status.GatewayVMProfiles, vmprofile)
return secondaryIP, nil
}
func (r *GatewayVMConfigurationReconciler) reconcileVMSSNetworkInterface(
ctx context.Context,
ipConfigName string,
ipPrefixID string,
lbBackendpoolID string,
wantIPConfig bool,
interfaces []*compute.VirtualMachineScaleSetNetworkConfiguration,
) (bool, error) {
log := log.FromContext(ctx)
expectedConfig := r.getExpectedIPConfig(ipConfigName, ipPrefixID, interfaces)
var primaryNic *compute.VirtualMachineScaleSetNetworkConfiguration
needUpdate := false
foundConfig := false
for _, nic := range interfaces {
if nic.Properties != nil && to.Val(nic.Properties.Primary) {
primaryNic = nic
for i, ipConfig := range nic.Properties.IPConfigurations {
if to.Val(ipConfig.Name) == ipConfigName {
if !wantIPConfig {
log.Info("Found unwanted ipConfig, dropping")
nic.Properties.IPConfigurations = append(nic.Properties.IPConfigurations[:i], nic.Properties.IPConfigurations[i+1:]...)
needUpdate = true
} else {
if different(ipConfig, expectedConfig) {
log.Info("Found target ipConfig with different configurations, dropping")
needUpdate = true
nic.Properties.IPConfigurations = append(nic.Properties.IPConfigurations[:i], nic.Properties.IPConfigurations[i+1:]...)
} else {
log.Info("Found expected ipConfig, keeping")
foundConfig = true
}
}
break
}
}
}
}
if wantIPConfig && !foundConfig {
if primaryNic == nil {
return false, fmt.Errorf("vmss(vm) primary network interface not found")
}
primaryNic.Properties.IPConfigurations = append(primaryNic.Properties.IPConfigurations, expectedConfig)
needUpdate = true
}
changed, err := r.reconcileLbBackendPool(lbBackendpoolID, primaryNic)
if err != nil {
return false, err
}
needUpdate = needUpdate || changed
return needUpdate, nil
}
func (r *GatewayVMConfigurationReconciler) reconcileLbBackendPool(
lbBackendpoolID string,
primaryNic *compute.VirtualMachineScaleSetNetworkConfiguration,
) (needUpdate bool, err error) {
if primaryNic == nil {
return false, fmt.Errorf("vmss(vm) primary network interface not found")
}
needBackendPool := false
for _, ipConfig := range primaryNic.Properties.IPConfigurations {
if strings.HasPrefix(to.Val(ipConfig.Name), consts.ManagedResourcePrefix) {
needBackendPool = true
break
}
}
for _, ipConfig := range primaryNic.Properties.IPConfigurations {
if ipConfig.Properties != nil && to.Val(ipConfig.Properties.Primary) {
backendPools := ipConfig.Properties.LoadBalancerBackendAddressPools
for i, backend := range backendPools {
if strings.EqualFold(lbBackendpoolID, to.Val(backend.ID)) {
if !needBackendPool {
backendPools = append(backendPools[:i], backendPools[i+1:]...)
ipConfig.Properties.LoadBalancerBackendAddressPools = backendPools
return true, nil
} else {
return false, nil
}
}
}
if !needBackendPool {
return false, nil
}
backendPools = append(backendPools, &compute.SubResource{ID: to.Ptr(lbBackendpoolID)})
ipConfig.Properties.LoadBalancerBackendAddressPools = backendPools
return true, nil
}
}
return false, fmt.Errorf("vmss(vm) primary ipConfig not found")
}
func (r *GatewayVMConfigurationReconciler) getExpectedIPConfig(
ipConfigName,
ipPrefixID string,
interfaces []*compute.VirtualMachineScaleSetNetworkConfiguration,
) *compute.VirtualMachineScaleSetIPConfiguration {
var subnetID *string
for _, nic := range interfaces {
if nic.Properties != nil && to.Val(nic.Properties.Primary) {
for _, ipConfig := range nic.Properties.IPConfigurations {
if ipConfig.Properties != nil && to.Val(ipConfig.Properties.Primary) {
subnetID = ipConfig.Properties.Subnet.ID
}
}
}
}
var pipConfig *compute.VirtualMachineScaleSetPublicIPAddressConfiguration
if ipPrefixID != "" {
pipConfig = &compute.VirtualMachineScaleSetPublicIPAddressConfiguration{
Name: to.Ptr(ipConfigName),
Properties: &compute.VirtualMachineScaleSetPublicIPAddressConfigurationProperties{
PublicIPPrefix: &compute.SubResource{
ID: to.Ptr(ipPrefixID),
},
},
}
}
return &compute.VirtualMachineScaleSetIPConfiguration{
Name: to.Ptr(ipConfigName),
Properties: &compute.VirtualMachineScaleSetIPConfigurationProperties{
Primary: to.Ptr(false),
PrivateIPAddressVersion: to.Ptr(compute.IPVersionIPv4),
PublicIPAddressConfiguration: pipConfig,
Subnet: &compute.APIEntityReference{
ID: subnetID,
},
},
}
}
func different(ipConfig1, ipConfig2 *compute.VirtualMachineScaleSetIPConfiguration) bool {
if ipConfig1.Properties == nil && ipConfig2.Properties == nil {
return false
}
if ipConfig1.Properties == nil || ipConfig2.Properties == nil {
return true
}
prop1, prop2 := ipConfig1.Properties, ipConfig2.Properties
if to.Val(prop1.Primary) != to.Val(prop2.Primary) ||
to.Val(prop1.PrivateIPAddressVersion) != to.Val(prop2.PrivateIPAddressVersion) {
return true
}
if (prop1.Subnet != nil) != (prop2.Subnet != nil) {
return true
} else if prop1.Subnet != nil && prop2.Subnet != nil && !strings.EqualFold(to.Val(prop1.Subnet.ID), to.Val(prop2.Subnet.ID)) {
return true
}
pip1, pip2 := prop1.PublicIPAddressConfiguration, prop2.PublicIPAddressConfiguration
if (pip1 == nil) != (pip2 == nil) {
return true
} else if pip1 != nil && pip2 != nil {
if to.Val(pip1.Name) != to.Val(pip2.Name) {
return true
} else if (pip1.Properties != nil) != (pip2.Properties != nil) {
return true
} else if pip1.Properties != nil && pip2.Properties != nil {
prefix1, prefix2 := pip1.Properties.PublicIPPrefix, pip2.Properties.PublicIPPrefix
if (prefix1 != nil) != (prefix2 != nil) {
return true
} else if prefix1 != nil && prefix2 != nil && !strings.EqualFold(to.Val(prefix1.ID), to.Val(prefix2.ID)) {
return true
}
}
}
return false
}
func getVMSSResourceGroup(vmConfig *egressgatewayv1alpha1.GatewayVMConfiguration) string {
if vmConfig.Spec.VmssResourceGroup != "" {
return vmConfig.Spec.VmssResourceGroup
}
// return an empty string so that azmanager will use the default resource group in the config
return ""
}