in pkg/dds/store/sync.go [333:391]
// GlobalSyncCallback returns client callbacks whose OnResourcesReceived handler
// prepares a received batch of resources and hands it over to syncer.
//
// The handler treats upstream.ControlPlaneId as the zone name and, in order:
//   - when ctx lacks the dds.FeatureHashSuffix feature, prefixes the removed
//     resource keys and the names of added resources with the zone name
//     (legacy naming scheme);
//   - replaces the meta of every added resource with a clone labelled with
//     mesh_proto.ZoneTag (the zone name) and mesh_proto.ResourceOriginLabel
//     (mesh_proto.ZoneResourceOrigin);
//   - when k8sStore is true, calls addNamespaceSuffix with kubeFactory and
//     systemNamespace and returns its error unchanged on failure;
//   - for zone ingress, zone egress, mapping and metadata resource lists,
//     sets Spec.Zone of every item to the zone name;
//   - calls syncer.Sync with a prefilter that keeps only resources attributed
//     to the zone, together with the Zone option.
//
// The feature check is evaluated once, when GlobalSyncCallback is called, and
// the result is shared by every invocation of the handler.
func GlobalSyncCallback(
	ctx context.Context,
	syncer ResourceSyncer,
	k8sStore bool,
	kubeFactory resources_k8s.KubeFactory,
	systemNamespace string,
) *client.Callbacks {
	hashSuffixes := dds.ContextHasFeature(ctx, dds.FeatureHashSuffix)

	onReceived := func(upstream client.UpstreamResponse) error {
		zoneName := upstream.ControlPlaneId

		if !hashSuffixes {
			// todo: remove in 2 releases after 2.6.x
			upstream.RemovedResourcesKey = util.AddPrefixToResourceKeyNames(upstream.RemovedResourcesKey, zoneName)
			util.AddPrefixToNames(upstream.AddedResources.GetItems(), zoneName)
		}

		// Stamp every incoming resource with the zone it came from and its origin.
		for _, res := range upstream.AddedResources.GetItems() {
			labelled := util.CloneResourceMeta(
				res.GetMeta(),
				util.WithLabel(mesh_proto.ZoneTag, zoneName),
				util.WithLabel(mesh_proto.ResourceOriginLabel, string(mesh_proto.ZoneResourceOrigin)),
			)
			res.SetMeta(labelled)
		}

		if k8sStore {
			err := addNamespaceSuffix(kubeFactory, upstream, systemNamespace)
			if err != nil {
				return err
			}
		}

		// Resource kinds that carry the zone in their spec get it overwritten
		// with the name of the control plane that sent them.
		switch upstream.Type {
		case core_mesh.ZoneIngressType:
			ingresses := upstream.AddedResources.(*core_mesh.ZoneIngressResourceList)
			for _, ingress := range ingresses.Items {
				ingress.Spec.Zone = zoneName
			}
		case core_mesh.ZoneEgressType:
			egresses := upstream.AddedResources.(*core_mesh.ZoneEgressResourceList)
			for _, egress := range egresses.Items {
				egress.Spec.Zone = zoneName
			}
		case core_mesh.MappingType:
			mappings := upstream.AddedResources.(*core_mesh.MappingResourceList)
			for _, mapping := range mappings.Items {
				mapping.Spec.Zone = zoneName
			}
		case core_mesh.MetaDataType:
			metadata := upstream.AddedResources.(*core_mesh.MetaDataResourceList)
			for _, md := range metadata.Items {
				md.Spec.Zone = zoneName
			}
		}

		// Pick the predicate that recognises resources attributed to this zone:
		// by zone label normally, by "<zone>." name prefix in legacy mode.
		var fromZone func(r model.Resource) bool
		if hashSuffixes {
			fromZone = func(r model.Resource) bool {
				return r.GetMeta().GetLabels()[mesh_proto.ZoneTag] == zoneName
			}
		} else {
			// todo: remove in 2 releases after 2.6.x
			fromZone = func(r model.Resource) bool {
				return strings.HasPrefix(r.GetMeta().GetName(), fmt.Sprintf("%s.", zoneName))
			}
		}

		return syncer.Sync(ctx, upstream, PrefilterBy(fromZone), Zone(zoneName))
	}

	return &client.Callbacks{OnResourcesReceived: onReceived}
}