in npm/cmd/start.go [72:224]
func start(config npmconfig.Config, flags npmconfig.Flags) error {
klog.Infof("loaded config: %+v", config)
if util.IsWindowsDP() {
config.Toggles.EnableV2NPM = true
klog.Infof("NPM is running on Windows Dataplane. Enabling V2 NPM")
} else {
klog.Infof("NPM is running on Linux Dataplane")
}
klog.Infof("starting NPM version %d with image %s", config.NPMVersion(), version)
var err error
err = initLogging()
if err != nil {
return err
}
klog.Infof("initializing metrics")
metrics.InitializeAll()
// Create the kubernetes client
var k8sConfig *rest.Config
if flags.KubeConfigPath == "" {
klog.Infof("loading in cluster kubeconfig")
k8sConfig, err = rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to load in cluster config: %w", err)
}
} else {
klog.Infof("loading kubeconfig from flag: %s", flags.KubeConfigPath)
k8sConfig, err = clientcmd.BuildConfigFromFlags("", flags.KubeConfigPath)
if err != nil {
return fmt.Errorf("failed to load kubeconfig [%s] with err config: %w", flags.KubeConfigPath, err)
}
}
// Creates the clientset
clientset, err := kubernetes.NewForConfig(k8sConfig)
if err != nil {
klog.Infof("clientset creation failed with error %v.", err)
return fmt.Errorf("failed to generate clientset with cluster config: %w", err)
}
// Setting reSyncPeriod
minResyncPeriod := time.Duration(config.ResyncPeriodInMinutes) * time.Minute
// Adding some randomness so all NPM pods will not request for info at once.
factor := rand.Float64() + 1 //nolint
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
klog.Infof("Resync period for NPM pod is set to %d.", int(resyncPeriod/time.Minute))
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
podFactory := factory // // Separate podFactory for different versions in npm and npm lite.
// npm-lite -> daemon set will listen to pods only in its own node
if config.Toggles.EnableNPMLite {
podFactory = informers.NewSharedInformerFactoryWithOptions(
clientset,
resyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
// Use field selector to filter pods based on their assigned node
klog.Infof("NPM agent is listening to pods only under its node")
options.FieldSelector = "spec.nodeName=" + models.GetNodeName()
}),
)
}
logLevel := config.LogLevel
if logLevel == "" {
logLevel = npmconfig.DefaultConfig.LogLevel
}
err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata(), logLevel)
if err != nil {
klog.Infof("CreateTelemetryHandle failed with error %v. AITelemetry is not initialized.", err)
}
var dp dataplane.GenericDataplane
stopChannel := wait.NeverStop
if config.Toggles.EnableV2NPM {
// update the dataplane config
npmV2DataplaneCfg.EnableNPMLite = config.Toggles.EnableNPMLite
npmV2DataplaneCfg.MaxBatchedACLsPerPod = config.MaxBatchedACLsPerPod
npmV2DataplaneCfg.NetPolInBackground = config.Toggles.NetPolInBackground
if config.NetPolInvervalInMilliseconds > 0 {
npmV2DataplaneCfg.NetPolInterval = time.Duration(config.NetPolInvervalInMilliseconds * int(time.Millisecond))
} else {
npmV2DataplaneCfg.NetPolInterval = time.Duration(npmconfig.DefaultConfig.NetPolInvervalInMilliseconds * int(time.Millisecond))
}
if config.MaxPendingNetPols > 0 {
npmV2DataplaneCfg.MaxPendingNetPols = config.MaxPendingNetPols
} else {
npmV2DataplaneCfg.MaxPendingNetPols = npmconfig.DefaultConfig.MaxPendingNetPols
}
npmV2DataplaneCfg.ApplyInBackground = config.Toggles.ApplyInBackground
if config.ApplyMaxBatches > 0 {
npmV2DataplaneCfg.ApplyMaxBatches = config.ApplyMaxBatches
} else {
npmV2DataplaneCfg.ApplyMaxBatches = npmconfig.DefaultConfig.ApplyMaxBatches
}
if config.ApplyIntervalInMilliseconds > 0 {
npmV2DataplaneCfg.ApplyInterval = time.Duration(config.ApplyIntervalInMilliseconds * int(time.Millisecond))
} else {
npmV2DataplaneCfg.ApplyInterval = time.Duration(npmconfig.DefaultConfig.ApplyIntervalInMilliseconds * int(time.Millisecond))
}
if config.WindowsNetworkName == "" {
npmV2DataplaneCfg.NetworkName = util.AzureNetworkName
} else {
npmV2DataplaneCfg.NetworkName = config.WindowsNetworkName
}
npmV2DataplaneCfg.PlaceAzureChainFirst = config.Toggles.PlaceAzureChainFirst
if config.Toggles.ApplyIPSetsOnNeed {
npmV2DataplaneCfg.IPSetMode = ipsets.ApplyOnNeed
} else {
npmV2DataplaneCfg.IPSetMode = ipsets.ApplyAllIPSets
}
var nodeIP string
if util.IsWindowsDP() {
nodeIP, err = util.NodeIP()
if err != nil {
metrics.SendErrorLogAndMetric(util.NpmID, "error: failed to get node IP while booting up: %v", err)
return fmt.Errorf("failed to get node IP while booting up: %w", err)
}
klog.Infof("node IP is %s", nodeIP)
}
npmV2DataplaneCfg.NodeIP = nodeIP
dp, err = dataplane.NewDataPlane(models.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg, stopChannel)
if err != nil {
metrics.SendErrorLogAndMetric(util.NpmID, "error: failed to create dataplane with error %v", err)
return fmt.Errorf("failed to create dataplane with error %w", err)
}
dp.RunPeriodicTasks()
}
k8sServerVersion := k8sServerVersion(clientset)
npMgr := npm.NewNetworkPolicyManager(config, factory, podFactory, dp, exec.New(), version, k8sServerVersion)
go restserver.NPMRestServerListenAndServe(config, npMgr)
metrics.SendLog(util.NpmID, "starting NPM", metrics.PrintLog)
if err = npMgr.Start(config, stopChannel); err != nil {
metrics.SendErrorLogAndMetric(util.NpmID, "Failed to start NPM due to %+v", err)
return fmt.Errorf("failed to start with err: %w", err)
}
select {}
}