in aws/aws_loadbalancer.go [144:400]
func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBalancerName string, mappings []nlbPortMapping, instanceIDs, subnetIDs []string, internalELB bool, annotations map[string]string) (*elbv2.LoadBalancer, error) {
loadBalancer, err := c.describeLoadBalancerv2(loadBalancerName)
if err != nil {
return nil, err
}
dirty := false
// Get additional tags set by the user
tags := getKeyValuePropertiesFromAnnotation(annotations, ServiceAnnotationLoadBalancerAdditionalTags)
// Add default tags
tags[TagNameKubernetesService] = namespacedName.String()
tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)
if loadBalancer == nil {
// Create the LB
createRequest := &elbv2.CreateLoadBalancerInput{
Type: aws.String(elbv2.LoadBalancerTypeEnumNetwork),
Name: aws.String(loadBalancerName),
}
if internalELB {
createRequest.Scheme = aws.String("internal")
}
var allocationIDs []string
if eipList, present := annotations[ServiceAnnotationLoadBalancerEIPAllocations]; present {
allocationIDs = strings.Split(eipList, ",")
if len(allocationIDs) != len(subnetIDs) {
return nil, fmt.Errorf("error creating load balancer: Must have same number of EIP AllocationIDs (%d) and SubnetIDs (%d)", len(allocationIDs), len(subnetIDs))
}
}
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
createRequest.SubnetMappings = createSubnetMappings(subnetIDs, allocationIDs)
for k, v := range tags {
createRequest.Tags = append(createRequest.Tags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
klog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
createResponse, err := c.elbv2.CreateLoadBalancer(createRequest)
if err != nil {
return nil, fmt.Errorf("error creating load balancer: %q", err)
}
loadBalancer = createResponse.LoadBalancers[0]
for i := range mappings {
// It is easier to keep track of updates by having possibly
// duplicate target groups where the backend port is the same
_, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId, tags)
if err != nil {
return nil, fmt.Errorf("error creating listener: %q", err)
}
}
if err := c.reconcileLBAttributes(aws.StringValue(loadBalancer.LoadBalancerArn), annotations); err != nil {
return nil, err
}
} else {
// TODO: Sync internal vs non-internal
// sync mappings
{
listenerDescriptions, err := c.elbv2.DescribeListeners(
&elbv2.DescribeListenersInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
},
)
if err != nil {
return nil, fmt.Errorf("error describing listeners: %q", err)
}
// actual maps FrontendPort to an elbv2.Listener
actual := map[int64]map[string]*elbv2.Listener{}
for _, listener := range listenerDescriptions.Listeners {
if actual[*listener.Port] == nil {
actual[*listener.Port] = map[string]*elbv2.Listener{}
}
actual[*listener.Port][*listener.Protocol] = listener
}
actualTargetGroups, err := c.elbv2.DescribeTargetGroups(
&elbv2.DescribeTargetGroupsInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
},
)
if err != nil {
return nil, fmt.Errorf("error listing target groups: %q", err)
}
nodePortTargetGroup := map[int64]*elbv2.TargetGroup{}
for _, targetGroup := range actualTargetGroups.TargetGroups {
nodePortTargetGroup[*targetGroup.Port] = targetGroup
}
// Handle additions/modifications
for _, mapping := range mappings {
frontendPort := mapping.FrontendPort
frontendProtocol := mapping.FrontendProtocol
nodePort := mapping.TrafficPort
// modifications
if listener, ok := actual[frontendPort][frontendProtocol]; ok {
listenerNeedsModification := false
if aws.StringValue(listener.Protocol) != mapping.FrontendProtocol {
listenerNeedsModification = true
}
switch mapping.FrontendProtocol {
case elbv2.ProtocolEnumTls:
{
if aws.StringValue(listener.SslPolicy) != mapping.SSLPolicy {
listenerNeedsModification = true
}
if len(listener.Certificates) == 0 || aws.StringValue(listener.Certificates[0].CertificateArn) != mapping.SSLCertificateARN {
listenerNeedsModification = true
}
}
case elbv2.ProtocolEnumTcp:
{
if aws.StringValue(listener.SslPolicy) != "" {
listenerNeedsModification = true
}
if len(listener.Certificates) != 0 {
listenerNeedsModification = true
}
}
}
// recreate targetGroup if trafficPort, protocol or HealthCheckProtocol changed
healthCheckModified := false
targetGroupRecreated := false
targetGroup, ok := nodePortTargetGroup[nodePort]
if targetGroup != nil && (!strings.EqualFold(mapping.HealthCheckConfig.Protocol, aws.StringValue(targetGroup.HealthCheckProtocol)) ||
mapping.HealthCheckConfig.Interval != aws.Int64Value(targetGroup.HealthCheckIntervalSeconds)) {
healthCheckModified = true
}
if !ok || aws.StringValue(targetGroup.Protocol) != mapping.TrafficProtocol || healthCheckModified {
// create new target group
targetGroup, err = c.ensureTargetGroup(
nil,
namespacedName,
mapping,
instanceIDs,
*loadBalancer.VpcId,
tags,
)
if err != nil {
return nil, err
}
targetGroupRecreated = true
listenerNeedsModification = true
}
if listenerNeedsModification {
modifyListenerInput := &elbv2.ModifyListenerInput{
ListenerArn: listener.ListenerArn,
Port: aws.Int64(frontendPort),
Protocol: aws.String(mapping.FrontendProtocol),
DefaultActions: []*elbv2.Action{{
TargetGroupArn: targetGroup.TargetGroupArn,
Type: aws.String("forward"),
}},
}
if mapping.FrontendProtocol == elbv2.ProtocolEnumTls {
if mapping.SSLPolicy != "" {
modifyListenerInput.SslPolicy = aws.String(mapping.SSLPolicy)
}
modifyListenerInput.Certificates = []*elbv2.Certificate{
{
CertificateArn: aws.String(mapping.SSLCertificateARN),
},
}
}
if _, err := c.elbv2.ModifyListener(modifyListenerInput); err != nil {
return nil, fmt.Errorf("error updating load balancer listener: %q", err)
}
}
// Delete old targetGroup if needed
if targetGroupRecreated {
if _, err := c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{
TargetGroupArn: listener.DefaultActions[0].TargetGroupArn,
}); err != nil {
return nil, fmt.Errorf("error deleting old target group: %q", err)
}
} else {
// Run ensureTargetGroup to make sure instances in service are up-to-date
_, err = c.ensureTargetGroup(
targetGroup,
namespacedName,
mapping,
instanceIDs,
*loadBalancer.VpcId,
tags,
)
if err != nil {
return nil, err
}
}
dirty = true
continue
}
// Additions
_, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId, tags)
if err != nil {
return nil, err
}
dirty = true
}
frontEndPorts := map[int64]map[string]bool{}
for i := range mappings {
if frontEndPorts[mappings[i].FrontendPort] == nil {
frontEndPorts[mappings[i].FrontendPort] = map[string]bool{}
}
frontEndPorts[mappings[i].FrontendPort][mappings[i].FrontendProtocol] = true
}
// handle deletions
for port := range actual {
for protocol := range actual[port] {
if _, ok := frontEndPorts[port][protocol]; !ok {
err := c.deleteListenerV2(actual[port][protocol])
if err != nil {
return nil, err
}
dirty = true
}
}
}
}
if err := c.reconcileLBAttributes(aws.StringValue(loadBalancer.LoadBalancerArn), annotations); err != nil {
return nil, err
}
// Subnets cannot be modified on NLBs
if dirty {
loadBalancers, err := c.elbv2.DescribeLoadBalancers(
&elbv2.DescribeLoadBalancersInput{
LoadBalancerArns: []*string{
loadBalancer.LoadBalancerArn,
},
},
)
if err != nil {
return nil, fmt.Errorf("error retrieving load balancer after update: %q", err)
}
loadBalancer = loadBalancers.LoadBalancers[0]
}
}
return loadBalancer, nil
}