pkg/core/bootstrap/bootstrap.go (397 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package bootstrap import ( "context" "fmt" "net/http" "net/url" "sync" ) import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/config_center" "github.com/pkg/errors" kube_ctrl "sigs.k8s.io/controller-runtime" ) import ( dubbo_cp "github.com/apache/dubbo-kubernetes/pkg/config/app/dubbo-cp" config_core "github.com/apache/dubbo-kubernetes/pkg/config/core" "github.com/apache/dubbo-kubernetes/pkg/config/core/resources/store" "github.com/apache/dubbo-kubernetes/pkg/core" config_manager "github.com/apache/dubbo-kubernetes/pkg/core/config/manager" "github.com/apache/dubbo-kubernetes/pkg/core/datasource" "github.com/apache/dubbo-kubernetes/pkg/core/extensions" "github.com/apache/dubbo-kubernetes/pkg/core/governance" "github.com/apache/dubbo-kubernetes/pkg/core/logger" "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/condition_route" dataplane_managers "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/dataplane" "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/dynamic_config" mapping_managers "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/mapping" mesh_managers "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/mesh" metadata_managers "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/metadata" "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/tag_route" "github.com/apache/dubbo-kubernetes/pkg/core/managers/apis/zone" core_plugins "github.com/apache/dubbo-kubernetes/pkg/core/plugins" dubbo_registry "github.com/apache/dubbo-kubernetes/pkg/core/registry" "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh" "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system" core_manager "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager" "github.com/apache/dubbo-kubernetes/pkg/core/resources/registry" core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store" core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime" "github.com/apache/dubbo-kubernetes/pkg/core/runtime/component" dds_context "github.com/apache/dubbo-kubernetes/pkg/dds/context" "github.com/apache/dubbo-kubernetes/pkg/dp-server/server" "github.com/apache/dubbo-kubernetes/pkg/events" k8s_extensions "github.com/apache/dubbo-kubernetes/pkg/plugins/extensions/k8s" mesh_cache "github.com/apache/dubbo-kubernetes/pkg/xds/cache/mesh" xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context" xds_server "github.com/apache/dubbo-kubernetes/pkg/xds/server" ) var log = core.Log.WithName("bootstrap") func buildRuntime(appCtx context.Context, cfg dubbo_cp.Config) (core_runtime.Runtime, error) { if err := autoconfigure(&cfg); err != nil { return nil, err } builder, err := core_runtime.BuilderFor(appCtx, cfg) if err != nil { return nil, err } for _, plugin := range core_plugins.Plugins().BootstrapPlugins() { if err := plugin.BeforeBootstrap(builder, cfg); err != nil { return nil, errors.Wrapf(err, "failed to run beforeBootstrap plugin:'%s'", plugin.Name()) } } // 定义store的状态 if cfg.DeployMode == config_core.UniversalMode || cfg.DeployMode == config_core.HalfHostMode { cfg.Store.Type = store.Traditional builder.WithAppRegCtx(dubbo_registry.NewApplicationContext()) builder.WithInfRegCtx(dubbo_registry.NewInterfaceContext()) } else { cfg.Store.Type = store.KubernetesStore } // 初始化cache builder.WithDataplaneCache(&sync.Map{}) // 初始化传统微服务体系所需要的组件 if err := initializeTraditional(cfg, builder); err != nil { return nil, err } if err := initializeResourceStore(cfg, builder); err != nil { return nil, err } // 隐蔽了configStore, 后期再补全 builder.WithResourceValidators(core_runtime.ResourceValidators{}) if err := initializeResourceManager(cfg, builder); err != nil { //nolint:contextcheck return nil, err } builder.WithDataSourceLoader(datasource.NewDataSourceLoader(builder.ReadOnlyResourceManager())) leaderInfoComponent := &component.LeaderInfoComponent{} builder.WithLeaderInfo(leaderInfoComponent) builder.WithDpServer(server.NewDpServer(*cfg.DpServer, func(writer http.ResponseWriter, request *http.Request) bool { return true })) resourceManager := builder.ResourceManager() ddsContext := dds_context.DefaultContext(appCtx, resourceManager, cfg) builder.WithDDSContext(ddsContext) if err := initializeMeshCache(builder); err != nil { return nil, err } for _, plugin := range core_plugins.Plugins().BootstrapPlugins() { if err := plugin.AfterBootstrap(builder, cfg); err != nil { return nil, errors.Wrapf(err, "failed to run afterBootstrap plugin:'%s'", plugin.Name()) } } rt, err := builder.Build() if err != nil { return nil, err } if err := rt.Add(leaderInfoComponent); err != nil { return nil, err } for name, plugin := range core_plugins.Plugins().RuntimePlugins() { if err := plugin.Customize(rt); err != nil { return nil, errors.Wrapf(err, "failed to configure runtime plugin:'%s'", name) } } return rt, nil } func Bootstrap(appCtx context.Context, cfg dubbo_cp.Config) (core_runtime.Runtime, error) { runtime, err := buildRuntime(appCtx, cfg) if err != nil { return nil, err } return runtime, nil } func initializeTraditional(cfg dubbo_cp.Config, builder *core_runtime.Builder) error { // 如果是k8s环境模式直接返回, 这里针对传统的微服务体系(包括纯vm和半托管) if cfg.DeployMode == config_core.KubernetesMode { return nil } configCenterAddress := cfg.Store.Traditional.ConfigCenter.Address registryAddress := cfg.Store.Traditional.Registry.Address metadataReportAddress := cfg.Store.Traditional.MetadataReport.Address c, addrUrl := getValidConfigCenterConfig(configCenterAddress, registryAddress) configCenter := newConfigCenter(c, addrUrl) if len(registryAddress) > 0 { logger.Infof("Valid registry address is %s", registryAddress) c := newAddressConfig(registryAddress) addrUrl, err := c.ToURL() if err != nil { panic(err) } fac := extensions.GetRegClientFactory(addrUrl.Protocol) if fac != nil { regClient := fac.CreateRegClient(addrUrl) builder.WithRegClient(regClient) } else { logger.Sugar().Infof("Metadata of type %v not registered.", addrUrl.Protocol) } registryCenter, err := extension.GetRegistry(c.GetProtocol(), addrUrl) if err != nil { return err } builder.WithGovernanceConfig(governance.NewGovernanceConfig(configCenter, registryCenter, c.GetProtocol())) builder.WithRegistryCenter(registryCenter) delegate, err := extension.GetRegistry(addrUrl.Protocol, addrUrl) if err != nil { logger.Error("Error initialize registry instance.") return err } sdUrl := addrUrl.Clone() sdUrl.AddParam("registry", addrUrl.Protocol) sdUrl.Protocol = "service-discovery" sdDelegate, err := extension.GetServiceDiscovery(sdUrl) if err != nil { logger.Error("Error initialize service discovery instance.") return err } builder.WithServiceDiscovery(sdDelegate) adminRegistry := dubbo_registry.NewRegistry(delegate, sdDelegate, builder.AppRegCtx(), builder.InfRegCtx(), configCenter) builder.WithAdminRegistry(adminRegistry) } if len(metadataReportAddress) > 0 { logger.Infof("Valid meta center address is %s", metadataReportAddress) c := newAddressConfig(metadataReportAddress) addrUrl, err := c.ToURL() if err != nil { panic(err) } factory := extension.GetMetadataReportFactory(c.GetProtocol()) metadataReport := factory.CreateMetadataReport(addrUrl) builder.WithMetadataReport(metadataReport) dubbo_registry.AddMetadataReport(metadataReport, addrUrl.Address()) } return nil } func getValidConfigCenterConfig(address string, registryAddress string) (store.AddressConfig, *common.URL) { if len(address) <= 0 && len(registryAddress) <= 0 { panic("Must at least specify `admin.configCenter.address` or `admin.registry.address`!") } var c store.AddressConfig if len(address) > 0 { logger.Infof("Specified config center address is %s", address) c = newAddressConfig(address) } else { logger.Info("Using registry address as default config center address") c = newAddressConfig(registryAddress) } configUrl, err := c.ToURL() if err != nil { panic(err) } return c, configUrl } func newAddressConfig(address string) store.AddressConfig { cfg := store.AddressConfig{} cfg.Address = address var err error cfg.Url, err = url.Parse(address) if err != nil { panic(err) } return cfg } func newConfigCenter(c store.AddressConfig, url *common.URL) config_center.DynamicConfiguration { factory, err := extension.GetConfigCenterFactory(c.GetProtocol()) if err != nil { logger.Info(err.Error()) panic(err) } configCenter, err := factory.GetDynamicConfiguration(url) if err != nil { logger.Info("Failed to init config center, error msg is %s.", err.Error()) panic(err) } return configCenter } func initializeResourceStore(cfg dubbo_cp.Config, builder *core_runtime.Builder) error { var pluginName core_plugins.PluginName var pluginConfig core_plugins.PluginConfig switch cfg.Store.Type { case store.KubernetesStore: pluginName = core_plugins.Kubernetes pluginConfig = nil case store.Traditional: pluginName = core_plugins.Traditional pluginConfig = nil case store.MemoryStore: pluginName = core_plugins.Memory pluginConfig = nil default: return errors.Errorf("unknown store type %s", cfg.Store.Type) } plugin, err := core_plugins.Plugins().ResourceStore(pluginName) if err != nil { return errors.Wrapf(err, "could not retrieve store %s plugin", pluginName) } rs, transactions, err := plugin.NewResourceStore(builder, pluginConfig) if err != nil { return err } builder.WithResourceStore(core_store.NewCustomizableResourceStore(rs)) eventBus, err := events.NewEventBus(cfg.EventBus.BufferSize) if err != nil { return err } if err := plugin.EventListener(builder, eventBus); err != nil { return err } builder.WithEventBus(eventBus) paginationStore := core_store.NewPaginationStore(rs) builder.WithResourceStore(core_store.NewCustomizableResourceStore(paginationStore)) builder.WithTransactions(transactions) return nil } func initializeConfigStore(cfg dubbo_cp.Config, builder *core_runtime.Builder) error { var pluginName core_plugins.PluginName var pluginConfig core_plugins.PluginConfig switch cfg.Store.Type { case store.KubernetesStore: pluginName = core_plugins.Kubernetes case store.MemoryStore: pluginName = core_plugins.Universal case store.Traditional: pluginName = core_plugins.Universal default: return errors.Errorf("unknown store type %s", cfg.Store.Type) } plugin, err := core_plugins.Plugins().ConfigStore(pluginName) if err != nil { return errors.Wrapf(err, "could not retrieve secret store %s plugin", pluginName) } if cs, err := plugin.NewConfigStore(builder, pluginConfig); err != nil { return err } else { builder.WithConfigStore(cs) return nil } } func initializeResourceManager(cfg dubbo_cp.Config, builder *core_runtime.Builder) error { defaultManager := core_manager.NewResourceManager(builder.ResourceStore()) customizableManager := core_manager.NewCustomizableResourceManager(defaultManager, nil) var ( manager kube_ctrl.Manager ok bool ) deployMode := builder.GetDeployMode() if deployMode != config_core.UniversalMode { manager, ok = k8s_extensions.FromManagerContext(builder.Extensions()) if !ok { return errors.New("get kube manager err") } } dataplaneManager, err := dataplane_managers.NewDataplaneManager( builder.ResourceStore(), cfg.Multizone.Zone.Name, manager, deployMode, ) if err != nil { return fmt.Errorf("initializing datalane manager error: %w", err) } customizableManager.Customize( mesh.DataplaneType, dataplaneManager, ) customizableManager.Customize( mesh.MappingType, mapping_managers.NewMappingManager( builder.ResourceStore(), manager, deployMode, )) customizableManager.Customize( mesh.MetaDataType, metadata_managers.NewMetadataManager( builder.ResourceStore(), manager, deployMode, )) customizableManager.Customize( mesh.ConditionRouteType, condition_route.NewConditionRouteManager( builder.ResourceStore(), manager, deployMode, )) customizableManager.Customize( mesh.TagRouteType, tag_route.NewTagRouteManager( builder.ResourceStore(), manager, deployMode, )) customizableManager.Customize( mesh.DynamicConfigType, dynamic_config.NewDynamicConfigManager( builder.ResourceStore(), manager, deployMode, )) customizableManager.Customize( mesh.MeshType, mesh_managers.NewMeshManager( builder.ResourceStore(), customizableManager, registry.Global(), builder.ResourceValidators().Mesh, builder.Extensions(), cfg, manager, deployMode, ), ) customizableManager.Customize( system.ZoneType, zone.NewZoneManager(builder.ResourceStore(), zone.Validator{Store: builder.ResourceStore()}, builder.Config().Store.UnsafeDelete, )) builder.WithResourceManager(customizableManager) if builder.Config().Store.Cache.Enabled { cachedManager, err := core_manager.NewCachedManager( customizableManager, builder.Config().Store.Cache.ExpirationTime.Duration, ) if err != nil { return err } builder.WithReadOnlyResourceManager(cachedManager) } else { builder.WithReadOnlyResourceManager(customizableManager) } return nil } func initializeConfigManager(builder *core_runtime.Builder) { builder.WithConfigManager(config_manager.NewConfigManager(builder.ConfigStore())) } func initializeMeshCache(builder *core_runtime.Builder) error { meshContextBuilder := xds_context.NewMeshContextBuilder( builder.ReadOnlyResourceManager(), xds_server.MeshResourceTypes(), builder.LookupIP(), builder.Config().Multizone.Zone.Name) meshSnapshotCache, err := mesh_cache.NewCache( builder.Config().Store.Cache.ExpirationTime.Duration, meshContextBuilder) if err != nil { return err } builder.WithMeshCache(meshSnapshotCache) return nil }