in pkg/controller/ingress/reconcile/applier/server_group.go [95:215]
func (s *serverGroupApplier) Apply(ctx context.Context) error {
traceID := ctx.Value(util.TraceID)
var resSGPs []*albmodel.ServerGroup
s.stack.ListResources(&resSGPs)
sdkSGPs, err := s.findSDKServerGroups(ctx)
if err != nil {
return err
}
matchedResAndSDKSGPs, unmatchedResSGPs, unmatchedSDKSGPs, err := matchResAndSDKServerGroupsSGP(resSGPs, sdkSGPs, s.trackingProvider.ResourceIDTagKey())
if err != nil {
return err
}
if len(matchedResAndSDKSGPs) != 0 {
s.logger.V(util.SynLogLevel).Info("apply serverGroups",
"matchedResAndSDKSGPs", matchedResAndSDKSGPs,
"traceID", traceID)
}
if len(unmatchedResSGPs) != 0 {
s.logger.V(util.SynLogLevel).Info("apply serverGroups",
"unmatchedResSGPs", unmatchedResSGPs,
"traceID", traceID)
}
if len(unmatchedSDKSGPs) != 0 {
s.logger.V(util.SynLogLevel).Info("apply serverGroups",
"unmatchedSDKSGPs", unmatchedSDKSGPs,
"traceID", traceID)
}
s.unmatchedSDKSGPs = unmatchedSDKSGPs
var (
errCreate error
wgCreate sync.WaitGroup
chCreate = make(chan struct{}, util.ServerGroupConcurrentNum)
)
for _, resSGP := range unmatchedResSGPs {
chCreate <- struct{}{}
wgCreate.Add(1)
go func(res *albmodel.ServerGroup) {
util.RandomSleepFunc(util.ConcurrentMaxSleepMillisecondTime)
defer func() {
wgCreate.Done()
<-chCreate
}()
sgpStatus, errOnce := s.albProvider.CreateALBServerGroup(ctx, res, s.trackingProvider)
if errCreate == nil && errOnce != nil {
errCreate = errOnce
return
}
res.SetStatus(sgpStatus)
if strings.Contains(res.Spec.ServerGroupNamedKey.IngressName, util.DefaultListenerFlag) {
return
}
if errOnce = s.addServerToServerGroup(ctx, sgpStatus.ServerGroupID, types.NamespacedName{
Namespace: res.Spec.ServerGroupNamedKey.Namespace,
Name: res.Spec.ServerGroupNamedKey.ServiceName},
intstr.FromInt(res.Spec.ServerGroupNamedKey.ServicePort)); errCreate == nil && errOnce != nil {
if errTmp := s.albProvider.DeleteALBServerGroup(ctx, sgpStatus.ServerGroupID); errTmp != nil {
s.logger.V(util.SynLogLevel).Error(errTmp, "apply serverGroups roll back server group failed",
"serverGroupID", sgpStatus.ServerGroupID, "traceID", traceID)
}
errCreate = errOnce
return
}
}(resSGP)
}
wgCreate.Wait()
if errCreate != nil {
return errCreate
}
var (
errUpdate error
wgUpdate sync.WaitGroup
chUpdate = make(chan struct{}, util.ServerGroupConcurrentNum)
)
for _, resAndSDKSGP := range matchedResAndSDKSGPs {
chUpdate <- struct{}{}
wgUpdate.Add(1)
go func(resSGP *albmodel.ServerGroup, sdkSGP albmodel.ServerGroupWithTags) {
util.RandomSleepFunc(util.ConcurrentMaxSleepMillisecondTime)
defer func() {
wgUpdate.Done()
<-chUpdate
}()
sgpStatus, errOnce := s.albProvider.UpdateALBServerGroup(ctx, resSGP, sdkSGP)
if errUpdate == nil && errOnce != nil {
errUpdate = errOnce
}
resSGP.SetStatus(sgpStatus)
if strings.Contains(resSGP.Spec.ServerGroupNamedKey.IngressName, util.DefaultListenerFlag) {
return
}
if errOnce = s.removeServerFromServerGroup(ctx, sgpStatus.ServerGroupID, types.NamespacedName{
Namespace: resSGP.Spec.ServerGroupNamedKey.Namespace,
Name: resSGP.Spec.ServerGroupNamedKey.ServiceName},
intstr.FromInt(resSGP.Spec.ServerGroupNamedKey.ServicePort)); errCreate == nil && errOnce != nil {
errCreate = errOnce
return
}
}(resAndSDKSGP.ResSGP, resAndSDKSGP.SdkSGP)
}
wgUpdate.Wait()
if errUpdate != nil {
return errUpdate
}
return nil
}