npm/iptm/iptm.go (399 lines of code) (raw):

// Part of this file is modified from iptables package from Kuberenetes. // https://github.com/kubernetes/kubernetes/blob/master/pkg/util/iptables package iptm import ( "bytes" "fmt" "strconv" "strings" "time" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/npm/metrics" "github.com/Azure/azure-container-networking/npm/util" "github.com/Azure/azure-container-networking/npm/util/ioutil" utilexec "k8s.io/utils/exec" // utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) const ( iptablesErrDoesNotExist int = 1 reconcileChainTimeInMinutes = 5 minLineNumberStringLength int = 3 ) var ( // IptablesAzureChainList contains list of all NPM chains IptablesAzureChainList = []string{ util.IptablesAzureChain, util.IptablesAzureAcceptChain, util.IptablesAzureIngressChain, util.IptablesAzureEgressChain, util.IptablesAzureIngressPortChain, util.IptablesAzureIngressFromChain, util.IptablesAzureEgressPortChain, util.IptablesAzureEgressToChain, util.IptablesAzureIngressDropsChain, util.IptablesAzureEgressDropsChain, } deprecatedJumpToAzureEntry = &IptEntry{ Chain: util.IptablesForwardChain, Specs: []string{ util.IptablesJumpFlag, util.IptablesAzureChain, }, } spaceByte = []byte(" ") ) // IptEntry represents an iptables rule. type IptEntry struct { Command string Name string Chain string Flag string LockWaitTimeInSeconds string Specs []string } // IptablesManager stores iptables entries. type IptablesManager struct { exec utilexec.Interface io ioshim OperationFlag string placeAzureChainFirst bool } func isDropsChain(chainName string) bool { // Check if the chain name is one of the two DROP chains if (chainName == util.IptablesAzureIngressDropsChain) || (chainName == util.IptablesAzureEgressDropsChain) { return true } return false } // NewIptablesManager creates a new instance for IptablesManager object. func NewIptablesManager(exec utilexec.Interface, io ioshim, placeAzureChainFirst bool) *IptablesManager { iptMgr := &IptablesManager{ exec: exec, io: io, OperationFlag: "", placeAzureChainFirst: placeAzureChainFirst, } return iptMgr } // NewIptablesManager creates a new instance for IptablesManager object. // InitNpmChains initializes Azure NPM chains in iptables. func (iptMgr *IptablesManager) InitNpmChains() error { log.Logf("Initializing AZURE-NPM chains.") if err := iptMgr.addAllChains(); err != nil { return err } if err := iptMgr.checkAndAddForwardChain(); err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to add AZURE-NPM chain to FORWARD chain. %s", err.Error()) } return iptMgr.addAllRulesToChains() } // UninitNpmChains uninitializes Azure NPM chains in iptables. func (iptMgr *IptablesManager) UninitNpmChains() error { // Remove AZURE-NPM chain from FORWARD chain. iptMgr.OperationFlag = util.IptablesDeletionFlag errCode, err := iptMgr.run(deprecatedJumpToAzureEntry) if errCode != iptablesErrDoesNotExist && err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to delete deprecated jump from FORWARD chain to AZURE-NPM") return err } entry := &IptEntry{ Chain: util.IptablesForwardChain, Specs: []string{ util.IptablesJumpFlag, util.IptablesAzureChain, util.IptablesModuleFlag, util.IptablesCtstateModuleFlag, util.IptablesCtstateFlag, util.IptablesNewState, }, } errCode, err = iptMgr.run(entry) if errCode != iptablesErrDoesNotExist && err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to delete AZURE-NPM from Forward chain") return err } // Clean old NPM chains. This is forward compatible with NPM v2. allAzureChains := append(IptablesAzureChainList, util.IptablesAzureTargetSetsChain, util.IptablesAzureIngressWrongDropsChain, ) currentAzureChains, err := ioutil.AllCurrentAzureChains(iptMgr.exec, util.IptablesDefaultWaitTime) if err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Warning: failed to get all current AZURE-NPM chains, so stale v2 chains may exist") } else { // add any extra current azure chains to the list of all azure chains. for _, chain := range allAzureChains { delete(currentAzureChains, chain) } for chain := range currentAzureChains { allAzureChains = append(allAzureChains, chain) } } iptMgr.OperationFlag = util.IptablesFlushFlag for _, chain := range allAzureChains { entry := &IptEntry{ Chain: chain, } errCode, err := iptMgr.run(entry) if errCode != iptablesErrDoesNotExist && err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to flush iptables chain %s.", chain) } } for _, chain := range allAzureChains { if err := iptMgr.deleteChain(chain); err != nil { return err } } return nil } // Add adds a rule in iptables. func (iptMgr *IptablesManager) Add(entry *IptEntry) error { log.Logf("Adding iptables entry: %+v.", entry) // Since there is a RETURN statement added to each DROP chain, we need to make sure // any new DROP rule added to ingress or egress DROPS chain is added at the BOTTOM if isDropsChain(entry.Chain) { iptMgr.OperationFlag = util.IptablesAppendFlag } else { iptMgr.OperationFlag = util.IptablesInsertionFlag } timer := metrics.StartNewTimer() _, err := iptMgr.run(entry) metrics.RecordACLRuleExecTime(timer) // record execution time regardless of failure if err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to create iptables rules.") return err } metrics.IncNumACLRules() return nil } // Delete removes a rule in iptables. func (iptMgr *IptablesManager) Delete(entry *IptEntry) error { log.Logf("Deleting iptables entry: %+v", entry) exists, err := iptMgr.exists(entry) if err != nil { return err } if !exists { return nil } iptMgr.OperationFlag = util.IptablesDeletionFlag if _, err := iptMgr.run(entry); err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to delete iptables rules.") return err } metrics.DecNumACLRules() return nil } func (iptMgr *IptablesManager) ReconcileIPTables(stopCh <-chan struct{}) { // (TODO) Ideally, we only need this when network policy installs iptables // Control below two functions with InitNpmChains and UninitNpmChains functions together go iptMgr.reconcileChains(stopCh) } // checkAndAddForwardChain initializes and reconciles Azure-NPM chain in right order func (iptMgr *IptablesManager) checkAndAddForwardChain() error { // TODO Adding this chain is printing error messages try to clean it up if err := iptMgr.addChain(util.IptablesAzureChain); err != nil { return err } // Insert AZURE-NPM chain to FORWARD chain. entry := &IptEntry{ Chain: util.IptablesForwardChain, Specs: []string{ util.IptablesJumpFlag, util.IptablesAzureChain, util.IptablesModuleFlag, util.IptablesCtstateModuleFlag, util.IptablesCtstateFlag, util.IptablesNewState, }, } var index int var kubeServicesLine int if !iptMgr.placeAzureChainFirst { // retrieve KUBE-SERVICES index var err error kubeServicesLine, err = iptMgr.getChainLineNumber(util.IptablesKubeServicesChain, util.IptablesForwardChain) if err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to get index of KUBE-SERVICES in FORWARD chain with error: %s", err.Error()) return err } index = kubeServicesLine + 1 // insert the jump to AZURE-NPM after the jump to KUBE-SERVICES } exists, err := iptMgr.exists(entry) if err != nil { return err } if !exists { // position Azure-NPM chain after Kube-Forward and Kube-Service chains if it exists iptMgr.OperationFlag = util.IptablesInsertionFlag if !iptMgr.placeAzureChainFirst { entry.Specs = append([]string{strconv.Itoa(index)}, entry.Specs...) } if _, err = iptMgr.run(entry); err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to add AZURE-NPM chain to FORWARD chain.") return err } return nil } npmChainLine, err := iptMgr.getChainLineNumber(util.IptablesAzureChain, util.IptablesForwardChain) if err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to get index of AZURE-NPM in FORWARD chain with error: %s", err.Error()) return err } if iptMgr.placeAzureChainFirst { if npmChainLine == 1 { return nil } } else { // Kube-services line number is less than npm chain line number then all good if kubeServicesLine < npmChainLine || kubeServicesLine <= 0 { return nil } } errCode := 0 // NPM Chain number is less than KUBE-SERVICES then // delete existing NPM chain and add it in the right order iptMgr.OperationFlag = util.IptablesDeletionFlag metrics.SendErrorLogAndMetric(util.IptmID, "Info: Reconciler deleting and re-adding AZURE-NPM in FORWARD table.") if errCode, err = iptMgr.run(entry); err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to delete AZURE-NPM chain from FORWARD chain with error code %d.", errCode) return err } iptMgr.OperationFlag = util.IptablesInsertionFlag if !iptMgr.placeAzureChainFirst { // Reduce index for deleted AZURE-NPM chain index-- entry.Specs = append([]string{strconv.Itoa(index)}, entry.Specs...) } if errCode, err = iptMgr.run(entry); err != nil { metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to add AZURE-NPM chain to FORWARD chain with error code %d.", errCode) return err } return nil } // reconcileChains checks for ordering of AZURE-NPM chain in FORWARD chain periodically. func (iptMgr *IptablesManager) reconcileChains(stopCh <-chan struct{}) { ticker := time.NewTicker(time.Minute * time.Duration(reconcileChainTimeInMinutes)) defer ticker.Stop() for { select { case <-stopCh: return case <-ticker.C: if err := iptMgr.checkAndAddForwardChain(); err != nil { metrics.SendErrorLogAndMetric(util.NpmID, "Error: failed to reconcileChains Azure-NPM due to %s", err.Error()) } metrics.SendHeartbeatWithNumPolicies() } } } // addAllRulesToChains checks and adds all the rules in NPM chains func (iptMgr *IptablesManager) addAllRulesToChains() error { allDefaultRules := getAllDefaultRules() for _, rule := range allDefaultRules { entry := &IptEntry{ Chain: rule[0], Specs: rule[1:], } exists, err := iptMgr.exists(entry) if err != nil { return err } if !exists { iptMgr.OperationFlag = util.IptablesAppendFlag if _, err = iptMgr.run(entry); err != nil { msg := "Error: failed to add %s to parent chain %s" switch { case len(rule) == 3: // 0th index is parent chain and 2nd is chain to be added msg = fmt.Sprintf(msg, rule[2], rule[0]) case len(rule) > 3: // last element is comment msg = fmt.Sprintf(msg, rule[len(rule)-1], rule[0]) default: msg = "Error: failed to add main chains with invalid rule length" } metrics.SendErrorLogAndMetric(util.IptmID, msg) return err } } } return nil } // Exists checks if a rule exists in iptables. func (iptMgr *IptablesManager) exists(entry *IptEntry) (bool, error) { iptMgr.OperationFlag = util.IptablesCheckFlag returnCode, err := iptMgr.run(entry) if err == nil { return true, nil } if returnCode == iptablesErrDoesNotExist { return false, nil } return false, err } // AddAllChains adds all NPM chains func (iptMgr *IptablesManager) addAllChains() error { // Add all secondary Chains for _, chainToAdd := range IptablesAzureChainList { if err := iptMgr.addChain(chainToAdd); err != nil { return err } } return nil } // AddChain adds a chain to iptables. func (iptMgr *IptablesManager) addChain(chain string) error { entry := &IptEntry{ Chain: chain, } iptMgr.OperationFlag = util.IptablesChainCreationFlag errCode, err := iptMgr.run(entry) if err != nil { if errCode == iptablesErrDoesNotExist { log.Logf("Chain already exists %s.", entry.Chain) return nil } metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to create iptables chain %s.", entry.Chain) return err } return nil } // GetChainLineNumber given a Chain and its parent chain returns line number func (iptMgr *IptablesManager) getChainLineNumber(chain string, parentChain string) (int, error) { var ( output []byte err error ) cmdName := util.Iptables cmdArgs := []string{"-t", "filter", "-n", "--list", parentChain, "--line-numbers"} iptFilterEntries := iptMgr.exec.Command(cmdName, cmdArgs...) grep := iptMgr.exec.Command("grep", chain) pipe, err := iptFilterEntries.StdoutPipe() if err != nil { return 0, err } defer pipe.Close() grep.SetStdin(pipe) if err = iptFilterEntries.Start(); err != nil { return 0, err } // Without this wait, defunct iptable child process are created defer iptFilterEntries.Wait() if output, err = grep.CombinedOutput(); err != nil { // grep returns err status 1 if not found return 0, nil } // NOTE: v2 has different behavior for unexpected grep outputs (v2 will throw an error) // want to fix the bug here (not detecting numbers with 2+ digits), but don't want to modify the error-throwing behavior if len(output) >= minLineNumberStringLength { firstSpaceIndex := bytes.Index(output, spaceByte) if firstSpaceIndex > 0 && firstSpaceIndex < len(output) { lineNumberString := string(output[0:firstSpaceIndex]) lineNum, _ := strconv.Atoi(lineNumberString) return lineNum, nil } } return 0, nil } // DeleteChain deletes a chain from iptables. func (iptMgr *IptablesManager) deleteChain(chain string) error { entry := &IptEntry{ Chain: chain, } iptMgr.OperationFlag = util.IptablesDestroyFlag errCode, err := iptMgr.run(entry) if err != nil { if errCode == iptablesErrDoesNotExist { log.Logf("Chain doesn't exist %s.", entry.Chain) return nil } metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to delete iptables chain %s.", entry.Chain) return err } return nil } // Run execute an iptables command to update iptables. func (iptMgr *IptablesManager) run(entry *IptEntry) (int, error) { cmdName := entry.Command if cmdName == "" { cmdName = util.Iptables } if entry.LockWaitTimeInSeconds == "" { entry.LockWaitTimeInSeconds = util.IptablesDefaultWaitTime } cmdArgs := append([]string{util.IptablesWaitFlag, entry.LockWaitTimeInSeconds, iptMgr.OperationFlag, entry.Chain}, entry.Specs...) if iptMgr.OperationFlag != util.IptablesCheckFlag { log.Logf("Executing iptables command %s %v", cmdName, cmdArgs) } output, err := iptMgr.exec.Command(cmdName, cmdArgs...).CombinedOutput() if msg, failed := err.(utilexec.ExitError); failed { errCode := msg.ExitStatus() if errCode > 0 && iptMgr.OperationFlag != util.IptablesCheckFlag { msgStr := strings.TrimSuffix(string(output), "\n") if strings.Contains(msgStr, "Chain already exists") && iptMgr.OperationFlag == util.IptablesChainCreationFlag { return 0, nil } metrics.SendErrorLogAndMetric(util.IptmID, "Error: There was an error running command: [%s %v] Stderr: [%v, %s]", cmdName, strings.Join(cmdArgs, " "), err, msgStr) } return errCode, err } return 0, nil }