func Setup()

in pkg/dds/global/components.go [58:169]


func Setup(rt runtime.Runtime) error {
	if rt.Config().Mode != config_core.Global {
		return nil
	}
	reg := registry.Global()
	ddsServer, err := dds_server.New(
		ddsDeltaGlobalLog,
		rt,
		reg.ObjectTypes(model.HasDDSFlag(model.GlobalToZoneSelector)),
		"global",
		rt.Config().Multizone.Global.DDS.RefreshInterval.Duration,
		rt.DDSContext().GlobalProvidedFilter,
		rt.DDSContext().GlobalResourceMapper,
		rt.Config().Multizone.Global.DDS.NackBackoff.Duration,
	)
	if err != nil {
		return err
	}
	resourceSyncer, err := sync_store.NewResourceSyncer(ddsDeltaGlobalLog, rt.ResourceManager(), rt.Transactions(), rt.Extensions())
	if err != nil {
		return err
	}
	kubeFactory := resources_k8s.NewSimpleKubeFactory()

	onGlobalToZoneSyncConnect := mux.OnGlobalToZoneSyncConnectFunc(func(stream mesh_proto.DDSSyncService_GlobalToZoneSyncServer, errChan chan error) {
		zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
		if err != nil {
			errChan <- err
		}
		log := ddsDeltaGlobalLog.WithValues("peer-id", zoneID)
		log = dubbo_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
		log.Info("Global To Zone new session created")
		if err := createZoneIfAbsent(stream.Context(), log, zoneID, rt.ResourceManager()); err != nil {
			errChan <- errors.Wrap(err, "Global CP could not create a zone")
		}
		if err := ddsServer.GlobalToZoneSync(stream); err != nil {
			errChan <- err
		} else {
			log.V(1).Info("GlobalToZoneSync finished gracefully")
		}
	})

	onZoneToGlobalSyncConnect := mux.OnZoneToGlobalSyncConnectFunc(func(stream mesh_proto.DDSSyncService_ZoneToGlobalSyncServer, errChan chan error) {
		zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
		if err != nil {
			errChan <- err
		}
		log := ddsDeltaGlobalLog.WithValues("peer-id", zoneID)
		log = dubbo_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
		ddsStream := dds_client.NewDeltaDDSStream(stream, zoneID, rt, "")
		sink := dds_client.NewDDSSyncClient(
			log,
			reg.ObjectTypes(model.HasDDSFlag(model.ZoneToGlobalFlag)),
			ddsStream,
			dds_sync_store.GlobalSyncCallback(stream.Context(), resourceSyncer, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory, rt.Config().Store.Kubernetes.SystemNamespace),
			rt.Config().Multizone.Global.DDS.ResponseBackoff.Duration,
		)
		go func() {
			if err := sink.Receive(); err != nil {
				errChan <- errors.Wrap(err, "DDSSyncClient finished with an error")
			} else {
				log.V(1).Info("DDSSyncClient finished gracefully")
			}
		}()
	})

	var streamInterceptors []service.StreamInterceptor
	for _, filter := range rt.DDSContext().GlobalServerFilters {
		streamInterceptors = append(streamInterceptors, filter)
	}

	if rt.Config().Multizone.Global.DDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) {
		zwLog := ddsDeltaGlobalLog.WithName("zone-watch")
		zw, err := mux.NewZoneWatch(
			zwLog,
			rt.Config().Multizone.Global.DDS.ZoneHealthCheck,
			rt.EventBus(),
			rt.ReadOnlyResourceManager(),
			rt.Extensions(),
		)
		if err != nil {
			return errors.Wrap(err, "couldn't create ZoneWatch")
		}
		if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil {
			return err
		}
	}
	return rt.Add(mux.NewServer(
		rt.DDSContext().GlobalServerFilters,
		rt.DDSContext().ServerStreamInterceptors,
		rt.DDSContext().ServerUnaryInterceptor,
		*rt.Config().Multizone.Global.DDS,
		service.NewGlobalDDSServiceServer(
			rt.AppContext(),
			rt.ResourceManager(),
			rt.GetInstanceId(),
			streamInterceptors,
			rt.Extensions(),
			rt.Config().Store.Upsert,
			rt.EventBus(),
			rt.Config().Multizone.Global.DDS.ZoneHealthCheck.PollInterval.Duration,
		),
		mux.NewDDSSyncServiceServer(
			rt.AppContext(),
			onGlobalToZoneSyncConnect,
			onZoneToGlobalSyncConnect,
			rt.DDSContext().GlobalServerFilters,
			rt.Extensions(),
			rt.EventBus(),
		),
	))
}