func()

in pkg/psc/controller.go [262:388]


// processServiceAttachment syncs the ServiceAttachment CR identified by key
// (a "namespace/name" store key) with its GCE Service Attachment.
//
// The sync performs the following steps:
//  1. Looks the CR up in the lister; a missing CR is a no-op (cleanup is left
//     to garbage collection).
//  2. Ensures the finalizer is present on the CR and validates its resource
//     reference.
//  3. Resolves the forwarding rule and NAT subnet URLs referenced by the spec.
//  4. Queries GCE for an existing Service Attachment. If one exists it is
//     patched when needsUpdate reports a change; otherwise a new one is
//     inserted and a "ServiceAttachmentCreated" event is recorded.
//  5. Updates the CR status from the GCE resource.
//
// It returns nil on success or the first error encountered. Sync metrics are
// published on every return path by the deferred function below.
func (c *Controller) processServiceAttachment(key string) error {
	start := time.Now()
	// NOTE: Error will be used to send metrics about whether the sync loop was successful
	// Please reuse and set err before returning. Never shadow err with := in a
	// nested scope: the deferred closure only observes this variable.
	var err error
	defer func() {
		metrics.PublishPSCProcessMetrics(metrics.SyncProcess, filterError(err), start)
		metrics.PublishLastProcessTimestampMetrics(metrics.SyncProcess)
		c.collector.SetServiceAttachment(key, metrics.PSCState{InSuccess: err == nil})
	}()

	var namespace, name string
	namespace, name, err = cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	var obj interface{}
	var exists bool
	obj, exists, err = c.svcAttachmentLister.GetByKey(key)
	if err != nil {
		return fmt.Errorf("errored getting service from store: %w", err)
	}

	if !exists {
		// Allow Garbage Collection to Delete Service Attachment
		klog.V(2).Infof("Service attachment %s/%s does not exist in store. Will be cleaned up by GC", namespace, name)
		return nil
	}
	klog.V(2).Infof("Processing Service attachment %s/%s", namespace, name)
	// The arguments are evaluated now; the message is emitted when the function returns.
	defer klog.V(4).Infof("Finished processing service attachment %s/%s", namespace, name)

	// Guard the assertion so an unexpected object in the store surfaces as a
	// sync error (and a failed-sync metric) instead of a panic.
	svcAttachment, ok := obj.(*sav1.ServiceAttachment)
	if !ok {
		err = fmt.Errorf("unexpected object type %T in store for service attachment %s/%s", obj, namespace, name)
		return err
	}

	var updatedCR *sav1.ServiceAttachment
	updatedCR, err = c.ensureSAFinalizer(svcAttachment)
	if err != nil {
		return fmt.Errorf("Errored adding finalizer on ServiceAttachment CR %s/%s: %w", namespace, name, err)
	}
	if err = validateResourceReference(updatedCR.Spec.ResourceRef); err != nil {
		return err
	}

	var frURL string
	frURL, err = c.getForwardingRule(namespace, updatedCR.Spec.ResourceRef.Name)
	if err != nil {
		return fmt.Errorf("failed to find forwarding rule: %w", err)
	}

	var subnetURLs []string
	subnetURLs, err = c.getSubnetURLs(updatedCR.Spec.NATSubnets)
	if err != nil {
		return fmt.Errorf("failed to find nat subnets: %w", err)
	}

	saName := c.saNamer.ServiceAttachment(namespace, name, string(updatedCR.UID))
	var gceSAKey *meta.Key
	gceSAKey, err = composite.CreateKey(c.cloud, saName, meta.Regional)
	if err != nil {
		return fmt.Errorf("failed to create key for GCE Service Attachment: %w", err)
	}
	var existingSA *ga.ServiceAttachment
	existingSA, err = c.cloud.Compute().ServiceAttachments().Get(context2.Background(), gceSAKey)
	// A 404 is expected when the attachment has not been created yet; any
	// other error aborts the sync.
	if err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
		return fmt.Errorf("failed querying for GCE Service Attachment: %w", err)
	}

	// Start from a copy of the existing resource (if any) so fields that are
	// not managed below are carried over into the desired state.
	gceSvcAttachment := &ga.ServiceAttachment{}
	if existingSA != nil {
		klog.V(4).Infof("Found existing service attachment %s", existingSA.Name)
		*gceSvcAttachment = *existingSA
	}

	desc := sautils.NewServiceAttachmentDesc(updatedCR.Namespace, updatedCR.Name, c.clusterName, c.clusterLoc, c.regionalCluster)
	// All spec-derived fields are read from updatedCR so the desired state is
	// built from a single object.
	gceSvcAttachment.ConnectionPreference = updatedCR.Spec.ConnectionPreference
	gceSvcAttachment.Name = saName
	gceSvcAttachment.NatSubnets = subnetURLs
	gceSvcAttachment.TargetService = frURL
	gceSvcAttachment.Region = c.cloud.Region()
	gceSvcAttachment.Description = desc.String()
	gceSvcAttachment.EnableProxyProtocol = updatedCR.Spec.ProxyProtocol
	gceSvcAttachment.ConsumerAcceptLists = convertAllowList(updatedCR.Spec)
	gceSvcAttachment.ConsumerRejectLists = updatedCR.Spec.ConsumerRejectList

	if existingSA != nil {
		// Most of the validation is left to the GCE Service Attachment API. needsUpdate only checks
		// to see if the spec has changed and whether an update is necessary.
		//
		// shouldUpdate is declared separately so that err is assigned (not
		// shadowed) and failures on this path are visible to the deferred metrics.
		var shouldUpdate bool
		shouldUpdate, err = needsUpdate(existingSA, gceSvcAttachment)
		if err != nil {
			return fmt.Errorf("unable to process Service Attachment Update: %w", err)
		}

		if shouldUpdate {
			// In order for the update to be successful, the self link in the target service (same resource
			// as the forwarding rule) must be exactly the same. needsUpdate throws an error in situations
			// the forwarding rule/targetservice was changed on the spec. GCE API only accepts updates where the
			// target service/forwarding rule is the same so to ensure the target service is not changed,
			// set the target service to match the existing. Otherwise, a mismatch between the target services
			// is possible because the PSC controller generates the GA version of the selflink, while the GCE API
			// may use a different version causing the selflink to differ even if the resource is the same.
			gceSvcAttachment.TargetService = existingSA.TargetService

			klog.V(2).Infof("Service Attachment CR %s/%s was updated. %s requires an update", updatedCR.Namespace, updatedCR.Name, saName)
			if err = c.cloud.Compute().ServiceAttachments().Patch(context2.Background(), gceSAKey, gceSvcAttachment); err != nil {
				return fmt.Errorf("failed to update GCE Service Attachment: %w", err)
			}
		}

		_, err = c.updateServiceAttachmentStatus(updatedCR, gceSAKey)
		return err
	}

	klog.V(2).Infof("Creating service attachment %s", saName)
	if err = c.cloud.Compute().ServiceAttachments().Insert(context2.Background(), gceSAKey, gceSvcAttachment); err != nil {
		return fmt.Errorf("failed to create GCE Service Attachment: %w", err)
	}
	klog.V(2).Infof("Created service attachment %s", saName)

	updatedCR, err = c.updateServiceAttachmentStatus(updatedCR, gceSAKey)
	if err != nil {
		// Do not touch updatedCR here: it may be nil when the status update fails.
		return err
	}
	klog.V(2).Infof("Updated Service Attachment %s/%s status", namespace, name)

	// Pass the format string and its argument straight to Eventf rather than
	// pre-formatting, so a '%' in the URL is never re-interpreted as a verb.
	c.recorder(svcAttachment.Namespace).Eventf(svcAttachment, v1.EventTypeNormal, "ServiceAttachmentCreated",
		"Service Attachment %s was successfully created.", updatedCR.Status.ServiceAttachmentURL)

	return nil
}