npm/cmd/start.go (204 lines of code) (raw):
// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package main
import (
"fmt"
"math/rand"
"time"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/npm"
npmconfig "github.com/Azure/azure-container-networking/npm/config"
restserver "github.com/Azure/azure-container-networking/npm/http/server"
"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies"
"github.com/Azure/azure-container-networking/npm/pkg/models"
"github.com/Azure/azure-container-networking/npm/util"
"github.com/spf13/cobra"
"github.com/spf13/viper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sversion "k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
"k8s.io/utils/exec"
)
// npmV2DataplaneCfg is the configuration passed to dataplane.NewDataPlane when
// V2 NPM is enabled. Only the policy mode is fixed here. The remaining fields
// (network name, IPSet mode, batching/interval settings, node IP, ...) are filled
// in by start from the loaded npm config before the dataplane is created.
var npmV2DataplaneCfg = &dataplane.Config{
	IPSetManagerCfg: &ipsets.IPSetManagerCfg{
		// NOTE: NetworkName and IPSetMode must be set later by the npm ConfigMap or default config
	},
	PolicyManagerCfg: &policies.PolicyManagerCfg{
		PolicyMode: policies.IPSetPolicyMode,
		// NOTE: PlaceAzureChainFirst must be set later by the npm ConfigMap or default config
	},
}
// newStartNPMCmd builds the "start" cobra command. Its RunE unmarshals the
// viper-loaded configuration into an npmconfig.Config, reads the kubeconfig
// path flag, and runs start. Metrics are closed once start returns.
func newStartNPMCmd() *cobra.Command {
	// startNPMCmd represents the start command.
	startNPMCmd := &cobra.Command{
		Use:   "start",
		Short: "Starts the Azure NPM process",
		RunE: func(cmd *cobra.Command, args []string) error {
			config := &npmconfig.Config{}
			err := viper.Unmarshal(config)
			if err != nil {
				return fmt.Errorf("failed to load config with error: %w", err)
			}

			flags := npmconfig.Flags{
				KubeConfigPath: viper.GetString(flagKubeConfigPath),
			}

			// start is blocking, unless there's an error
			err = start(*config, flags)
			metrics.Close()
			return err
		},
	}

	startNPMCmd.Flags().String(flagKubeConfigPath, flagDefaults[flagKubeConfigPath], "path to kubeconfig")

	return startNPMCmd
}
// start boots NPM with the given configuration and flags. It performs these steps:
//   - initializes logging and metrics
//   - builds the Kubernetes clientset and informer factories
//   - configures and starts the V2 dataplane when enabled (always enabled on Windows)
//   - starts the network policy manager and its REST server
//
// On success it blocks forever. It only returns when a setup step fails.
func start(config npmconfig.Config, flags npmconfig.Flags) error {
	klog.Infof("loaded config: %+v", config)
	if util.IsWindowsDP() {
		config.Toggles.EnableV2NPM = true
		klog.Infof("NPM is running on Windows Dataplane. Enabling V2 NPM")
	} else {
		klog.Infof("NPM is running on Linux Dataplane")
	}
	klog.Infof("starting NPM version %d with image %s", config.NPMVersion(), version)

	var err error
	err = initLogging()
	if err != nil {
		return err
	}

	klog.Infof("initializing metrics")
	metrics.InitializeAll()

	// Create the kubernetes client config: in-cluster unless a kubeconfig path was given.
	var k8sConfig *rest.Config
	if flags.KubeConfigPath == "" {
		klog.Infof("loading in cluster kubeconfig")
		k8sConfig, err = rest.InClusterConfig()
		if err != nil {
			return fmt.Errorf("failed to load in cluster config: %w", err)
		}
	} else {
		klog.Infof("loading kubeconfig from flag: %s", flags.KubeConfigPath)
		k8sConfig, err = clientcmd.BuildConfigFromFlags("", flags.KubeConfigPath)
		if err != nil {
			return fmt.Errorf("failed to load kubeconfig [%s] with err config: %w", flags.KubeConfigPath, err)
		}
	}

	// Creates the clientset
	clientset, err := kubernetes.NewForConfig(k8sConfig)
	if err != nil {
		klog.Infof("clientset creation failed with error %v.", err)
		return fmt.Errorf("failed to generate clientset with cluster config: %w", err)
	}

	// Scale the configured resync period by a random factor in [1, 2) so all
	// NPM pods do not request info at once.
	minResyncPeriod := time.Duration(config.ResyncPeriodInMinutes) * time.Minute
	factor := rand.Float64() + 1 //nolint
	resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
	// Log the full duration (with unit) rather than truncating to whole minutes.
	klog.Infof("Resync period for NPM pod is set to %s.", resyncPeriod)

	factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)

	// Separate podFactory for different versions in npm and npm lite.
	podFactory := factory
	// npm-lite -> daemon set will listen to pods only in its own node
	if config.Toggles.EnableNPMLite {
		// Log once here: the tweak function below runs on every list/watch request.
		klog.Infof("NPM agent is listening to pods only under its node")
		podFactory = informers.NewSharedInformerFactoryWithOptions(
			clientset,
			resyncPeriod,
			informers.WithTweakListOptions(func(options *metav1.ListOptions) {
				// Use field selector to filter pods based on their assigned node
				options.FieldSelector = "spec.nodeName=" + models.GetNodeName()
			}),
		)
	}

	logLevel := config.LogLevel
	if logLevel == "" {
		logLevel = npmconfig.DefaultConfig.LogLevel
	}
	err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata(), logLevel)
	if err != nil {
		klog.Infof("CreateTelemetryHandle failed with error %v. AITelemetry is not initialized.", err)
	}

	var dp dataplane.GenericDataplane
	stopChannel := wait.NeverStop
	if config.Toggles.EnableV2NPM {
		populateNPMV2DataplaneCfg(config)

		// The node IP is only looked up on the Windows dataplane; elsewhere it stays empty.
		var nodeIP string
		if util.IsWindowsDP() {
			nodeIP, err = util.NodeIP()
			if err != nil {
				metrics.SendErrorLogAndMetric(util.NpmID, "error: failed to get node IP while booting up: %v", err)
				return fmt.Errorf("failed to get node IP while booting up: %w", err)
			}
			klog.Infof("node IP is %s", nodeIP)
		}
		npmV2DataplaneCfg.NodeIP = nodeIP

		dp, err = dataplane.NewDataPlane(models.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg, stopChannel)
		if err != nil {
			metrics.SendErrorLogAndMetric(util.NpmID, "error: failed to create dataplane with error %v", err)
			return fmt.Errorf("failed to create dataplane with error %w", err)
		}
		dp.RunPeriodicTasks()
	}

	k8sServerVersion := k8sServerVersion(clientset)

	npMgr := npm.NewNetworkPolicyManager(config, factory, podFactory, dp, exec.New(), version, k8sServerVersion)
	go restserver.NPMRestServerListenAndServe(config, npMgr)

	metrics.SendLog(util.NpmID, "starting NPM", metrics.PrintLog)
	if err = npMgr.Start(config, stopChannel); err != nil {
		metrics.SendErrorLogAndMetric(util.NpmID, "Failed to start NPM due to %+v", err)
		return fmt.Errorf("failed to start with err: %w", err)
	}

	// Block forever; NPM runs until the process is terminated.
	select {}
}

// populateNPMV2DataplaneCfg copies the V2 dataplane settings from config into the
// package-level npmV2DataplaneCfg. Any interval, batch, or pending-netpol setting
// that is not positive falls back to npmconfig.DefaultConfig. NodeIP is not set here.
func populateNPMV2DataplaneCfg(config npmconfig.Config) {
	npmV2DataplaneCfg.EnableNPMLite = config.Toggles.EnableNPMLite
	npmV2DataplaneCfg.MaxBatchedACLsPerPod = config.MaxBatchedACLsPerPod

	npmV2DataplaneCfg.NetPolInBackground = config.Toggles.NetPolInBackground
	netPolIntervalMs := config.NetPolInvervalInMilliseconds
	if netPolIntervalMs <= 0 {
		netPolIntervalMs = npmconfig.DefaultConfig.NetPolInvervalInMilliseconds
	}
	// Convert before multiplying so the arithmetic is done in int64 Duration, not int.
	npmV2DataplaneCfg.NetPolInterval = time.Duration(netPolIntervalMs) * time.Millisecond

	maxPendingNetPols := config.MaxPendingNetPols
	if maxPendingNetPols <= 0 {
		maxPendingNetPols = npmconfig.DefaultConfig.MaxPendingNetPols
	}
	npmV2DataplaneCfg.MaxPendingNetPols = maxPendingNetPols

	npmV2DataplaneCfg.ApplyInBackground = config.Toggles.ApplyInBackground
	applyMaxBatches := config.ApplyMaxBatches
	if applyMaxBatches <= 0 {
		applyMaxBatches = npmconfig.DefaultConfig.ApplyMaxBatches
	}
	npmV2DataplaneCfg.ApplyMaxBatches = applyMaxBatches

	applyIntervalMs := config.ApplyIntervalInMilliseconds
	if applyIntervalMs <= 0 {
		applyIntervalMs = npmconfig.DefaultConfig.ApplyIntervalInMilliseconds
	}
	npmV2DataplaneCfg.ApplyInterval = time.Duration(applyIntervalMs) * time.Millisecond

	if config.WindowsNetworkName == "" {
		npmV2DataplaneCfg.NetworkName = util.AzureNetworkName
	} else {
		npmV2DataplaneCfg.NetworkName = config.WindowsNetworkName
	}

	npmV2DataplaneCfg.PlaceAzureChainFirst = config.Toggles.PlaceAzureChainFirst

	if config.Toggles.ApplyIPSetsOnNeed {
		npmV2DataplaneCfg.IPSetMode = ipsets.ApplyOnNeed
	} else {
		npmV2DataplaneCfg.IPSetMode = ipsets.ApplyAllIPSets
	}
}
// initLogging configures the shared azure-container-networking logger for NPM.
// It sets the logger name to "azure-npm", the level to info, and the log target
// to log.TargetStdout. It returns an error if the log target cannot be configured.
func initLogging() error {
	log.SetName("azure-npm")
	log.SetLevel(log.LevelInfo)
	if err := log.SetTargetLogDirectory(log.TargetStdout, ""); err != nil {
		log.Logf("Failed to configure logging, err:%v.", err)
		// Add context so callers know which setup step failed.
		return fmt.Errorf("failed to configure logging target: %w", err)
	}

	return nil
}
// k8sServerVersion asks the Kubernetes API server for its version. It tries
// immediately, then retries once per second for up to one minute. If every
// attempt fails, it reports the last error via metrics.SendErrorLogAndMetric
// and returns the value from the last attempt. NOTE(review): presumably nil on
// error; callers should be prepared for a nil result.
func k8sServerVersion(kubeclientset kubernetes.Interface) *k8sversion.Info {
	const (
		retryInterval = time.Second
		retryTimeout  = time.Minute
	)

	// Stop the ticker on return so it does not outlive this function.
	ticker := time.NewTicker(retryInterval)
	defer ticker.Stop()

	// Named "begin" to avoid shadowing the package-level start function.
	begin := time.Now()
	for {
		serverVersion, err := kubeclientset.Discovery().ServerVersion()
		if err == nil {
			return serverVersion
		}
		if time.Since(begin) >= retryTimeout {
			metrics.SendErrorLogAndMetric(util.NpmID, "Error: failed to retrieve kubernetes version with err: %s", err.Error())
			return serverVersion
		}
		<-ticker.C
	}
}