func()

in pkg/ingress/controller/controller.go [664:932]


func (c *Controller) ensureIngress(ing *nwv1.Ingress) error {
	ingName := ing.ObjectMeta.Name
	ingNamespace := ing.ObjectMeta.Namespace
	clusterName := c.config.ClusterName

	ingfullName := fmt.Sprintf("%s/%s", ingNamespace, ingName)
	resName := utils.GetResourceName(ingNamespace, ingName, clusterName)

	if len(ing.Spec.TLS) > 0 && c.osClient.Barbican == nil {
		return fmt.Errorf("TLS Ingress is not supported because the Key Manager service is unavailable")
	}

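	// EnsureLoadBalancer returns the existing Octavia load balancer for this
	// Ingress or creates a new one on the configured subnet.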
	lb, err := c.osClient.EnsureLoadBalancer(resName, c.config.Octavia.SubnetID, ingNamespace, ingName, clusterName)
	if err != nil {
		return err
	}

	logger := log.WithFields(log.Fields{"ingress": ingfullName, "lbID": lb.ID})

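	// The load balancer description records the ResourceVersion of the last
	// reconciled Ingress (see the end of this function); skip if unchanged.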
	if strings.Contains(lb.Description, ing.ResourceVersion) {
		logger.Info("ingress not changed")
		return nil
	}

	var nodePorts []int
	var sgID string

	if c.config.Octavia.ManageSecurityGroups {
		logger.Info("ensuring security group")

		sgDescription := fmt.Sprintf("Security group created for Ingress %s from cluster %s", ingfullName, clusterName)
		sgTags := []string{IngressControllerTag, fmt.Sprintf("%s_%s", ingNamespace, ingName)}
		sgID, err = c.osClient.EnsureSecurityGroup(false, resName, sgDescription, sgTags)
		if err != nil {
			return fmt.Errorf("failed to prepare the security group for the ingress %s: %v", ingfullName, err)
		}

		logger.WithFields(log.Fields{"sgID": sgID}).Info("ensured security group")
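		// Rules for this group are added further below, once the NodePorts used
		// by the Ingress backends are known.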
	}

	// Convert Kubernetes TLS secrets to Barbican secrets
	var secretRefs []string
	for _, tls := range ing.Spec.TLS {
		secretName := fmt.Sprintf(BarbicanSecretNameTemplate, clusterName, ingNamespace, ingName, tls.SecretName)
		secretRef, err := c.toBarbicanSecret(tls.SecretName, ingNamespace, secretName)
		if err != nil {
			return fmt.Errorf("failed to create Barbican secret: %v", err)
		}

		logger.WithFields(log.Fields{"secretName": secretName, "secretRef": secretRef}).Info("secret created in Barbican")

		secretRefs = append(secretRefs, secretRef)
	}
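	// The port feeds the HOST_NAME regex below: with TLS secrets the listener
	// serves HTTPS on 443, otherwise plain HTTP on 80.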
	port := 80
	if len(secretRefs) > 0 {
		port = 443
	}

	// Create the listener, restricting access to the allowed source CIDRs
	sourceRanges := getStringFromIngressAnnotation(ing, IngressAnnotationSourceRangesKey, "0.0.0.0/0")
	listenerAllowedCIDRs := strings.Split(sourceRanges, ",")
	listener, err := c.osClient.EnsureListener(resName, lb.ID, secretRefs, listenerAllowedCIDRs)
	if err != nil {
		return err
	}

	// Get node information and prepare the batch member update parameters.
	nodeObjs, err := listWithPredicate(c.nodeLister, getNodeConditionPredicate())
	if err != nil {
		return err
	}
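	// Every usable node becomes a candidate pool member; the ProtocolPort is
	// filled in later, per backend service NodePort.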
	var updateMemberOpts []pools.BatchUpdateMemberOpts
	for _, node := range nodeObjs {
		addr, err := getNodeAddressForLB(node)
		if err != nil {
			// Skip this node: without an address it cannot become a pool member.
			logger.WithFields(log.Fields{"node": node.Name, "error": err}).Warn("failed to get node address")
			continue
		}
		nodeName := node.Name
		member := pools.BatchUpdateMemberOpts{
			Name:    &nodeName,
			Address: addr,
		}
		updateMemberOpts = append(updateMemberOpts, member)
	}
	// Octavia requires a pool to have at least one member, so fail early if no
	// usable node was found.
	if len(updateMemberOpts) == 0 {
		return fmt.Errorf("no available nodes")
	}

	// Get all the existing pools and L7 policies; they are handed to the
	// resource tracker below so stale Octavia resources can be cleaned up.
	var newPools []openstack.IngPool
	var newPolicies []openstack.IngPolicy
	var oldPolicies []openstack.ExistingPolicy

	existingPolicies, err := openstackutil.GetL7policies(c.osClient.Octavia, listener.ID)
	if err != nil {
		return fmt.Errorf("failed to get L7 policies for listener %s: %v", listener.ID, err)
	}
	for _, policy := range existingPolicies {
		rules, err := openstackutil.GetL7Rules(c.osClient.Octavia, policy.ID)
		if err != nil {
			return fmt.Errorf("failed to get L7 rules for policy %s: %v", policy.ID, err)
		}
		oldPolicies = append(oldPolicies, openstack.ExistingPolicy{
			Policy: policy,
			Rules:  rules,
		})
	}

	existingPools, err := openstackutil.GetPools(c.osClient.Octavia, lb.ID)
	if err != nil {
		return fmt.Errorf("failed to get pools from load balancer %s, error: %v", lb.ID, err)
	}

	// Add a default pool for the listener if 'defaultBackend' is defined
	if ing.Spec.DefaultBackend != nil {
		poolName := utils.Hash(fmt.Sprintf("%s+%s", ing.Spec.DefaultBackend.Service.Name, ing.Spec.DefaultBackend.Service.Port.String()))

		serviceName := fmt.Sprintf("%s/%s", ingNamespace, ing.Spec.DefaultBackend.Service.Name)
		nodePort, err := c.getServiceNodePort(serviceName, ing.Spec.DefaultBackend.Service)
		if err != nil {
			return err
		}
		nodePorts = append(nodePorts, nodePort)

		var members = make([]pools.BatchUpdateMemberOpts, len(updateMemberOpts))
		copy(members, updateMemberOpts)
		for index := range members {
			members[index].ProtocolPort = nodePort
		}

		// This pool is the default pool of the listener.
		newPools = append(newPools, openstack.IngPool{
			Name: poolName,
			Opts: pools.CreateOpts{
				Name:        poolName,
				Protocol:    "HTTP",
				LBMethod:    pools.LBMethodRoundRobin,
				ListenerID:  listener.ID,
				Persistence: nil,
			},
			PoolMembers: members,
		})
	}

	// Add L7 load balancing rules. Each host/path pair maps to an L7 policy in
	// Octavia that contains two rules (of type 'HOST_NAME' and 'PATH' respectively).
	for _, rule := range ing.Spec.Rules {
		host := rule.Host

		for _, path := range rule.HTTP.Paths {
			var policyRules []l7policies.CreateRuleOpts

			if host != "" {
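				// Match the Host header exactly, escaping dots in the hostname
				// and allowing an optional ":<port>" suffix.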
				policyRules = append(policyRules, l7policies.CreateRuleOpts{
					RuleType:    l7policies.TypeHostName,
					CompareType: l7policies.CompareTypeRegex,
					Value:       fmt.Sprintf("^%s(:%d)?$", strings.ReplaceAll(host, ".", "\\."), port)})
			}

			// make the pool name unique in the load balancer
			poolName := utils.Hash(fmt.Sprintf("%s+%s", path.Backend.Service.Name, path.Backend.Service.Port.String()))

			serviceName := fmt.Sprintf("%s/%s", ingNamespace, path.Backend.Service.Name)
			nodePort, err := c.getServiceNodePort(serviceName, path.Backend.Service)
			if err != nil {
				return err
			}
			nodePorts = append(nodePorts, nodePort)

			var members = make([]pools.BatchUpdateMemberOpts, len(updateMemberOpts))
			copy(members, updateMemberOpts)
			for index := range members {
				members[index].ProtocolPort = nodePort
			}

			// The pool is a shared pool in a load balancer.
			newPools = append(newPools, openstack.IngPool{
				Name: poolName,
				Opts: pools.CreateOpts{
					Name:           poolName,
					Protocol:       "HTTP",
					LBMethod:       pools.LBMethodRoundRobin,
					LoadbalancerID: lb.ID,
					Persistence:    nil,
				},
				PoolMembers: members,
			})

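			// The request path is matched by prefix (STARTS_WITH).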
			policyRules = append(policyRules, l7policies.CreateRuleOpts{
				RuleType:    l7policies.TypePath,
				CompareType: l7policies.CompareTypeStartWith,
				Value:       path.Path,
			})

			newPolicies = append(newPolicies, openstack.IngPolicy{
				RedirectPoolName: poolName,
				Opts: l7policies.CreateOpts{
					ListenerID:  listener.ID,
					Action:      l7policies.ActionRedirectToPool,
					Description: "Created by kubernetes ingress",
				},
				RulesOpts: policyRules,
			})
		}
	}

	// Reconcile Octavia resources: create or update the desired pools and
	// policies first, then clean up the ones that are no longer needed.
	rt := openstack.NewResourceTracker(ingfullName, c.osClient.Octavia, lb.ID, listener.ID, newPools, newPolicies, existingPools, oldPolicies)
	if err := rt.CreateResources(); err != nil {
		return err
	}
	if err := rt.CleanupResources(); err != nil {
		return err
	}

	if c.config.Octavia.ManageSecurityGroups {
		logger.WithFields(log.Fields{"sgID": sgID}).Info("ensuring security group rules")

		if err := c.osClient.EnsureSecurityGroupRules(sgID, c.subnetCIDR, nodePorts); err != nil {
			return fmt.Errorf("failed to ensure security group rules for Ingress %s: %v", ingfullName, err)
		}

		if err := c.osClient.EnsurePortSecurityGroup(false, sgID, nodeObjs); err != nil {
			return fmt.Errorf("failed to operate port security group for Ingress %s: %v", ingfullName, err)
		}

		logger.WithFields(log.Fields{"sgID": sgID}).Info("ensured security group rules")
	}

	internalSetting := getStringFromIngressAnnotation(ing, IngressAnnotationInternal, "true")
	isInternal, err := strconv.ParseBool(internalSetting)
	if err != nil {
		return fmt.Errorf("invalid value for annotation %s: %v", IngressAnnotationInternal, err)
	}

	address := lb.VipAddress
	// Allocate a floating IP for the load balancer VIP if an external floating IP network is configured and the Ingress is not internal.
	if !isInternal && c.config.Octavia.FloatingIPNetwork != "" {
		logger.Info("creating floating IP")

		description := fmt.Sprintf("Floating IP for Kubernetes ingress %s in namespace %s from cluster %s", ingName, ingNamespace, clusterName)
		address, err = c.osClient.EnsureFloatingIP(false, lb.VipPortID, c.config.Octavia.FloatingIPNetwork, description)
		if err != nil {
			return fmt.Errorf("failed to create floating IP: %v", err)
		}

		logger.WithFields(log.Fields{"fip": address}).Info("floating IP created")
	}

	// Update ingress status
	newIng, err := c.updateIngressStatus(ing, address)
	if err != nil {
		return err
	}
	c.recorder.Event(ing, apiv1.EventTypeNormal, "Updated", fmt.Sprintf("Successfully associated IP address %s to ingress %s", address, ingfullName))

	// Add ingress resource version to the load balancer description
	newDes := fmt.Sprintf("Kubernetes Ingress %s in namespace %s from cluster %s, version: %s", ingName, ingNamespace, clusterName, newIng.ResourceVersion)
	if err = c.osClient.UpdateLoadBalancerDescription(lb.ID, newDes); err != nil {
		return err
	}

	logger.Info("openstack resources for ingress created")

	return nil
}