in pkg/dds/global/components.go [58:169]
// Setup wires the global-side DDS components into the given runtime.
//
// It does nothing and returns nil unless rt.Config().Mode is
// config_core.Global. Otherwise it:
//   - builds the DDS server used for the Global -> Zone sync,
//   - builds the resource syncer used by the Zone -> Global sync callback,
//   - optionally registers a ZoneWatch component when the configured zone
//     health-check timeout is greater than zero,
//   - registers the mux server exposing both DDS services.
//
// It returns the first error produced while constructing or registering any
// of these components.
func Setup(rt runtime.Runtime) error {
	if rt.Config().Mode != config_core.Global {
		return nil
	}
	reg := registry.Global()
	ddsServer, err := dds_server.New(
		ddsDeltaGlobalLog,
		rt,
		reg.ObjectTypes(model.HasDDSFlag(model.GlobalToZoneSelector)),
		"global",
		rt.Config().Multizone.Global.DDS.RefreshInterval.Duration,
		rt.DDSContext().GlobalProvidedFilter,
		rt.DDSContext().GlobalResourceMapper,
		rt.Config().Multizone.Global.DDS.NackBackoff.Duration,
	)
	if err != nil {
		return err
	}
	resourceSyncer, err := sync_store.NewResourceSyncer(ddsDeltaGlobalLog, rt.ResourceManager(), rt.Transactions(), rt.Extensions())
	if err != nil {
		return err
	}
	kubeFactory := resources_k8s.NewSimpleKubeFactory()

	// Handler invoked for every Global -> Zone sync stream. Every failure is
	// reported on errChan and terminates the handler immediately.
	// NOTE(review): the consumer of errChan lives in the mux package and is
	// not visible here; if it reads a single error, a second send would block
	// forever, which is one more reason to return after each send - confirm.
	onGlobalToZoneSyncConnect := mux.OnGlobalToZoneSyncConnectFunc(func(stream mesh_proto.DDSSyncService_GlobalToZoneSyncServer, errChan chan error) {
		zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
		if err != nil {
			errChan <- err
			// Without a zone ID there is nothing meaningful to create or sync.
			return
		}
		log := ddsDeltaGlobalLog.WithValues("peer-id", zoneID)
		log = dubbo_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
		log.Info("Global To Zone new session created")
		if err := createZoneIfAbsent(stream.Context(), log, zoneID, rt.ResourceManager()); err != nil {
			errChan <- errors.Wrap(err, "Global CP could not create a zone")
			// Do not start syncing when the zone could not be created.
			return
		}
		if err := ddsServer.GlobalToZoneSync(stream); err != nil {
			errChan <- err
			return
		}
		log.V(1).Info("GlobalToZoneSync finished gracefully")
	})

	// Handler invoked for every Zone -> Global sync stream. It starts a
	// background receiver; receive errors are reported on errChan.
	onZoneToGlobalSyncConnect := mux.OnZoneToGlobalSyncConnectFunc(func(stream mesh_proto.DDSSyncService_ZoneToGlobalSyncServer, errChan chan error) {
		zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
		if err != nil {
			errChan <- err
			// Do not build a stream or spawn a receiver for an unidentified peer.
			return
		}
		log := ddsDeltaGlobalLog.WithValues("peer-id", zoneID)
		log = dubbo_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
		ddsStream := dds_client.NewDeltaDDSStream(stream, zoneID, rt, "")
		sink := dds_client.NewDDSSyncClient(
			log,
			reg.ObjectTypes(model.HasDDSFlag(model.ZoneToGlobalFlag)),
			ddsStream,
			dds_sync_store.GlobalSyncCallback(stream.Context(), resourceSyncer, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory, rt.Config().Store.Kubernetes.SystemNamespace),
			rt.Config().Multizone.Global.DDS.ResponseBackoff.Duration,
		)
		go func() {
			if err := sink.Receive(); err != nil {
				errChan <- errors.Wrap(err, "DDSSyncClient finished with an error")
				return
			}
			log.V(1).Info("DDSSyncClient finished gracefully")
		}()
	})

	// Re-collect the global server filters as a []service.StreamInterceptor,
	// the slice type passed to NewGlobalDDSServiceServer below.
	var streamInterceptors []service.StreamInterceptor
	for _, filter := range rt.DDSContext().GlobalServerFilters {
		streamInterceptors = append(streamInterceptors, filter)
	}

	// ZoneWatch is only registered when a positive health-check timeout is configured.
	if rt.Config().Multizone.Global.DDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) {
		zwLog := ddsDeltaGlobalLog.WithName("zone-watch")
		zw, err := mux.NewZoneWatch(
			zwLog,
			rt.Config().Multizone.Global.DDS.ZoneHealthCheck,
			rt.EventBus(),
			rt.ReadOnlyResourceManager(),
			rt.Extensions(),
		)
		if err != nil {
			return errors.Wrap(err, "couldn't create ZoneWatch")
		}
		if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil {
			return err
		}
	}

	return rt.Add(mux.NewServer(
		rt.DDSContext().GlobalServerFilters,
		rt.DDSContext().ServerStreamInterceptors,
		rt.DDSContext().ServerUnaryInterceptor,
		*rt.Config().Multizone.Global.DDS,
		service.NewGlobalDDSServiceServer(
			rt.AppContext(),
			rt.ResourceManager(),
			rt.GetInstanceId(),
			streamInterceptors,
			rt.Extensions(),
			rt.Config().Store.Upsert,
			rt.EventBus(),
			rt.Config().Multizone.Global.DDS.ZoneHealthCheck.PollInterval.Duration,
		),
		mux.NewDDSSyncServiceServer(
			rt.AppContext(),
			onGlobalToZoneSyncConnect,
			onZoneToGlobalSyncConnect,
			rt.DDSContext().GlobalServerFilters,
			rt.Extensions(),
			rt.EventBus(),
		),
	))
}