func()

in pkg/controller/ingress/reconcile/applier/server_group.go [95:215]


func (s *serverGroupApplier) Apply(ctx context.Context) error {
	traceID := ctx.Value(util.TraceID)

	var resSGPs []*albmodel.ServerGroup
	s.stack.ListResources(&resSGPs)

	sdkSGPs, err := s.findSDKServerGroups(ctx)
	if err != nil {
		return err
	}

	matchedResAndSDKSGPs, unmatchedResSGPs, unmatchedSDKSGPs, err := matchResAndSDKServerGroupsSGP(resSGPs, sdkSGPs, s.trackingProvider.ResourceIDTagKey())
	if err != nil {
		return err
	}

	if len(matchedResAndSDKSGPs) != 0 {
		s.logger.V(util.SynLogLevel).Info("apply serverGroups",
			"matchedResAndSDKSGPs", matchedResAndSDKSGPs,
			"traceID", traceID)
	}
	if len(unmatchedResSGPs) != 0 {
		s.logger.V(util.SynLogLevel).Info("apply serverGroups",
			"unmatchedResSGPs", unmatchedResSGPs,
			"traceID", traceID)
	}
	if len(unmatchedSDKSGPs) != 0 {
		s.logger.V(util.SynLogLevel).Info("apply serverGroups",
			"unmatchedSDKSGPs", unmatchedSDKSGPs,
			"traceID", traceID)
	}

	s.unmatchedSDKSGPs = unmatchedSDKSGPs

	var (
		errCreate error
		wgCreate  sync.WaitGroup
		chCreate  = make(chan struct{}, util.ServerGroupConcurrentNum)
	)
	for _, resSGP := range unmatchedResSGPs {
		chCreate <- struct{}{}
		wgCreate.Add(1)

		go func(res *albmodel.ServerGroup) {
			util.RandomSleepFunc(util.ConcurrentMaxSleepMillisecondTime)

			defer func() {
				wgCreate.Done()
				<-chCreate
			}()

			sgpStatus, errOnce := s.albProvider.CreateALBServerGroup(ctx, res, s.trackingProvider)
			if errCreate == nil && errOnce != nil {
				errCreate = errOnce
				return
			}
			res.SetStatus(sgpStatus)

			if strings.Contains(res.Spec.ServerGroupNamedKey.IngressName, util.DefaultListenerFlag) {
				return
			}
			if errOnce = s.addServerToServerGroup(ctx, sgpStatus.ServerGroupID, types.NamespacedName{
				Namespace: res.Spec.ServerGroupNamedKey.Namespace,
				Name:      res.Spec.ServerGroupNamedKey.ServiceName},
				intstr.FromInt(res.Spec.ServerGroupNamedKey.ServicePort)); errCreate == nil && errOnce != nil {
				if errTmp := s.albProvider.DeleteALBServerGroup(ctx, sgpStatus.ServerGroupID); errTmp != nil {
					s.logger.V(util.SynLogLevel).Error(errTmp, "apply serverGroups roll back server group failed",
						"serverGroupID", sgpStatus.ServerGroupID, "traceID", traceID)
				}
				errCreate = errOnce
				return
			}
		}(resSGP)
	}
	wgCreate.Wait()
	if errCreate != nil {
		return errCreate
	}

	var (
		errUpdate error
		wgUpdate  sync.WaitGroup
		chUpdate  = make(chan struct{}, util.ServerGroupConcurrentNum)
	)
	for _, resAndSDKSGP := range matchedResAndSDKSGPs {
		chUpdate <- struct{}{}
		wgUpdate.Add(1)

		go func(resSGP *albmodel.ServerGroup, sdkSGP albmodel.ServerGroupWithTags) {
			util.RandomSleepFunc(util.ConcurrentMaxSleepMillisecondTime)

			defer func() {
				wgUpdate.Done()
				<-chUpdate
			}()

			sgpStatus, errOnce := s.albProvider.UpdateALBServerGroup(ctx, resSGP, sdkSGP)
			if errUpdate == nil && errOnce != nil {
				errUpdate = errOnce
			}
			resSGP.SetStatus(sgpStatus)

			if strings.Contains(resSGP.Spec.ServerGroupNamedKey.IngressName, util.DefaultListenerFlag) {
				return
			}
			if errOnce = s.removeServerFromServerGroup(ctx, sgpStatus.ServerGroupID, types.NamespacedName{
				Namespace: resSGP.Spec.ServerGroupNamedKey.Namespace,
				Name:      resSGP.Spec.ServerGroupNamedKey.ServiceName},
				intstr.FromInt(resSGP.Spec.ServerGroupNamedKey.ServicePort)); errCreate == nil && errOnce != nil {
				errCreate = errOnce
				return
			}
		}(resAndSDKSGP.ResSGP, resAndSDKSGP.SdkSGP)
	}
	wgUpdate.Wait()
	if errUpdate != nil {
		return errUpdate
	}

	return nil
}