func()

in pkg/azure/client.go [127:209]


func (az *azClient) processRequest(ctx context.Context, req ctrl.Request, nodesWithFwTaint []*corev1.Node) (err error) {
	processEventStart := time.Now()

	var erulesSourceAddresses = make(map[string][]string)
	var ipGroupIds = make(map[string]string)
	erulesList := &azurefirewallrulesv1.AzureFirewallRulesList{}
	listOpts := []client.ListOption{}
	if err := az.client.List(ctx, erulesList, listOpts...); err != nil {
		return err
	}

	nodeList := &corev1.NodeList{}
	if err := az.client.List(ctx, nodeList, []client.ListOption{}...); err != nil {
		return err
	}

	//fetch all the Ip groups in a resource group
	var ipGroupsInRG = make(map[string]*a.IPGroup)
	res1 := az.ipGroupClient.NewListByResourceGroupPager(az.resourceGroupName, nil)
	for res1.More() {
		page, err := res1.NextPage(ctx)
		if err != nil {
			klog.Error("failed to advance page: %v", err)
		}
		for _, ipGroup := range page.Value {
			ipGroupsInRG[*ipGroup.Name] = ipGroup
		}
	}

	for _, item := range erulesList.Items {
		for _, egressrule := range item.Spec.EgressRules {
			var sourceIpGroups []string
			if egressrule.NodeSelector != nil {
				for _, m := range egressrule.NodeSelector {
					for k, v := range m {
						IPGroupName := IpGroupNamePrefix + k + v
						if ipGroupIds[IPGroupName] == "" {
							sourceAddress := getSourceAddressesByNodeLabels(k, v, *nodeList)
							var id = ""

							//check if IP Group already exists
							if _, ok := ipGroupsInRG[IPGroupName]; ok {
								addressesInIpGroup := ipGroupsInRG[IPGroupName].Properties.IPAddresses
								IPGroupProvisioningState := *ipGroupsInRG[IPGroupName].Properties.ProvisioningState
								if len(sourceAddress) == len(addressesInIpGroup) && !checkIfElementsPresentInArray(sourceAddress, addressesInIpGroup) {
									//IP group not changed
									id = *ipGroupsInRG[IPGroupName].ID
								} else if IPGroupProvisioningState == a.ProvisioningStateUpdating && pollers[IPGroupName] != nil {
									//if the IP group is in updating state, wait for it to complete.
									klog.Info("Waiting for the Ip group update to complete, ", IPGroupName)
									_, err = pollers[IPGroupName].PollUntilDone(ctx, nil)
								}
							}

							// update IP Group and get the associated ID.
							if id == "" {
								poller := az.updateIpGroup(sourceAddress, IPGroupName)
								pollers[IPGroupName] = poller
								res, err1 := az.ipGroupClient.Get(az.ctx, az.resourceGroupName, IPGroupName, &a.IPGroupsClientGetOptions{Expand: nil})
								if err1 != nil {
									klog.Error("Failed to get the IP Group", err1)
								}
								id = *res.IPGroup.ID
							}
							ipGroupIds[IPGroupName] = id
						}
						sourceIpGroups = append(sourceIpGroups, ipGroupIds[IPGroupName])
					}
				}
			}
			erulesSourceAddresses[egressrule.Name] = sourceIpGroups
		}
	}

	go az.WaitForNodeIpGroupUpdate(ctx, nodesWithFwTaint, pollers)

	//Generate fw config
	az.BuildPolicy(*erulesList, erulesSourceAddresses)

	duration := time.Now().Sub(processEventStart)
	klog.Infof("Completed last event loop run in: %+v", duration)
	return
}