func()

in pkg/providers/v1/aws_loadbalancer.go [780:860]


// updateInstanceSecurityGroupsForNLB reconciles the NLB-related ingress rules
// on the security groups used by the given instances.
//
// It is a no-op when c.cfg.Global.DisableSecurityGroupIngress is set.
// Otherwise it:
//  1. collects the security group selected by findSecurityGroupForInstance
//     for every instance (instances without one are logged and skipped);
//  2. looks up any of those groups that are missing from the tagged cluster
//     groups and adds them to the working set;
//  3. for every group in the working set, calls
//     updateInstanceSecurityGroupForNLBTraffic once for the health-check rule
//     (always "tcp", with subnetCIDRs) and once for the client rule (with
//     clientCIDRs). Groups that no instance uses are passed nil ports and
//     CIDRs instead.
//
// The client protocol is "tcp" unless at least one port mapping uses UDP, in
// which case "udp" is used for all client ports. The health-check port of a
// mapping is its traffic port unless HealthCheckConfig.Port differs from
// defaultHealthCheckPort, in which case that value is parsed as an integer.
//
// lbName is embedded in the rule descriptions passed to the helpers. The
// first error encountered is returned; groups processed before it are not
// revisited.
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {
	if c.cfg.Global.DisableSecurityGroupIngress {
		return nil
	}

	clusterSGs, err := c.getTaggedSecurityGroups()
	if err != nil {
		return fmt.Errorf("error querying for tagged security groups: %w", err)
	}

	// Scan instances for the groups we want to open.
	desiredSGIDs := sets.String{}
	for _, instance := range instances {
		sg, err := findSecurityGroupForInstance(instance, clusterSGs)
		if err != nil {
			return err
		}
		if sg == nil {
			klog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId))
			continue
		}
		desiredSGIDs.Insert(aws.StringValue(sg.GroupId))
	}

	// TODO(@M00nF1sh): do we really need to support SG without cluster tag at current version?
	// findSecurityGroupForInstance might return SGs that are not tagged, so
	// fetch any desired group that is missing from the tagged set.
	for sgID := range desiredSGIDs.Difference(sets.StringKeySet(clusterSGs)) {
		sg, err := c.findSecurityGroup(sgID)
		if err != nil {
			return fmt.Errorf("error finding instance group: %w", err)
		}
		// NOTE(review): findSecurityGroup is not visible here; if it can
		// return (nil, nil) for an unknown ID, the sg.IpPermissions access
		// below would panic. Fail with an error instead.
		if sg == nil {
			return fmt.Errorf("security group %q not found", sgID)
		}
		clusterSGs[sgID] = sg
	}

	// Collect the traffic and health-check ports across all mappings.
	clientPorts := sets.Int64{}
	clientProtocol := "tcp"
	healthCheckPorts := sets.Int64{}
	for _, port := range portMappings {
		clientPorts.Insert(port.TrafficPort)

		hcPort := port.TrafficPort
		if port.HealthCheckConfig.Port != defaultHealthCheckPort {
			parsed, err := strconv.ParseInt(port.HealthCheckConfig.Port, 10, 0)
			if err != nil {
				return fmt.Errorf("invalid health check port %q: %w", port.HealthCheckConfig.Port, err)
			}
			hcPort = parsed
		}
		healthCheckPorts.Insert(hcPort)

		// A single UDP mapping switches the client rule protocol to UDP.
		if port.TrafficProtocol == string(v1.ProtocolUDP) {
			clientProtocol = "udp"
		}
	}

	clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
	healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
	for sgID, sg := range clusterSGs {
		sgPerms := NewIPPermissionSet(sg.IpPermissions...).Ungroup()
		if desiredSGIDs.Has(sgID) {
			if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", healthCheckPorts, subnetCIDRs); err != nil {
				return err
			}
			if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, clientPorts, clientCIDRs); err != nil {
				return err
			}
		} else {
			// Group is not used by any instance: pass no ports/CIDRs
			// (presumably so the helper drops this LB's rules — confirm
			// against updateInstanceSecurityGroupForNLBTraffic).
			if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", nil, nil); err != nil {
				return err
			}
			if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, nil, nil); err != nil {
				return err
			}
		}
		// Only touch the MTU rule when the permission set differs from what
		// the group started with (assumes the helper above mutates sgPerms
		// in place — confirm).
		if !sgPerms.Equal(NewIPPermissionSet(sg.IpPermissions...).Ungroup()) {
			if err := c.updateInstanceSecurityGroupForNLBMTU(sgID, sgPerms); err != nil {
				return err
			}
		}
	}
	return nil
}