in pkg/providers/v1/aws_loadbalancer.go [780:860]
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {
if c.cfg.Global.DisableSecurityGroupIngress {
return nil
}
clusterSGs, err := c.getTaggedSecurityGroups()
if err != nil {
return fmt.Errorf("error querying for tagged security groups: %q", err)
}
// scan instances for groups we want to open
desiredSGIDs := sets.String{}
for _, instance := range instances {
sg, err := findSecurityGroupForInstance(instance, clusterSGs)
if err != nil {
return err
}
if sg == nil {
klog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId))
continue
}
desiredSGIDs.Insert(aws.StringValue(sg.GroupId))
}
// TODO(@M00nF1sh): do we really needs to support SG without cluster tag at current version?
// findSecurityGroupForInstance might return SG that are not tagged.
{
for sgID := range desiredSGIDs.Difference(sets.StringKeySet(clusterSGs)) {
sg, err := c.findSecurityGroup(sgID)
if err != nil {
return fmt.Errorf("error finding instance group: %q", err)
}
clusterSGs[sgID] = sg
}
}
{
clientPorts := sets.Int64{}
clientProtocol := "tcp"
healthCheckPorts := sets.Int64{}
for _, port := range portMappings {
clientPorts.Insert(port.TrafficPort)
hcPort := port.TrafficPort
if port.HealthCheckConfig.Port != defaultHealthCheckPort {
var err error
if hcPort, err = strconv.ParseInt(port.HealthCheckConfig.Port, 10, 0); err != nil {
return fmt.Errorf("Invalid health check port %v", port.HealthCheckConfig.Port)
}
}
healthCheckPorts.Insert(hcPort)
if port.TrafficProtocol == string(v1.ProtocolUDP) {
clientProtocol = "udp"
}
}
clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
for sgID, sg := range clusterSGs {
sgPerms := NewIPPermissionSet(sg.IpPermissions...).Ungroup()
if desiredSGIDs.Has(sgID) {
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", healthCheckPorts, subnetCIDRs); err != nil {
return err
}
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, clientPorts, clientCIDRs); err != nil {
return err
}
} else {
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", nil, nil); err != nil {
return err
}
if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, clientProtocol, nil, nil); err != nil {
return err
}
}
if !sgPerms.Equal(NewIPPermissionSet(sg.IpPermissions...).Ungroup()) {
if err := c.updateInstanceSecurityGroupForNLBMTU(sgID, sgPerms); err != nil {
return err
}
}
}
}
return nil
}