func Setup()

in pkg/dds/zone/components.go [42:125]


// Setup wires the zone-side DDS components into the runtime.
//
// It returns nil without registering anything unless
// rt.Config().IsFederatedZoneCP() reports true. Otherwise it:
//   - builds a DDS server for the resource types carrying model.ZoneToGlobalFlag,
//   - builds a resource syncer from the runtime's resource manager,
//     transactions and extensions,
//   - renders the runtime config via config.ConfigForDisplay and config.ToJson
//     so the resulting JSON can be handed to each delta DDS stream,
//   - creates a mux client whose two callbacks start the global-to-zone and
//     zone-to-global sync sessions, and adds it to the runtime wrapped in a
//     resilient component.
//
// Errors from any constructor, from the config rendering, or from rt.Add are
// returned to the caller.
func Setup(rt core_runtime.Runtime) error {
	if federated := rt.Config().IsFederatedZoneCP(); !federated {
		return nil
	}

	zoneName := rt.Config().Multizone.Zone.Name
	typeRegistry := registry.Global()
	ddsCtx := rt.DDSContext()

	// Server side of the zone-to-global direction.
	providedTypes := typeRegistry.ObjectTypes(model.HasDDSFlag(model.ZoneToGlobalFlag))
	refreshInterval := rt.Config().Multizone.Zone.DDS.RefreshInterval.Duration
	nackBackoff := rt.Config().Multizone.Zone.DDS.NackBackoff.Duration
	server, err := dds_server.New(
		ddsDeltaZoneLog,
		rt,
		providedTypes,
		zoneName,
		refreshInterval,
		ddsCtx.ZoneProvidedFilter,
		ddsCtx.ZoneResourceMapper,
		nackBackoff,
	)
	if err != nil {
		return err
	}

	syncer, err := dds_sync_store.NewResourceSyncer(
		ddsDeltaZoneLog,
		rt.ResourceManager(),
		rt.Transactions(),
		rt.Extensions(),
	)
	if err != nil {
		return err
	}

	k8sFactory := resources_k8s.NewSimpleKubeFactory()

	// The display form of the config is serialized once and reused by every
	// global-to-zone stream created below.
	cpConfig := rt.Config()
	displayConfig, err := config.ConfigForDisplay(&cpConfig)
	if err != nil {
		return errors.Wrap(err, "could not construct config for display")
	}
	displayJSON, err := config.ToJson(displayConfig)
	if err != nil {
		return errors.Wrap(err, "could not marshall config to json")
	}

	onGlobalToZoneSyncStarted := mux.OnGlobalToZoneSyncStartedFunc(func(stream mesh_proto.DDSSyncService_GlobalToZoneSyncClient, errChan chan error) {
		sessionLog := ddsDeltaZoneLog.WithValues("dds-version", "v2")
		consumedTypes := typeRegistry.ObjectTypes(model.HasDDSFlag(model.GlobalToZoneSelector))
		deltaStream := dds_client.NewDeltaDDSStream(stream, zoneName, rt, string(displayJSON))
		onResources := dds_sync_store.ZoneSyncCallback(
			stream.Context(),
			rt.DDSContext().Configs,
			syncer,
			rt.Config().Store.Type == store.KubernetesStore,
			zoneName,
			k8sFactory,
			rt.Config().Store.Kubernetes.SystemNamespace,
		)
		responseBackoff := rt.Config().Multizone.Zone.DDS.ResponseBackoff.Duration
		syncClient := dds_client.NewDDSSyncClient(sessionLog, consumedTypes, deltaStream, onResources, responseBackoff)

		// Receive runs in the background; only a failure is reported on errChan.
		go func() {
			err := syncClient.Receive()
			if err == nil {
				sessionLog.V(1).Info("GlobalToZoneSyncClient finished gracefully")
				return
			}
			errChan <- errors.Wrap(err, "GlobalToZoneSyncClient finished with an error")
		}()
	})

	onZoneToGlobalSyncStarted := mux.OnZoneToGlobalSyncStartedFunc(func(stream mesh_proto.DDSSyncService_ZoneToGlobalSyncClient, errChan chan error) {
		sessionLog := ddsDeltaZoneLog.WithValues("dds-version", "v2", "peer-id", "global")
		sessionLog.Info("ZoneToGlobalSync new session created")
		serverStream := dds_server.NewServerStream(stream)

		// The server loop runs in the background; only a failure is reported on errChan.
		go func() {
			err := server.ZoneToGlobal(serverStream)
			if err == nil {
				sessionLog.V(1).Info("ZoneToGlobalSync finished gracefully")
				return
			}
			errChan <- errors.Wrap(err, "ZoneToGlobalSync finished with an error")
		}()
	})

	muxClient := mux.NewClient(
		rt.DDSContext().ZoneClientCtx,
		rt.Config().Multizone.Zone.GlobalAddress,
		zoneName,
		onGlobalToZoneSyncStarted,
		onZoneToGlobalSyncStarted,
		*rt.Config().Multizone.Zone.DDS,
	)
	resilient := component.NewResilientComponent(ddsDeltaZoneLog.WithName("dds-mux-client"), muxClient)
	return rt.Add(resilient)
}