func()

in pkg/dds/store/sync.go [121:257]


func (s *syncResourceStore) Sync(syncCtx context.Context, upstreamResponse client.UpstreamResponse, fs ...SyncOptionFunc) error {
	now := core.Now()
	defer func() {
		s.metric.Observe(float64(time.Since(now).Milliseconds()) / 1000)
	}()
	opts := NewSyncOptions(fs...)
	log := s.log.WithValues("type", upstreamResponse.Type)
	upstream := upstreamResponse.AddedResources
	downstream, err := registry.Global().NewList(upstreamResponse.Type)
	if err != nil {
		return err
	}
	if upstreamResponse.IsInitialRequest {
		if err := s.resourceManager.List(syncCtx, downstream); err != nil {
			return err
		}
	} else {
		upstreamChangeKeys := append(core_model.ResourceListToResourceKeys(upstream), upstreamResponse.RemovedResourcesKey...)
		if err := s.resourceManager.List(syncCtx, downstream, store.ListByResourceKeys(upstreamChangeKeys)); err != nil {
			return err
		}
	}
	log.V(1).Info("before filtering", "downstream", downstream, "upstream", upstream)

	if opts.Predicate != nil {
		if filtered, err := filter(downstream, opts.Predicate); err != nil {
			return err
		} else {
			downstream = filtered
		}
		if filtered, err := filter(upstream, opts.Predicate); err != nil {
			return err
		} else {
			upstream = filtered
		}
	}
	log.V(1).Info("after filtering", "downstream", downstream, "upstream", upstream)

	indexedDownstream := newIndexed(downstream)
	indexedUpstream := newIndexed(upstream)

	var onDelete []core_model.Resource
	// 1. delete resources which were removed from the upstream
	// on the first request when the control-plane starts we want to sync
	// whole the resources in the store. In this case we do not check removed
	// resources because we want to make stores synced. When we already
	// have resources in the map, we are going to receive only updates
	// so we don't want to remove resources haven't changed.
	if upstreamResponse.IsInitialRequest {
		for _, r := range downstream.GetItems() {
			if indexedUpstream.get(core_model.MetaToResourceKey(r.GetMeta())) == nil {
				onDelete = append(onDelete, r)
			}
		}
	} else {
		for _, rk := range upstreamResponse.RemovedResourcesKey {
			// check if we are adding and removing the resource at the same time
			if r := indexedUpstream.get(rk); r != nil {
				// it isn't remove but update
				continue
			}
			if r := indexedDownstream.get(rk); r != nil {
				onDelete = append(onDelete, r)
			}
		}
	}

	// 2. create resources which are not represented in 'downstream' and update the rest of them
	var (
		onCreate []core_model.Resource
		onUpdate []OnUpdate
	)
	for _, r := range upstream.GetItems() {
		existing := indexedDownstream.get(core_model.MetaToResourceKey(r.GetMeta()))
		if existing == nil {
			onCreate = append(onCreate, r)
			continue
		}
		newLabels := r.GetMeta().GetLabels()
		if !core_model.Equal(existing.GetSpec(), r.GetSpec()) || !maps.Equal(existing.GetMeta().GetLabels(), newLabels) {
			// we have to use meta of the current Store during update, because some Stores (Kubernetes, Memory)
			// expect to receive ResourceMeta of own type.
			r.SetMeta(existing.GetMeta())
			onUpdate = append(onUpdate, OnUpdate{r: r, opts: []store.UpdateOptionsFunc{store.UpdateWithLabels(newLabels)}})
		}
	}

	zone := system.NewZoneResource()
	if opts.Zone != "" && len(onCreate) > 0 {
		if err := s.resourceManager.Get(syncCtx, zone, store.GetByKey(opts.Zone, core_model.NoMesh)); err != nil {
			return err
		}
	}

	return store.InTx(syncCtx, s.transactions, func(ctx context.Context) error {
		for _, r := range onDelete {
			rk := core_model.MetaToResourceKey(r.GetMeta())
			log.Info("deleting a resource since it's no longer available in the upstream", "name", r.GetMeta().GetName(), "mesh", r.GetMeta().GetMesh())
			if err := s.resourceManager.Delete(ctx, r, store.DeleteBy(rk)); err != nil {
				return err
			}
		}

		for _, r := range onCreate {
			rk := core_model.MetaToResourceKey(r.GetMeta())
			log.Info("creating a new resource from upstream", "name", r.GetMeta().GetName(), "mesh", r.GetMeta().GetMesh())

			createOpts := []store.CreateOptionsFunc{
				store.CreateBy(rk),
				store.CreatedAt(core.Now()),
				store.CreateWithLabels(r.GetMeta().GetLabels()),
			}
			if opts.Zone != "" {
				createOpts = append(createOpts, store.CreateWithOwner(zone))
			}

			// some Stores try to cast ResourceMeta to own Store type that's why we have to set meta to nil
			r.SetMeta(nil)

			if err := s.resourceManager.Create(ctx, r, createOpts...); err != nil {
				return err
			}
		}

		for _, upd := range onUpdate {
			log.V(1).Info("updating a resource", "name", upd.r.GetMeta().GetName(), "mesh", upd.r.GetMeta().GetMesh())
			now := time.Now()
			// some stores manage ModificationTime time on they own (Kubernetes), in order to be consistent
			// we set ModificationTime when we add to downstream store. This time is almost the same with ModificationTime
			// from upstream store, because we update downstream only when resource have changed in upstream
			if err := s.resourceManager.Update(ctx, upd.r, append(upd.opts, store.ModifiedAt(now))...); err != nil {
				return err
			}
		}
		return nil
	})
}