pkg/cni/conf/confmanager.go (304 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT license. package conf import ( "context" "encoding/json" "errors" "fmt" "net" "os" "path/filepath" "sort" "strings" "time" "github.com/fsnotify/fsnotify" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/Azure/kube-egress-gateway/pkg/consts" "github.com/Azure/kube-egress-gateway/pkg/logger" ) var ErrMainCNINotFound error = errors.New("no existing cni plugin configuration file found") type Manager struct { cniConfDir string cniConfFile string cniConfFileTemp string cniUninstallConfigMapName string cniConfWatcher *fsnotify.Watcher exceptionCidrs []string k8sClient client.Client grpcPort int } func NewCNIConfManager(cniConfDir, cniConfFile, exceptionCidrs, cniUninstallConfigMapName string, k8sClient client.Client, grpcPort int) (*Manager, error) { cidrs, err := parseCidrs(exceptionCidrs) if err != nil { return nil, err } watcher, err := newWatcher(cniConfDir) if err != nil { return nil, err } return &Manager{ cniConfDir: cniConfDir, cniConfFile: cniConfFile, cniConfFileTemp: cniConfFile + ".tmp", cniUninstallConfigMapName: cniUninstallConfigMapName, cniConfWatcher: watcher, exceptionCidrs: cidrs, k8sClient: k8sClient, grpcPort: grpcPort, }, nil } func (mgr *Manager) IsReady() bool { log := logger.GetLogger() file := filepath.Join(mgr.cniConfDir, mgr.cniConfFile) if _, err := os.Stat(file); err != nil { if os.IsNotExist(err) { log.Info("cni configuration file not found, skip removing taint") } log.Error(err, "failed to stat cni configuration file in node event handler", "file name", file) return false } return true } func (mgr *Manager) Start(ctx context.Context) error { log := logger.GetLogger() defer func() { log.Info("Stopping cni configuration directory monitoring") if err := mgr.cniConfWatcher.Close(); err != nil { log.Error(err, "failed to close watcher") } }() log.Info("Installing cni configuration") if err := mgr.insertCNIPluginConf(); err != nil { if errors.Is(err, ErrMainCNINotFound) { log.Info("Main CNI config file is missing, continue to watch changes") } else { return err } } log.Info("Start to watch cni configuration changes", "conf directory", mgr.cniConfDir) for { select { case event := <-mgr.cniConfWatcher.Events: if strings.Contains(event.Name, mgr.cniConfFile) { // ignore our cni conf file change itself to avoid loop log.Info("Detected changes in cni configuration file, ignoring...", "change event", event) continue } log.Info("Detected changes in cni configuration directory, regenerating...", "change event", event) if err := mgr.insertCNIPluginConf(); err != nil { log.Error(err, "failed to regenerate cni conf") } case err := <-mgr.cniConfWatcher.Errors: if err != nil { log.Error(err, "failed to watch cni configuration directory changes") } case <-ctx.Done(): if err := mgr.removeCNIPluginConf(); err != nil { log.Error(err, "failed to remove cni configuration file on exit") } return nil } } } func (mgr *Manager) insertCNIPluginConf() error { file, err := findMasterPlugin(mgr.cniConfDir, mgr.cniConfFile) if err != nil { return err } ext := filepath.Ext(file) var rawList map[string]interface{} if ext == ".conflist" { rawList, err = mgr.managePluginFromConfList(file) if err != nil { return err } } else { rawList, err = mgr.managePluginFromConf(file) if err != nil { return err } } newBytes, err := json.MarshalIndent(rawList, "", " ") if err != nil { return fmt.Errorf("failed to marshal bytes into json: %w, bytes: %s", err, string(newBytes)) } tmpFile := filepath.Join(mgr.cniConfDir, mgr.cniConfFileTemp) err = os.WriteFile(tmpFile, newBytes, 0644) if err != nil { return fmt.Errorf("failed to write to tmp file: %w", err) } err = os.Rename(tmpFile, filepath.Join(mgr.cniConfDir, mgr.cniConfFile)) if err != nil { return fmt.Errorf("failed to rename file: %w", err) } return nil } func (mgr *Manager) removeCNIPluginConf() error { log := logger.GetLogger() cm := &corev1.ConfigMap{} cmKey := client.ObjectKey{Name: mgr.cniUninstallConfigMapName, Namespace: os.Getenv(consts.PodNamespaceEnvKey)} err := mgr.k8sClient.Get(context.Background(), cmKey, cm) if err == nil { if cm.Data["uninstall"] == "false" { log.Info("Uninstall flag is NOT set, skip removing cni configuration file") return nil } } else if apierrors.IsNotFound(err) { log.Info(fmt.Sprintf("CNI uninstall configMap (%s/%s) is not found, skip removing cni configuration file", cmKey.Namespace, cmKey.Name)) return nil } else { return fmt.Errorf("failed to get cni uninstall configMap (%s/%s): %w", cmKey.Namespace, cmKey.Name, err) } // only remove the cniConf file when the uninstall configMap is found and uninstall flag is set to true log.Info("Removing cni configuration file...") file := filepath.Join(mgr.cniConfDir, mgr.cniConfFile) if _, err := os.Stat(file); err != nil { if os.IsNotExist(err) { return nil } return fmt.Errorf("failed to stat file %s: %w", file, err) } return retry.OnError( // retry for 10 times, 5s each wait.Backoff{Duration: 5 * time.Second, Steps: 10}, func(err error) bool { return true }, func() error { if err := os.Remove(file); err != nil { return fmt.Errorf("failed to delete file %s: %w", file, err) } log.Info(fmt.Sprintf("successfully removed cni configuration file %s", file)) return nil }, ) } func newWatcher(cniConfDir string) (*fsnotify.Watcher, error) { watcher, err := fsnotify.NewWatcher() if err != nil { return nil, fmt.Errorf("failed to create new watcher for %q: %v", cniConfDir, err) } if err = watcher.Add(cniConfDir); err != nil { watcher.Close() return nil, fmt.Errorf("failed to add watch on %q: %v", cniConfDir, err) } return watcher, nil } func (mgr *Manager) managePluginFromConf(file string) (map[string]interface{}, error) { bytes, err := os.ReadFile(file) if err != nil { return nil, fmt.Errorf("failed to read cni config file %s: %w", file, err) } rawConf, rawList := make(map[string]interface{}), make(map[string]interface{}) if err = json.Unmarshal(bytes, &rawConf); err != nil { return nil, fmt.Errorf("failed to unmarshal cni config from file %s: %w", file, err) } networkName, ok := rawConf["name"] if !ok { return nil, fmt.Errorf("failed to find network name in %s", file) } rawList["name"] = networkName delete(rawConf, "name") cniVersion, ok := rawConf["cniVersion"] if ok { cniVersion, ok = cniVersion.(string) if !ok { return nil, fmt.Errorf("cniVersion (%v) is not in string format", cniVersion) } rawList["cniVersion"] = cniVersion delete(rawConf, "cniVersion") } var plugins []interface{} plugins = append(plugins, rawConf) plugins = append(plugins, map[string]interface{}{ "type": consts.KubeEgressCNIName, "ipam": map[string]interface{}{"type": consts.KubeEgressIPAMCNIName}, "excludedCIDRs": mgr.exceptionCidrs, "socketPath": fmt.Sprintf("localhost:%d", mgr.grpcPort), }) rawList["plugins"] = plugins return rawList, nil } func (mgr *Manager) managePluginFromConfList(file string) (map[string]interface{}, error) { bytes, err := os.ReadFile(file) if err != nil { return nil, fmt.Errorf("failed to read cni config file %s: %w", file, err) } rawList := make(map[string]interface{}) if err = json.Unmarshal(bytes, &rawList); err != nil { return nil, fmt.Errorf("failed to unmarshal cni config from file %s: %w", file, err) } var plugins []interface{} plug, ok := rawList["plugins"] if !ok { return nil, fmt.Errorf("failed to find plugins in cni config file %s", file) } plugins, ok = plug.([]interface{}) if !ok { return nil, fmt.Errorf("plugins field is not an array in %s", file) } if len(plugins) == 0 { return nil, fmt.Errorf("empty plugin list in cni config file %s", file) } // remove the plugin if it already exists for i, plugin := range plugins { p, ok := plugin.(map[string]interface{}) if !ok { return nil, fmt.Errorf("failed to parse plugin conf in file %s", file) } cniType, ok := p["type"] if !ok { return nil, fmt.Errorf("failed to find type in plugin conf in file %s", file) } cniTypeStr, ok := cniType.(string) if ok && strings.EqualFold(cniTypeStr, consts.KubeEgressCNIName) { plugins = append(plugins[:i], plugins[i+1:]...) break } } // insert kube-egress-gateway-cni at plugins[1] plugins = append(plugins[:1], append([]interface{}{map[string]interface{}{ "type": consts.KubeEgressCNIName, "ipam": map[string]interface{}{"type": consts.KubeEgressIPAMCNIName}, "excludedCIDRs": mgr.exceptionCidrs, "socketPath": fmt.Sprintf("localhost:%d", mgr.grpcPort), }}, plugins[1:]...)...) rawList["plugins"] = plugins return rawList, nil } func findMasterPlugin(cniConfDir, cniConfFile string) (string, error) { var confFiles []string files, err := os.ReadDir(cniConfDir) if err != nil { return "", fmt.Errorf("failed to read cni config directory: %w", err) } for _, file := range files { if !file.Type().IsRegular() { continue } if strings.EqualFold(file.Name(), cniConfFile) { continue } fileExtension := filepath.Ext(file.Name()) if fileExtension == ".conflist" || fileExtension == ".conf" || fileExtension == ".json" { confFiles = append(confFiles, file.Name()) } } if len(confFiles) == 0 { return "", ErrMainCNINotFound } sort.Strings(confFiles) return filepath.Join(cniConfDir, confFiles[0]), nil } func parseCidrs(cidrs string) ([]string, error) { var res []string cidrList := strings.Split(cidrs, ",") for _, cidr := range cidrList { cidr = strings.TrimSpace(cidr) if cidr == "" { continue } _, _, err := net.ParseCIDR(cidr) if err != nil { return nil, fmt.Errorf("cidr %s is not valid: %w", cidr, err) } res = append(res, cidr) } return res, nil }