in pkg/neg/syncers/utils.go [117:219]
func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName, kubeSystemUID, port string, networkEndpointType negtypes.NetworkEndpointType, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder, version meta.Version, customName bool) (negv1beta1.NegObjectReference, error) {
var negRef negv1beta1.NegObjectReference
neg, err := cloud.GetNetworkEndpointGroup(negName, zone, version)
if err != nil {
if !utils.IsNotFoundError(err) {
klog.Errorf("Failed to get Neg %q in zone %q: %s", negName, zone, err)
return negRef, err
}
klog.V(4).Infof("Neg %q in zone %q was not found: %s", negName, zone, err)
}
needToCreate := false
if neg == nil {
needToCreate = true
} else {
expectedDesc := utils.NegDescription{
ClusterUID: kubeSystemUID,
Namespace: svcNamespace,
ServiceName: svcName,
Port: port,
}
if customName && neg.Description == "" {
klog.Errorf("Found Neg with custom name %s but empty description", negName)
return negv1beta1.NegObjectReference{}, fmt.Errorf("neg name %s is already in use, found a custom named neg with an empty description", negName)
}
if matches, err := utils.VerifyDescription(expectedDesc, neg.Description, negName, zone); !matches {
klog.Errorf("Neg Name %s is already in use: %s", negName, err)
return negv1beta1.NegObjectReference{}, fmt.Errorf("neg name %s is already in use, found conflicting description: %w", negName, err)
}
if networkEndpointType != negtypes.NonGCPPrivateEndpointType &&
// Only perform the following checks when the NEGs are not Non-GCP NEGs.
// Non-GCP NEGs do not have associated network and subnetwork.
(!utils.EqualResourceIDs(neg.Network, cloud.NetworkURL()) ||
!utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL())) {
needToCreate = true
klog.V(2).Infof("NEG %q in %q does not match network and subnetwork of the cluster. Deleting NEG.", negName, zone)
err = cloud.DeleteNetworkEndpointGroup(negName, zone, version)
if err != nil {
return negRef, err
}
if recorder != nil && serviceLister != nil {
if svc := getService(serviceLister, svcNamespace, svcName); svc != nil {
recorder.Eventf(svc, apiv1.EventTypeNormal, "Delete", "Deleted NEG %q for %s in %q.", negName, negServicePortName, zone)
}
}
}
}
if needToCreate {
klog.V(2).Infof("Creating NEG %q for %s in %q.", negName, negServicePortName, zone)
var subnetwork string
switch networkEndpointType {
case negtypes.NonGCPPrivateEndpointType:
subnetwork = ""
default:
subnetwork = cloud.SubnetworkURL()
}
desc := ""
negDesc := utils.NegDescription{
ClusterUID: kubeSystemUID,
Namespace: svcNamespace,
ServiceName: svcName,
Port: port,
}
desc = negDesc.String()
err = cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
Version: version,
Name: negName,
NetworkEndpointType: string(networkEndpointType),
Network: cloud.NetworkURL(),
Subnetwork: subnetwork,
Description: desc,
}, zone)
if err != nil {
return negRef, err
}
if recorder != nil && serviceLister != nil {
if svc := getService(serviceLister, svcNamespace, svcName); svc != nil {
recorder.Eventf(svc, apiv1.EventTypeNormal, "Create", "Created NEG %q for %s in %q.", negName, negServicePortName, zone)
}
}
}
if neg == nil {
var err error
neg, err = cloud.GetNetworkEndpointGroup(negName, zone, version)
if err != nil {
klog.Errorf("Error while retriving %q in zone %q: %v after initialization", negName, zone, err)
return negRef, err
}
}
negRef = negv1beta1.NegObjectReference{
Id: fmt.Sprint(neg.Id),
SelfLink: neg.SelfLink,
NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType),
}
return negRef, nil
}