func()

in controllers/route/route_controller.go [138:290]


func (rc *RouteController) reconcile(ctx context.Context, nodes []*v1.Node, routes []*cloudprovider.Route) error {
	var l sync.Mutex
	// for each node a map of podCIDRs and their created status
	nodeRoutesStatuses := make(map[types.NodeName]map[string]bool)
	// routeMap maps routeTargetNode->route
	routeMap := make(map[types.NodeName][]*cloudprovider.Route)
	for _, route := range routes {
		if route.TargetNode != "" {
			routeMap[route.TargetNode] = append(routeMap[route.TargetNode], route)
		}
	}

	wg := sync.WaitGroup{}
	rateLimiter := make(chan struct{}, maxConcurrentRouteCreations)
	// searches existing routes by node for a matching route

	for _, node := range nodes {
		// Skip if the node hasn't been assigned a CIDR yet.
		if len(node.Spec.PodCIDRs) == 0 {
			continue
		}
		nodeName := types.NodeName(node.Name)
		l.Lock()
		nodeRoutesStatuses[nodeName] = make(map[string]bool)
		l.Unlock()
		// for every node, for every cidr
		for _, podCIDR := range node.Spec.PodCIDRs {
			// we add it to our nodeCIDRs map here because add and delete go routines run at the same time
			l.Lock()
			nodeRoutesStatuses[nodeName][podCIDR] = false
			l.Unlock()
			// ignore if already created
			if hasRoute(routeMap, nodeName, podCIDR) {
				l.Lock()
				nodeRoutesStatuses[nodeName][podCIDR] = true // a route for this podCIDR is already created
				l.Unlock()
				continue
			}
			// if we are here, then a route needs to be created for this node
			route := &cloudprovider.Route{
				TargetNode:      nodeName,
				DestinationCIDR: podCIDR,
			}
			// cloud providers that:
			// - depend on nameHint
			// - trying to support dual stack
			// will have to carefully generate new route names that allow node->(multi cidr)
			nameHint := string(node.UID)
			wg.Add(1)
			go func(nodeName types.NodeName, nameHint string, route *cloudprovider.Route) {
				defer wg.Done()
				err := clientretry.RetryOnConflict(updateNetworkConditionBackoff, func() error {
					startTime := time.Now()
					// Ensure that we don't have more than maxConcurrentRouteCreations
					// CreateRoute calls in flight.
					rateLimiter <- struct{}{}
					klog.Infof("Creating route for node %s %s with hint %s, throttled %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
					err := rc.routes.CreateRoute(ctx, rc.clusterName, nameHint, route)
					<-rateLimiter
					if err != nil {
						msg := fmt.Sprintf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Since(startTime), err)
						if rc.recorder != nil {
							rc.recorder.Eventf(
								&v1.ObjectReference{
									Kind:      "Node",
									Name:      string(nodeName),
									UID:       types.UID(nodeName),
									Namespace: "",
								}, v1.EventTypeWarning, "FailedToCreateRoute", msg)
							klog.V(4).Infof(msg)
							return err
						}
					}
					l.Lock()
					nodeRoutesStatuses[nodeName][route.DestinationCIDR] = true
					l.Unlock()
					klog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
					return nil
				})
				if err != nil {
					klog.Errorf("Could not create route %s %s for node %s: %v", nameHint, route.DestinationCIDR, nodeName, err)
				}
			}(nodeName, nameHint, route)
		}
	}

	// searches our bag of node->cidrs for a match
	nodeHasCidr := func(nodeName types.NodeName, cidr string) bool {
		l.Lock()
		defer l.Unlock()

		nodeRoutes := nodeRoutesStatuses[nodeName]
		if nodeRoutes == nil {
			return false
		}
		_, exist := nodeRoutes[cidr]
		return exist
	}
	// delete routes that are not in use
	for _, route := range routes {
		if rc.isResponsibleForRoute(route) {
			// Check if this route is a blackhole, or applies to a node we know about & has an incorrect CIDR.
			if route.Blackhole || !nodeHasCidr(route.TargetNode, route.DestinationCIDR) {
				wg.Add(1)
				// Delete the route.
				go func(route *cloudprovider.Route, startTime time.Time) {
					defer wg.Done()
					// respect the rate limiter
					rateLimiter <- struct{}{}
					klog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
					if err := rc.routes.DeleteRoute(ctx, rc.clusterName, route); err != nil {
						klog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Since(startTime), err)
					} else {
						klog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Since(startTime))
					}
					<-rateLimiter
				}(route, time.Now())
			}
		}
	}
	wg.Wait()

	// after all routes have been created (or not), we start updating
	// all nodes' statuses with the outcome
	for _, node := range nodes {
		wg.Add(1)
		nodeRoutes := nodeRoutesStatuses[types.NodeName(node.Name)]
		allRoutesCreated := true

		if len(nodeRoutes) == 0 {
			go func(n *v1.Node) {
				defer wg.Done()
				klog.Infof("node %v has no routes assigned to it. NodeNetworkUnavailable will be set to true", n.Name)
				rc.updateNetworkingCondition(n, false)
			}(node)
			continue
		}

		// check if all routes were created. if so, then it should be ready
		for _, created := range nodeRoutes {
			if !created {
				allRoutesCreated = false
				break
			}
		}
		go func(n *v1.Node) {
			defer wg.Done()
			rc.updateNetworkingCondition(n, allRoutesCreated)
		}(node)
	}
	wg.Wait()
	return nil
}