in pkg/dds/store/sync.go [121:257]
// Sync reconciles the downstream store with the resources carried by
// upstreamResponse for the resource type upstreamResponse.Type.
//
// The work is done in the following order:
//
//  1. Load the downstream resources: all of them on an initial request,
//     otherwise only those whose keys appear among the added or removed
//     upstream resources.
//  2. If opts.Predicate is set, filter both the downstream and the upstream
//     lists with it.
//  3. Compute the resources to delete, create and update.
//  4. Apply deletes, then creates, then updates inside a single store.InTx call.
//
// fs are functional options folded into SyncOptions via NewSyncOptions.
// The first error returned by any store operation aborts the sync and is
// returned unchanged. The elapsed time of every call, successful or not, is
// reported to s.metric in seconds.
func (s *syncResourceStore) Sync(syncCtx context.Context, upstreamResponse client.UpstreamResponse, fs ...SyncOptionFunc) error {
	now := core.Now()
	defer func() {
		// Reported in seconds, with millisecond resolution.
		s.metric.Observe(float64(time.Since(now).Milliseconds()) / 1000)
	}()

	opts := NewSyncOptions(fs...)
	log := s.log.WithValues("type", upstreamResponse.Type)
	upstream := upstreamResponse.AddedResources

	downstream, err := registry.Global().NewList(upstreamResponse.Type)
	if err != nil {
		return err
	}
	if upstreamResponse.IsInitialRequest {
		if err := s.resourceManager.List(syncCtx, downstream); err != nil {
			return err
		}
	} else {
		// On incremental responses only the resources touched by the upstream
		// change (added or removed) need to be loaded.
		upstreamChangeKeys := append(core_model.ResourceListToResourceKeys(upstream), upstreamResponse.RemovedResourcesKey...)
		if err := s.resourceManager.List(syncCtx, downstream, store.ListByResourceKeys(upstreamChangeKeys)); err != nil {
			return err
		}
	}

	log.V(1).Info("before filtering", "downstream", downstream, "upstream", upstream)
	if opts.Predicate != nil {
		filteredDownstream, err := filter(downstream, opts.Predicate)
		if err != nil {
			return err
		}
		downstream = filteredDownstream

		filteredUpstream, err := filter(upstream, opts.Predicate)
		if err != nil {
			return err
		}
		upstream = filteredUpstream
	}
	log.V(1).Info("after filtering", "downstream", downstream, "upstream", upstream)

	indexedDownstream := newIndexed(downstream)
	indexedUpstream := newIndexed(upstream)

	// 1. Delete resources which were removed from the upstream.
	// On the first request, when the control-plane starts, we want to sync
	// all of the resources in the store, so every downstream resource that is
	// missing from the upstream is deleted. On later requests we receive only
	// updates, so we delete just the resources that were explicitly reported
	// as removed and leave the unchanged ones alone.
	var onDelete []core_model.Resource
	if upstreamResponse.IsInitialRequest {
		for _, r := range downstream.GetItems() {
			if indexedUpstream.get(core_model.MetaToResourceKey(r.GetMeta())) == nil {
				onDelete = append(onDelete, r)
			}
		}
	} else {
		for _, rk := range upstreamResponse.RemovedResourcesKey {
			// A key that is both added and removed in the same response is
			// an update, not a removal.
			if r := indexedUpstream.get(rk); r != nil {
				continue
			}
			if r := indexedDownstream.get(rk); r != nil {
				onDelete = append(onDelete, r)
			}
		}
	}

	// 2. Create resources which are not represented in 'downstream' and
	// update the ones whose spec or labels differ.
	var (
		onCreate []core_model.Resource
		onUpdate []OnUpdate
	)
	for _, r := range upstream.GetItems() {
		existing := indexedDownstream.get(core_model.MetaToResourceKey(r.GetMeta()))
		if existing == nil {
			onCreate = append(onCreate, r)
			continue
		}
		// Capture the upstream labels before the meta is replaced below.
		newLabels := r.GetMeta().GetLabels()
		if !core_model.Equal(existing.GetSpec(), r.GetSpec()) || !maps.Equal(existing.GetMeta().GetLabels(), newLabels) {
			// We have to use the meta of the current Store during update, because some Stores
			// (Kubernetes, Memory) expect to receive ResourceMeta of their own type.
			r.SetMeta(existing.GetMeta())
			onUpdate = append(onUpdate, OnUpdate{r: r, opts: []store.UpdateOptionsFunc{store.UpdateWithLabels(newLabels)}})
		}
	}

	// The Zone resource is only needed as the owner of newly created
	// resources, so it is fetched only when there is something to create.
	zone := system.NewZoneResource()
	if opts.Zone != "" && len(onCreate) > 0 {
		if err := s.resourceManager.Get(syncCtx, zone, store.GetByKey(opts.Zone, core_model.NoMesh)); err != nil {
			return err
		}
	}

	return store.InTx(syncCtx, s.transactions, func(ctx context.Context) error {
		for _, r := range onDelete {
			rk := core_model.MetaToResourceKey(r.GetMeta())
			log.Info("deleting a resource since it's no longer available in the upstream", "name", r.GetMeta().GetName(), "mesh", r.GetMeta().GetMesh())
			if err := s.resourceManager.Delete(ctx, r, store.DeleteBy(rk)); err != nil {
				return err
			}
		}

		for _, r := range onCreate {
			rk := core_model.MetaToResourceKey(r.GetMeta())
			log.Info("creating a new resource from upstream", "name", r.GetMeta().GetName(), "mesh", r.GetMeta().GetMesh())
			createOpts := []store.CreateOptionsFunc{
				store.CreateBy(rk),
				store.CreatedAt(core.Now()),
				store.CreateWithLabels(r.GetMeta().GetLabels()),
			}
			if opts.Zone != "" {
				createOpts = append(createOpts, store.CreateWithOwner(zone))
			}
			// Some Stores try to cast ResourceMeta to their own Store type, that's why we have to set meta to nil.
			r.SetMeta(nil)
			if err := s.resourceManager.Create(ctx, r, createOpts...); err != nil {
				return err
			}
		}

		for _, upd := range onUpdate {
			log.V(1).Info("updating a resource", "name", upd.r.GetMeta().GetName(), "mesh", upd.r.GetMeta().GetMesh())
			// Some stores manage ModificationTime on their own (Kubernetes). In order to be consistent
			// we set ModificationTime when we write to the downstream store. This time is almost the same
			// as the ModificationTime from the upstream store, because we update downstream only when the
			// resource has changed in upstream.
			// core.Now is used (rather than time.Now) so that the timestamp comes from the same clock as
			// CreatedAt above, and a distinct name avoids shadowing the outer 'now' used by the metric.
			modifiedAt := core.Now()
			if err := s.resourceManager.Update(ctx, upd.r, append(upd.opts, store.ModifiedAt(modifiedAt))...); err != nil {
				return err
			}
		}
		return nil
	})
}