in pkg/psc/controller.go [262:388]
// processServiceAttachment reconciles the ServiceAttachment CR identified by
// key (a "namespace/name" cache key) with its GCE Service Attachment.
//
// The sync ensures the finalizer on the CR, validates the resource reference,
// looks up the forwarding rule and NAT subnet URLs named by the spec, and then
// either inserts a new GCE Service Attachment or patches the existing one when
// needsUpdate reports a change. In both cases the CR status is refreshed via
// updateServiceAttachmentStatus afterwards.
//
// A key that is no longer present in the store is not an error: nil is
// returned and cleanup is left to garbage collection.
//
// The function-level err variable is read by the deferred metrics hook, so
// every failure path must assign to it with "=" (never shadow it with ":=")
// before returning; otherwise a failed sync is published as a success.
func (c *Controller) processServiceAttachment(key string) error {
	start := time.Now()
	// NOTE: Error will be used to send metrics about whether the sync loop was successful
	// Please reuse and set err before returning
	var err error
	defer func() {
		metrics.PublishPSCProcessMetrics(metrics.SyncProcess, filterError(err), start)
		metrics.PublishLastProcessTimestampMetrics(metrics.SyncProcess)
		c.collector.SetServiceAttachment(key, metrics.PSCState{InSuccess: err == nil})
	}()

	var namespace, name string
	namespace, name, err = cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	var obj interface{}
	var exists bool
	obj, exists, err = c.svcAttachmentLister.GetByKey(key)
	if err != nil {
		return fmt.Errorf("errored getting service from store: %w", err)
	}
	if !exists {
		// Allow Garbage Collection to Delete Service Attachment
		klog.V(2).Infof("Service attachment %s/%s does not exist in store. Will be cleaned up by GC", namespace, name)
		return nil
	}

	klog.V(2).Infof("Processing Service attachment %s/%s", namespace, name)
	defer klog.V(4).Infof("Finished processing service attachment %s/%s", namespace, name)

	// Use the comma-ok form so an unexpected object in the store surfaces as a
	// sync error (and is reflected in metrics) instead of panicking the worker.
	svcAttachment, ok := obj.(*sav1.ServiceAttachment)
	if !ok {
		err = fmt.Errorf("unexpected object of type %T in store for key %s", obj, key)
		return err
	}

	var updatedCR *sav1.ServiceAttachment
	updatedCR, err = c.ensureSAFinalizer(svcAttachment)
	if err != nil {
		return fmt.Errorf("Errored adding finalizer on ServiceAttachment CR %s/%s: %w", namespace, name, err)
	}
	if err = validateResourceReference(updatedCR.Spec.ResourceRef); err != nil {
		return err
	}

	var frURL string
	frURL, err = c.getForwardingRule(namespace, updatedCR.Spec.ResourceRef.Name)
	if err != nil {
		return fmt.Errorf("failed to find forwarding rule: %w", err)
	}

	var subnetURLs []string
	subnetURLs, err = c.getSubnetURLs(updatedCR.Spec.NATSubnets)
	if err != nil {
		return fmt.Errorf("failed to find nat subnets: %w", err)
	}

	saName := c.saNamer.ServiceAttachment(namespace, name, string(updatedCR.UID))
	var gceSAKey *meta.Key
	gceSAKey, err = composite.CreateKey(c.cloud, saName, meta.Regional)
	if err != nil {
		return fmt.Errorf("failed to create key for GCE Service Attachment: %w", err)
	}

	// A 404 here is expected for a CR that has not been provisioned yet; it
	// leads to the create path below. Any other error aborts the sync.
	var existingSA *ga.ServiceAttachment
	existingSA, err = c.cloud.Compute().ServiceAttachments().Get(context2.Background(), gceSAKey)
	if err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
		return fmt.Errorf("failed querying for GCE Service Attachment: %w", err)
	}

	// Start from a copy of the existing resource (when there is one) so that
	// fields not managed below are carried over, then overlay the desired
	// state computed from the CR.
	gceSvcAttachment := &ga.ServiceAttachment{}
	if existingSA != nil {
		klog.V(4).Infof("Found existing service attachment %s", existingSA.Name)
		*gceSvcAttachment = *existingSA
	}

	desc := sautils.NewServiceAttachmentDesc(updatedCR.Namespace, updatedCR.Name, c.clusterName, c.clusterLoc, c.regionalCluster)
	// Every spec-derived field is read from updatedCR so the desired state is
	// built from a single object.
	gceSvcAttachment.ConnectionPreference = updatedCR.Spec.ConnectionPreference
	gceSvcAttachment.Name = saName
	gceSvcAttachment.NatSubnets = subnetURLs
	gceSvcAttachment.TargetService = frURL
	gceSvcAttachment.Region = c.cloud.Region()
	gceSvcAttachment.Description = desc.String()
	gceSvcAttachment.EnableProxyProtocol = updatedCR.Spec.ProxyProtocol
	gceSvcAttachment.ConsumerAcceptLists = convertAllowList(updatedCR.Spec)
	gceSvcAttachment.ConsumerRejectLists = updatedCR.Spec.ConsumerRejectList

	if existingSA != nil {
		// Most of the validation is left to the GCE Service Attachment API. needsUpdate only checks
		// to see if the spec has changed and whether an update is necessary.
		//
		// shouldUpdate is declared separately so that err below is the
		// function-level variable observed by the deferred metrics hook.
		var shouldUpdate bool
		shouldUpdate, err = needsUpdate(existingSA, gceSvcAttachment)
		if err != nil {
			return fmt.Errorf("unable to process Service Attachment Update: %w", err)
		}
		if shouldUpdate {
			// In order for the update to be successful, the self link in the target service (same resource
			// as the forwarding rule) must be exactly the same. needsUpdate throws an error in situations
			// the forwarding rule/targetservice was changed on the spec. GCE API only accepts updates where the
			// target service/forwarding rule is the same so to ensure the target service is not changed,
			// set the target service to match the existing. Otherwise, a mismatch between the target services
			// is possible because the PSC controller generates the GA version of the selflink, while the GCE API
			// may use a different version causing the selflink to differ even if the resource is the same.
			gceSvcAttachment.TargetService = existingSA.TargetService
			klog.V(2).Infof("Service Attachment CR %s/%s was updated. %s requires an update", updatedCR.Namespace, updatedCR.Name, saName)
			if err = c.cloud.Compute().ServiceAttachments().Patch(context2.Background(), gceSAKey, gceSvcAttachment); err != nil {
				return fmt.Errorf("failed to update GCE Service Attachment: %w", err)
			}
		}
		_, err = c.updateServiceAttachmentStatus(updatedCR, gceSAKey)
		return err
	}

	klog.V(2).Infof("Creating service attachment %s", saName)
	if err = c.cloud.Compute().ServiceAttachments().Insert(context2.Background(), gceSAKey, gceSvcAttachment); err != nil {
		return fmt.Errorf("failed to create GCE Service Attachment: %w", err)
	}
	klog.V(2).Infof("Created service attachment %s", saName)

	updatedCR, err = c.updateServiceAttachmentStatus(updatedCR, gceSAKey)
	if err != nil {
		// Do not touch updatedCR here: it may be nil when the status update
		// fails, and the success log/event below would be misleading.
		return err
	}
	klog.V(2).Infof("Updated Service Attachment %s/%s status", updatedCR.Namespace, updatedCR.Name)
	// Pass the URL as a format argument rather than pre-formatting it, so a
	// '%' in the value is never interpreted as a formatting verb.
	c.recorder(svcAttachment.Namespace).Eventf(svcAttachment, v1.EventTypeNormal, "ServiceAttachmentCreated",
		"Service Attachment %s was successfully created.", updatedCR.Status.ServiceAttachmentURL)
	return nil
}