func()

in pkg/controller/ingress/reconcile/applier/listener.go [69:163]


// applyListenersOnLB reconciles the desired listeners in resLSs against the
// listeners currently found on the load balancer identified by lbID.
//
// The listeners returned by findSDKListenersOnLB are paired with resLSs via
// matchResAndSDKListeners, and the work is then carried out in three
// sequential phases, each of which fans out one goroutine per listener and
// waits for all of them before moving on:
//
//  1. delete every SDK listener reported as unmatched,
//  2. create every resource listener reported as unmatched,
//  3. update every matched (resource, SDK) listener pair.
//
// If any call within a phase fails, the first error recorded for that phase is
// returned once the phase has finished, and the later phases are skipped.
// An empty lbID is rejected up front with an error.
func (s *listenerApplier) applyListenersOnLB(ctx context.Context, lbID string, resLSs []*albmodel.Listener) error {
	if len(lbID) == 0 {
		return fmt.Errorf("empty loadBalancer id when apply listeners error")
	}

	traceID := ctx.Value(util.TraceID)

	sdkLSs, err := s.findSDKListenersOnLB(ctx, lbID)
	if err != nil {
		return err
	}
	matchedResAndSDKLSs, unmatchedResLSs, unmatchedSDKLSs := matchResAndSDKListeners(resLSs, sdkLSs)

	if len(matchedResAndSDKLSs) != 0 {
		s.logger.V(util.SynLogLevel).Info("apply listeners",
			"matchedResAndSDKLSs", matchedResAndSDKLSs,
			"traceID", traceID)
	}
	if len(unmatchedResLSs) != 0 {
		s.logger.V(util.SynLogLevel).Info("apply listeners",
			"unmatchedResLSs", unmatchedResLSs,
			"traceID", traceID)
	}
	if len(unmatchedSDKLSs) != 0 {
		s.logger.V(util.SynLogLevel).Info("apply listeners",
			"unmatchedSDKLSs", unmatchedSDKLSs,
			"traceID", traceID)
	}

	// The worker goroutines of a phase all report into the same error
	// variable, so every access from a goroutine must go through errMu;
	// unsynchronized check-then-set on a shared variable is a data race.
	// The phases run one after another, so a single mutex serves all three.
	var errMu sync.Mutex
	recordFirstErr := func(dst *error, err error) {
		if err == nil {
			return
		}
		errMu.Lock()
		defer errMu.Unlock()
		if *dst == nil {
			*dst = err
		}
	}

	// Phase 1: delete the unmatched SDK listeners.
	var (
		errDelete error
		wgDelete  sync.WaitGroup
	)
	for _, sdkLS := range unmatchedSDKLSs {
		wgDelete.Add(1)
		go func(sdkLS albsdk.Listener) {
			defer wgDelete.Done()
			// NOTE(review): presumably staggers the concurrent API calls — confirm in util.
			util.RandomSleepFunc(util.ConcurrentMaxSleepMillisecondTime)

			// NOTE(review): DeleteALB is called with a listener ID; confirm this is
			// the intended provider method for removing a listener.
			recordFirstErr(&errDelete, s.albProvider.DeleteALB(ctx, sdkLS.ListenerId))
		}(sdkLS)
	}
	// Wait establishes the happens-before edge that makes the plain read of
	// errDelete below safe.
	wgDelete.Wait()
	if errDelete != nil {
		return errDelete
	}

	// Phase 2: create the unmatched resource listeners.
	var (
		errCreate error
		wgCreate  sync.WaitGroup
	)
	for _, resLS := range unmatchedResLSs {
		wgCreate.Add(1)
		go func(resLS *albmodel.Listener) {
			defer wgCreate.Done()
			util.RandomSleepFunc(util.ConcurrentMaxSleepMillisecondTime)

			lsStatus, err := s.albProvider.CreateALBListener(ctx, resLS)
			recordFirstErr(&errCreate, err)
			// The status is stored whether or not the call failed.
			resLS.SetStatus(lsStatus)
		}(resLS)
	}
	wgCreate.Wait()
	if errCreate != nil {
		return errCreate
	}

	// Phase 3: update the listeners that exist on both sides.
	var (
		errUpdate error
		wgUpdate  sync.WaitGroup
	)
	for _, resAndSDKLS := range matchedResAndSDKLSs {
		wgUpdate.Add(1)
		go func(resLs *albmodel.Listener, sdkLs *albsdk.Listener) {
			defer wgUpdate.Done()
			util.RandomSleepFunc(util.ConcurrentMaxSleepMillisecondTime)

			lsStatus, err := s.albProvider.UpdateALBListener(ctx, resLs, sdkLs)
			recordFirstErr(&errUpdate, err)
			// The status is stored whether or not the call failed.
			resLs.SetStatus(lsStatus)
		}(resAndSDKLS.resLS, resAndSDKLS.sdkLS)
	}
	wgUpdate.Wait()
	if errUpdate != nil {
		return errUpdate
	}

	return nil
}