in pkg/ingress/controller/controller.go [664:932]
func (c *Controller) ensureIngress(ing *nwv1.Ingress) error {
ingName := ing.ObjectMeta.Name
ingNamespace := ing.ObjectMeta.Namespace
clusterName := c.config.ClusterName
ingfullName := fmt.Sprintf("%s/%s", ingNamespace, ingName)
resName := utils.GetResourceName(ingNamespace, ingName, clusterName)
if len(ing.Spec.TLS) > 0 && c.osClient.Barbican == nil {
return fmt.Errorf("TLS Ingress not supported because of Key Manager service unavailable")
}
lb, err := c.osClient.EnsureLoadBalancer(resName, c.config.Octavia.SubnetID, ingNamespace, ingName, clusterName)
if err != nil {
return err
}
logger := log.WithFields(log.Fields{"ingress": ingfullName, "lbID": lb.ID})
if strings.Contains(lb.Description, ing.ResourceVersion) {
logger.Info("ingress not changed")
return nil
}
var nodePorts []int
var sgID string
if c.config.Octavia.ManageSecurityGroups {
logger.Info("ensuring security group")
sgDescription := fmt.Sprintf("Security group created for Ingress %s from cluster %s", ingfullName, clusterName)
sgTags := []string{IngressControllerTag, fmt.Sprintf("%s_%s", ingNamespace, ingName)}
sgID, err = c.osClient.EnsureSecurityGroup(false, resName, sgDescription, sgTags)
if err != nil {
return fmt.Errorf("failed to prepare the security group for the ingress %s: %v", ingfullName, err)
}
logger.WithFields(log.Fields{"sgID": sgID}).Info("ensured security group")
}
// Convert kubernetes secrets to barbican ones
var secretRefs []string
for _, tls := range ing.Spec.TLS {
secretName := fmt.Sprintf(BarbicanSecretNameTemplate, clusterName, ingNamespace, ingName, tls.SecretName)
secretRef, err := c.toBarbicanSecret(tls.SecretName, ingNamespace, secretName)
if err != nil {
return fmt.Errorf("failed to create Barbican secret: %v", err)
}
logger.WithFields(log.Fields{"secretName": secretName, "secretRef": secretRef}).Info("secret created in Barbican")
secretRefs = append(secretRefs, secretRef)
}
port := 80
if len(secretRefs) > 0 {
port = 443
}
// Create listener
sourceRanges := getStringFromIngressAnnotation(ing, IngressAnnotationSourceRangesKey, "0.0.0.0/0")
listenerAllowedCIDRs := strings.Split(sourceRanges, ",")
listener, err := c.osClient.EnsureListener(resName, lb.ID, secretRefs, listenerAllowedCIDRs)
if err != nil {
return err
}
// get nodes information and prepare update member params.
nodeObjs, err := listWithPredicate(c.nodeLister, getNodeConditionPredicate())
if err != nil {
return err
}
var updateMemberOpts []pools.BatchUpdateMemberOpts
for _, node := range nodeObjs {
addr, err := getNodeAddressForLB(node)
if err != nil {
// Node failure, do not create member
logger.WithFields(log.Fields{"node": node.Name, "error": err}).Warn("failed to get node address")
continue
}
nodeName := node.Name
member := pools.BatchUpdateMemberOpts{
Name: &nodeName,
Address: addr,
}
updateMemberOpts = append(updateMemberOpts, member)
}
// only allow >= 1 members or it will lead to openstack octavia issue
if len(updateMemberOpts) == 0 {
return fmt.Errorf("no available nodes")
}
// Get all the existing pools and l7 policies
var newPools []openstack.IngPool
var newPolicies []openstack.IngPolicy
var oldPolicies []openstack.ExistingPolicy
existingPolicies, err := openstackutil.GetL7policies(c.osClient.Octavia, listener.ID)
if err != nil {
return fmt.Errorf("failed to get l7 policies for listener %s", listener.ID)
}
for _, policy := range existingPolicies {
rules, err := openstackutil.GetL7Rules(c.osClient.Octavia, policy.ID)
if err != nil {
return fmt.Errorf("failed to get l7 rules for policy %s", policy.ID)
}
oldPolicies = append(oldPolicies, openstack.ExistingPolicy{
Policy: policy,
Rules: rules,
})
}
existingPools, err := openstackutil.GetPools(c.osClient.Octavia, lb.ID)
if err != nil {
return fmt.Errorf("failed to get pools from load balancer %s, error: %v", lb.ID, err)
}
// Add default pool for the listener if 'backend' is defined
if ing.Spec.DefaultBackend != nil {
poolName := utils.Hash(fmt.Sprintf("%s+%s", ing.Spec.DefaultBackend.Service.Name, ing.Spec.DefaultBackend.Service.Port.String()))
serviceName := fmt.Sprintf("%s/%s", ingNamespace, ing.Spec.DefaultBackend.Service.Name)
nodePort, err := c.getServiceNodePort(serviceName, ing.Spec.DefaultBackend.Service)
if err != nil {
return err
}
nodePorts = append(nodePorts, nodePort)
var members = make([]pools.BatchUpdateMemberOpts, len(updateMemberOpts))
copy(members, updateMemberOpts)
for index := range members {
members[index].ProtocolPort = nodePort
}
// This pool is the default pool of the listener.
newPools = append(newPools, openstack.IngPool{
Name: poolName,
Opts: pools.CreateOpts{
Name: poolName,
Protocol: "HTTP",
LBMethod: pools.LBMethodRoundRobin,
ListenerID: listener.ID,
Persistence: nil,
},
PoolMembers: members,
})
}
// Add l7 load balancing rules. Each host and path pair is mapped to a l7 policy in octavia,
// which contains two rules(with type 'HOST_NAME' and 'PATH' respectively)
for _, rule := range ing.Spec.Rules {
host := rule.Host
for _, path := range rule.HTTP.Paths {
var policyRules []l7policies.CreateRuleOpts
if host != "" {
policyRules = append(policyRules, l7policies.CreateRuleOpts{
RuleType: l7policies.TypeHostName,
CompareType: l7policies.CompareTypeRegex,
Value: fmt.Sprintf("^%s(:%d)?$", strings.ReplaceAll(host, ".", "\\."), port)})
}
// make the pool name unique in the load balancer
poolName := utils.Hash(fmt.Sprintf("%s+%s", path.Backend.Service.Name, path.Backend.Service.Port.String()))
serviceName := fmt.Sprintf("%s/%s", ingNamespace, path.Backend.Service.Name)
nodePort, err := c.getServiceNodePort(serviceName, path.Backend.Service)
if err != nil {
return err
}
nodePorts = append(nodePorts, nodePort)
var members = make([]pools.BatchUpdateMemberOpts, len(updateMemberOpts))
copy(members, updateMemberOpts)
for index := range members {
members[index].ProtocolPort = nodePort
}
// The pool is a shared pool in a load balancer.
newPools = append(newPools, openstack.IngPool{
Name: poolName,
Opts: pools.CreateOpts{
Name: poolName,
Protocol: "HTTP",
LBMethod: pools.LBMethodRoundRobin,
LoadbalancerID: lb.ID,
Persistence: nil,
},
PoolMembers: members,
})
policyRules = append(policyRules, l7policies.CreateRuleOpts{
RuleType: l7policies.TypePath,
CompareType: l7policies.CompareTypeStartWith,
Value: path.Path,
})
newPolicies = append(newPolicies, openstack.IngPolicy{
RedirectPoolName: poolName,
Opts: l7policies.CreateOpts{
ListenerID: listener.ID,
Action: l7policies.ActionRedirectToPool,
Description: "Created by kubernetes ingress",
},
RulesOpts: policyRules,
})
}
}
// Reconsile octavia resources.
rt := openstack.NewResourceTracker(ingfullName, c.osClient.Octavia, lb.ID, listener.ID, newPools, newPolicies, existingPools, oldPolicies)
if err := rt.CreateResources(); err != nil {
return err
}
if err := rt.CleanupResources(); err != nil {
return err
}
if c.config.Octavia.ManageSecurityGroups {
logger.WithFields(log.Fields{"sgID": sgID}).Info("ensuring security group rules")
if err := c.osClient.EnsureSecurityGroupRules(sgID, c.subnetCIDR, nodePorts); err != nil {
return fmt.Errorf("failed to ensure security group rules for Ingress %s: %v", ingName, err)
}
if err := c.osClient.EnsurePortSecurityGroup(false, sgID, nodeObjs); err != nil {
return fmt.Errorf("failed to operate port security group for Ingress %s: %v", ingName, err)
}
logger.WithFields(log.Fields{"sgID": sgID}).Info("ensured security group rules")
}
internalSetting := getStringFromIngressAnnotation(ing, IngressAnnotationInternal, "true")
isInternal, err := strconv.ParseBool(internalSetting)
if err != nil {
return fmt.Errorf("unknown annotation %s: %v", IngressAnnotationInternal, err)
}
address := lb.VipAddress
// Allocate floating ip for loadbalancer vip if the external network is configured and the Ingress is not internal.
if !isInternal && c.config.Octavia.FloatingIPNetwork != "" {
logger.Info("creating floating IP")
description := fmt.Sprintf("Floating IP for Kubernetes ingress %s in namespace %s from cluster %s", ingName, ingNamespace, clusterName)
address, err = c.osClient.EnsureFloatingIP(false, lb.VipPortID, c.config.Octavia.FloatingIPNetwork, description)
if err != nil {
return fmt.Errorf("failed to create floating IP: %v", err)
}
logger.WithFields(log.Fields{"fip": address}).Info("floating IP created")
}
// Update ingress status
newIng, err := c.updateIngressStatus(ing, address)
if err != nil {
return err
}
c.recorder.Event(ing, apiv1.EventTypeNormal, "Updated", fmt.Sprintf("Successfully associated IP address %s to ingress %s", address, ingfullName))
// Add ingress resource version to the load balancer description
newDes := fmt.Sprintf("Kubernetes Ingress %s in namespace %s from cluster %s, version: %s", ingName, ingNamespace, clusterName, newIng.ResourceVersion)
if err = c.osClient.UpdateLoadBalancerDescription(lb.ID, newDes); err != nil {
return err
}
logger.Info("openstack resources for ingress created")
return nil
}