npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go (343 lines of code) (raw):

package goalstateprocessor import ( "bytes" "context" "fmt" cp "github.com/Azure/azure-container-networking/npm/pkg/controlplane" "github.com/Azure/azure-container-networking/npm/pkg/dataplane" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" "github.com/Azure/azure-container-networking/npm/pkg/protos" "github.com/Azure/azure-container-networking/npm/util" npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" "k8s.io/klog" ) var ErrPodOrNodeNameNil = fmt.Errorf("both pod and node name must be set") type GoalStateProcessor struct { ctx context.Context cancel context.CancelFunc nodeID string podName string dp dataplane.GenericDataplane inputChannel chan *protos.Events backoffChannel chan *protos.Events } func NewGoalStateProcessor( ctx context.Context, nodeID string, podName string, inputChan chan *protos.Events, dp dataplane.GenericDataplane) (*GoalStateProcessor, error) { if nodeID == "" || podName == "" { return nil, ErrPodOrNodeNameNil } klog.Infof("Creating GoalStateProcessor for node %s", nodeID) return &GoalStateProcessor{ ctx: ctx, nodeID: nodeID, podName: podName, dp: dp, inputChannel: inputChan, backoffChannel: make(chan *protos.Events), }, nil } // Start kicks off the GoalStateProcessor func (gsp *GoalStateProcessor) Start(stopCh <-chan struct{}) { klog.Infof("Starting GoalStateProcessor for node %s", gsp.nodeID) go gsp.run(stopCh) } // Stop stops the GoalStateProcessor func (gsp *GoalStateProcessor) Stop() { klog.Infof("Stopping GoalStateProcessor for node %s", gsp.nodeID) gsp.cancel() } func (gsp *GoalStateProcessor) run(stopCh <-chan struct{}) { klog.Infof("Starting dataplane for node %s", gsp.nodeID) for gsp.processNext(stopCh) { } } func (gsp *GoalStateProcessor) processNext(stopCh <-chan struct{}) bool { select { // TODO benchmark how many events can stay in pipeline as we work // on a previous event case inputEvents := <-gsp.inputChannel: // TODO remove this large print later klog.Infof("Received event %s", inputEvents) gsp.process(inputEvents) return true case backoffEvents := <-gsp.backoffChannel: // For now keep it simple. Do not worry about backoff events // but if we need to handle them, we can do it here. // TODO remove this large print later klog.Infof("Received backoff event %s", backoffEvents) gsp.process(backoffEvents) return true case <-gsp.ctx.Done(): klog.Infof("GoalStateProcessor for node %s received context Done", gsp.nodeID) return false case <-stopCh: klog.Infof("GoalStateProcessor for node %s stopped", gsp.nodeID) return false } } func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) { klog.Infof("Processing event") // apply dataplane after syncing defer func() { dperr := gsp.dp.ApplyDataPlane() if dperr != nil { klog.Errorf("Apply Dataplane failed with %v", dperr) } }() payload := inputEvent.GetPayload() if !validatePayload(payload) { klog.Warningf("Empty payload in event %s", inputEvent) return } switch inputEvent.GetEventType() { case protos.Events_Hydration: // in hydration event, any thing in local cache and not in event should be deleted. klog.Infof("Received hydration event") gsp.processHydrationEvent(payload) case protos.Events_GoalState: klog.Infof("Received goal state event") gsp.processGoalStateEvent(payload) default: klog.Errorf("Received unknown event type %s", inputEvent.GetEventType()) } } func (gsp *GoalStateProcessor) processHydrationEvent(payload map[string]*protos.GoalState) { // Hydration events are sent when the daemon first starts up, or a reconnection to controller happens. // In this case, the controller will send a current state of the cache down to daemon. // Daemon will need to calculate what updates and deleted have been missed and send them to the dataplane. // Sequence of processing will be: // Apply IPsets // Apply Policies // Get all existing IPSets and policies in the dataplane // Delete cached Policies not in event // Delete cached IPSets (without references) not in the event var appendedIPSets map[string]struct{} var appendedPolicies map[string]struct{} var err error if ipsetApplyPayload, ok := payload[cp.IpsetApply]; ok { appendedIPSets, err = gsp.processIPSetsApplyEvent(ipsetApplyPayload) if err != nil { klog.Errorf("Error processing IPSET apply HYDRATION event %s", err) } } if policyApplyPayload, ok := payload[cp.PolicyApply]; ok { appendedPolicies, err = gsp.processPolicyApplyEvent(policyApplyPayload) if err != nil { klog.Errorf("Error processing POLICY apply HYDRATION event %s", err) } } cachedPolicyKeys := gsp.dp.GetAllPolicies() toDeletePolicies := make([]string, 0) if appendedPolicies == nil { toDeletePolicies = cachedPolicyKeys } else { for _, policy := range cachedPolicyKeys { if _, ok := appendedPolicies[policy]; !ok { toDeletePolicies = append(toDeletePolicies, policy) } } } if len(toDeletePolicies) > 0 { klog.Infof("Deleting %d policies", len(toDeletePolicies)) err = gsp.processPolicyRemoveEvent(toDeletePolicies) if err != nil { klog.Errorf("Error processing POLICY remove HYDRATION event %s", err) } } cachedIPSetNames := gsp.dp.GetAllIPSets() hashedsetnames := make([]string, len(cachedIPSetNames)) toDeleteIPSets := make([]string, 0) i := 0 for name := range cachedIPSetNames { hashedsetnames[i] = name i++ } if appendedIPSets == nil { toDeleteIPSets = hashedsetnames } else { for _, ipset := range cachedIPSetNames { if _, ok := appendedIPSets[ipset]; !ok { toDeleteIPSets = append(toDeleteIPSets, ipset) } } } if len(toDeleteIPSets) > 0 { klog.Infof("Deleting %d ipsets", len(toDeleteIPSets)) gsp.processIPSetsRemoveEvent(toDeleteIPSets, util.ForceDelete) } } func (gsp *GoalStateProcessor) processGoalStateEvent(payload map[string]*protos.GoalState) { // Process these individual buckets in order // 1. Apply IPSET // 2. Apply POLICY // 3. Remove POLICY // 4. Remove IPSET if ipsetApplyPayload, ok := payload[cp.IpsetApply]; ok { _, err := gsp.processIPSetsApplyEvent(ipsetApplyPayload) if err != nil { klog.Errorf("Error processing IPSET apply event %s", err) } } if policyApplyPayload, ok := payload[cp.PolicyApply]; ok { _, err := gsp.processPolicyApplyEvent(policyApplyPayload) if err != nil { klog.Errorf("Error processing POLICY apply event %s", err) } } if policyRemovePayload, ok := payload[cp.PolicyRemove]; ok { payload := bytes.NewBuffer(policyRemovePayload.GetData()) netpolNames, err := cp.DecodeStrings(payload) if err != nil { klog.Errorf("Error processing POLICY remove event, failed to decode Policy remove event %s", err) } err = gsp.processPolicyRemoveEvent(netpolNames) if err != nil { klog.Errorf("Error processing POLICY remove event %s", err) } } if ipsetRemovePayload, ok := payload[cp.IpsetRemove]; ok { payload := bytes.NewBuffer(ipsetRemovePayload.GetData()) ipsetNames, err := cp.DecodeStrings(payload) if err != nil { klog.Errorf("Error processing IPSET remove event, failed to decode IPSet remove event: %s", err) } gsp.processIPSetsRemoveEvent(ipsetNames, util.SoftDelete) } } func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalState) (map[string]struct{}, error) { payload := bytes.NewBuffer(goalState.GetData()) payloadIPSets, err := cp.DecodeControllerIPSets(payload) if err != nil { return nil, npmerrors.SimpleErrorWrapper("failed to decode IPSet apply event", err) } klog.Infof("Processing IPSet apply event %v", payloadIPSets) appendedIPSets := make(map[string]struct{}, len(payloadIPSets)) for _, ipset := range payloadIPSets { if ipset == nil { klog.Warningf("Empty IPSet apply event") continue } klog.Infof("ipset: %v", ipset) ipsetName := ipset.GetPrefixName() klog.Infof("Processing %s IPSET apply event", ipsetName) cachedIPSet := gsp.dp.GetIPSet(ipsetName) if cachedIPSet == nil { klog.Infof("IPSet %s not found in cache, adding to cache", ipsetName) } switch ipset.GetSetKind() { case ipsets.HashSet: err = gsp.applySets(ipset, cachedIPSet) if err != nil { return nil, err } case ipsets.ListSet: err = gsp.applyLists(ipset, cachedIPSet) if err != nil { return nil, err } case ipsets.UnknownKind: return nil, npmerrors.SimpleError( fmt.Sprintf("failed to decode IPSet apply event: Unknown IPSet kind %s", cachedIPSet.Kind), ) } appendedIPSets[ipsetName] = struct{}{} } return appendedIPSets, nil } func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { if len(ipSet.IPPodMetadata) == 0 { gsp.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipSet.GetMetadata()}) return nil } setMetadata := ipSet.GetMetadata() for _, podMetadata := range ipSet.IPPodMetadata { err := gsp.dp.AddToSets([]*ipsets.IPSetMetadata{setMetadata}, podMetadata) if err != nil { return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at AddToSet.", err) } } if cachedIPSet != nil { for podIP, cachedPodKey := range cachedIPSet.IPPodKey { if _, ok := ipSet.IPPodMetadata[podIP]; !ok { err := gsp.dp.RemoveFromSets([]*ipsets.IPSetMetadata{setMetadata}, dataplane.NewPodMetadata(podIP, cachedPodKey, "")) if err != nil { return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at RemoveFromSets.", err) } } } } return nil } func (gsp *GoalStateProcessor) applyLists(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { if len(ipSet.MemberIPSets) == 0 { gsp.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipSet.GetMetadata()}) return nil } setMetadata := ipSet.GetMetadata() membersToAdd := make([]*ipsets.IPSetMetadata, len(ipSet.MemberIPSets)) idx := 0 for _, memberIPSet := range ipSet.MemberIPSets { membersToAdd[idx] = memberIPSet idx++ } err := gsp.dp.AddToLists([]*ipsets.IPSetMetadata{setMetadata}, membersToAdd) if err != nil { return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at AddToLists.", err) } if cachedIPSet != nil { toDeleteMembers := make([]*ipsets.IPSetMetadata, 0) for _, memberSet := range cachedIPSet.MemberIPSets { if _, ok := ipSet.MemberIPSets[memberSet.Name]; !ok { toDeleteMembers = append(toDeleteMembers, memberSet.GetSetMetadata()) } } if len(toDeleteMembers) > 0 { err := gsp.dp.RemoveFromList(setMetadata, toDeleteMembers) if err != nil { return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at RemoveFromList.", err) } } } return nil } func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(ipsetNames []string, forceDelete util.DeleteOption) { for _, ipsetName := range ipsetNames { if ipsetName == "" { klog.Warningf("Empty IPSet remove event") continue } klog.Infof("Processing %s IPSET remove event", ipsetName) cachedIPSet := gsp.dp.GetIPSet(ipsetName) if cachedIPSet == nil { klog.Infof("IPSet %s not found in cache, ignoring delete call.", ipsetName) continue } gsp.dp.DeleteIPSet(ipsets.NewIPSetMetadata(cachedIPSet.Name, cachedIPSet.Type), forceDelete) } } func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalState) (map[string]struct{}, error) { payload := bytes.NewBuffer(goalState.GetData()) netpols, err := cp.DecodeNPMNetworkPolicies(payload) if err != nil { return nil, npmerrors.SimpleErrorWrapper("failed to decode Policy apply event", err) } appendedPolicies := make(map[string]struct{}, len(netpols)) for _, netpol := range netpols { if netpol == nil { klog.Warningf("Empty Policy apply event") continue } klog.Infof("Processing %s Policy ADD event", netpol.PolicyKey) klog.Infof("Netpol: %v", netpol) err = gsp.dp.UpdatePolicy(netpol) if err != nil { klog.Errorf("Error applying policy %s to dataplane with error: %s", netpol.PolicyKey, err.Error()) return nil, npmerrors.SimpleErrorWrapper("failed update policy event", err) } appendedPolicies[netpol.PolicyKey] = struct{}{} } return appendedPolicies, nil } func (gsp *GoalStateProcessor) processPolicyRemoveEvent(netpolNames []string) error { for _, netpolName := range netpolNames { klog.Infof("Processing %s Policy remove event", netpolName) if netpolName == "" { klog.Warningf("Empty Policy remove event") continue } err := gsp.dp.RemovePolicy(netpolName) if err != nil { klog.Errorf("Error removing policy %s from dataplane with error: %s", netpolName, err.Error()) return npmerrors.SimpleErrorWrapper("failed remove policy event", err) } } return nil } func validatePayload(payload map[string]*protos.GoalState) bool { for _, v := range payload { if len(v.GetData()) != 0 { return true } } return false }