in pkg/providers/apisix/apisix_upstream.go [134:280]
// sync handles a single ApisixUpstream event.
//
// The event key is split into namespace and name, and the object is looked up
// through the lister for the event's group version (only config.ApisixV2 is
// accepted; any other version yields an error).
//
// Lookup failures:
//   - an error other than "not found" is logged and returned;
//   - "not found" on a sync event is ignored;
//   - "not found" on any other non-delete event is logged as a warning and
//     ignored;
//   - "not found" on a delete event is expected, and ev.Tombstone is used as
//     the object instead.
//
// A delete event whose object can still be found is treated as stale and
// dropped.
//
// After syncRelationship has been called, the upstreams are updated:
//   - when the spec has ExternalNodes or Discovery, those take precedence and
//     only the external upstream is handled;
//   - otherwise, for every port of the Service with the same namespace/name
//     and for every subset (an empty subset plus the ones listed in the spec),
//     updateUpstream is called for both the Endpoint and the Service
//     granularity. On delete events a zero-value config is passed.
//
// Once the upstream-update stage has been entered, a status update carrying
// the outcome is queued on c.pool and that same outcome is returned. All
// returns before that stage, and the nil-Spec case, skip the status update.
func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) error {
	upstreamEvent := ev.Object.(kube.ApisixUpstreamEvent)
	key := upstreamEvent.Key

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.Errorf("found ApisixUpstream resource with invalid meta namespace key %s: %s", key, err)
		return err
	}

	var resource kube.ApisixUpstream
	switch upstreamEvent.GroupVersion {
	case config.ApisixV2:
		resource, err = c.ApisixUpstreamLister.V2(namespace, name)
	default:
		return fmt.Errorf("unsupported ApisixUpstream group version %s", upstreamEvent.GroupVersion)
	}

	if err != nil {
		switch {
		case !k8serrors.IsNotFound(err):
			log.Errorw("failed to get ApisixUpstream",
				zap.Error(err),
				zap.String("key", key),
				zap.String("version", upstreamEvent.GroupVersion),
			)
			return err
		case ev.Type == types.EventSync:
			// A missing object during a delayed sync is not an error.
			return nil
		case ev.Type != types.EventDelete:
			log.Warnw("ApisixUpstream was deleted before it can be delivered",
				zap.String("key", key),
				zap.String("version", upstreamEvent.GroupVersion),
			)
			// Nothing to retry: the object is already gone.
			return nil
		}
		// Not found on a delete event: fall through and use the tombstone.
	}

	if ev.Type == types.EventDelete {
		if resource != nil {
			// The lister still returns an object for this key, so an object
			// with the same namespace and name was created after the delete;
			// this DELETE event is stale and is dropped.
			log.Warnf("discard the stale ApisixUpstream delete event since the %s exists", key)
			return nil
		}
		resource = ev.Tombstone.(kube.ApisixUpstream)
	}

	c.syncRelationship(ev, key, resource)

	// finish queues the status update for this object on c.pool and hands the
	// sync outcome back so call sites can simply `return finish(...)`.
	finish := func(syncErr error) error {
		c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
			if wu.IsCancelled() {
				return nil, nil
			}
			c.updateStatus(resource, syncErr)
			return true, nil
		})
		return syncErr
	}

	switch upstreamEvent.GroupVersion {
	case config.ApisixV2:
		au := resource.V2()
		if au.Spec == nil {
			return nil
		}

		// ExternalNodes and Discovery take precedence over the Service-based path.
		if len(au.Spec.ExternalNodes) != 0 || au.Spec.Discovery != nil {
			var translated *apisixv1.Upstream
			if ev.Type != types.EventDelete {
				translated, err = c.translator.TranslateUpstreamConfigV2(&au.Spec.ApisixUpstreamConfig)
				if err != nil {
					log.Errorw("failed to translate upstream config",
						zap.Any("object", au),
						zap.Error(err),
					)
					return finish(err)
				}
			}

			if len(au.Spec.ExternalNodes) != 0 {
				return finish(c.updateExternalNodes(ctx, au, nil, translated, au.Namespace, au.Name, ev.Type.IsSyncEvent()))
			}

			// Service-discovery configuration: both fields are mandatory.
			if au.Spec.Discovery.ServiceName == "" || au.Spec.Discovery.Type == "" {
				log.Error("If you setup Discovery for ApisixUpstream, you need to specify the ServiceName and Type fields.")
				return finish(fmt.Errorf("No ServiceName or Type fields found"))
			}

			// Apply the configuration to the external upstream.
			externalName := apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name)
			return finish(c.updateUpstream(ctx, externalName, &au.Spec.ApisixUpstreamConfig, ev.Type.IsSyncEvent()))
		}

		// Index the per-port overrides by port number.
		var portLevelSettings map[int32]configv2.ApisixUpstreamConfig
		if overrides := au.Spec.PortLevelSettings; len(overrides) > 0 {
			portLevelSettings = make(map[int32]configv2.ApisixUpstreamConfig, len(overrides))
			for i := range overrides {
				entry := overrides[i]
				portLevelSettings[entry.Port] = entry.ApisixUpstreamConfig
			}
		}

		svc, svcErr := c.SvcLister.Services(namespace).Get(name)
		if svcErr != nil {
			log.Errorf("failed to get service %s: %s", key, svcErr)
			return finish(svcErr)
		}

		// The first entry is the empty subset; declared subsets follow.
		subsets := make([]configv2.ApisixUpstreamSubset, 0, len(au.Spec.Subsets)+1)
		subsets = append(subsets, configv2.ApisixUpstreamSubset{})
		subsets = append(subsets, au.Spec.Subsets...)

		for _, port := range svc.Spec.Ports {
			for _, subset := range subsets {
				// On delete events the config stays at its zero value.
				var upsCfg configv2.ApisixUpstreamConfig
				if ev.Type != types.EventDelete {
					if perPort, found := portLevelSettings[port.Port]; found {
						upsCfg = perPort
					} else {
						upsCfg = au.Spec.ApisixUpstreamConfig
					}
				}

				endpointUps := apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
				if err := c.updateUpstream(ctx, endpointUps, &upsCfg, ev.Type.IsSyncEvent()); err != nil {
					return finish(err)
				}

				serviceUps := apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Service)
				if err := c.updateUpstream(ctx, serviceUps, &upsCfg, ev.Type.IsSyncEvent()); err != nil {
					return finish(err)
				}
			}
		}
	}

	return finish(nil)
}