istio/pkg/controllers/servicecenter/controller.go (149 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package servicecenter import ( "context" "reflect" "sync" "time" "github.com/apache/servicecomb-service-center/istio/pkg/event" "github.com/apache/servicecomb-service-center/istio/pkg/utils" "github.com/go-chassis/cari/discovery" "istio.io/pkg/log" ) type Controller struct { // servicecomb service center go-chassis API client conn *Connector // Channel used to send and receive servicecomb service center change events from the service center controller events chan []event.ChangeEvent // Cache of retrieved servicecomb service center microservices, mapped to their service ids serviceCache sync.Map } func NewController(addr string, e chan []event.ChangeEvent) *Controller { controller := &Controller{ conn: NewConnector(addr), events: e, serviceCache: sync.Map{}, } return controller } // Run until a stop signal is received func (c *Controller) Run(ctx context.Context) { // start a new go routine to watch service center update go c.watchServiceCenter(ctx) } // Stop the controller. func (c *Controller) Stop() { // Unregister app instance watcher services c.conn.AppInstanceWatcherCache.Range(func(_, value interface{}) bool { c.conn.UnregisterInstanceWatcher(value.(string)) return true }) } // Watch the service center registry for MicroService changes. func (c *Controller) watchServiceCenter(ctx context.Context) { for { select { case <-ctx.Done(): return default: // Full sync all services from service center services, err := c.conn.GetAllServices() if err != nil { log.Errorf("failed to retrieve service center services from registry: err[%v]\n", err) } // Process received services c.onServiceCenterUpdate(services) // Number of seconds to wait between syncs time.Sleep(utils.PULL_INTERVAL) } } } // Send all new services to Istio controller. func (c *Controller) onServiceCenterUpdate(services []*discovery.MicroService) { svcs := c.getChangedServices(services) if len(svcs) > 0 { c.events <- svcs } } // Send all instance events to Istio controller. func (c *Controller) onInstanceUpdate(e event.ChangeEvent) { log.Debugf("new service center instance event received from watcher: %+v\n", e) c.events <- []event.ChangeEvent{e} } // Get service center service changes, register watcher for newly created services. func (c *Controller) getChangedServices(services []*discovery.MicroService) []event.ChangeEvent { // All non-watcher service center services mapped to their ids currServices := sync.Map{} // All new non-watcher service center services newServices := []*event.MicroserviceEntry{} // IDs of current service center watcher services currAppInstanceWatcherIds := sync.Map{} // Service events that must be pushed changes := []event.ChangeEvent{} for _, s := range services { name := s.ServiceName appId := s.AppId id := s.ServiceId if name != utils.WATCHER_SVC_NAME && name != utils.SERVICECENTER_ETCD_NAME && name != utils.SERVICECENTER_MONGO_NAME { entry := &event.MicroserviceEntry{MicroService: s} if cachedEntry, ok := c.serviceCache.Load(id); !ok { if _, ok := c.conn.AppInstanceWatcherCache.Load(appId); !ok { // Register new app instance watcher service watcherId, err := c.conn.RegisterAppInstanceWatcher(utils.WATCHER_SVC_NAME, appId, c.onInstanceUpdate) if err != nil { continue } // Record the id of the watcher service for this app currAppInstanceWatcherIds.Store(appId, watcherId) } // Collect newly created service changeEvent := event.ChangeEvent{Action: discovery.EVT_CREATE, Event: entry} changes = append(changes, changeEvent) newServices = append(newServices, entry) currServices.Store(id, entry) } else { cachedEntryEvent := cachedEntry.(*event.MicroserviceEntry) if !reflect.DeepEqual(s, cachedEntryEvent.MicroService) { // Collect updated service changeEvent := event.ChangeEvent{Action: discovery.EVT_UPDATE, Event: entry} changes = append(changes, changeEvent) currServices.Store(id, entry) } else { // No change, keep cache entry currServices.Store(id, cachedEntry) } } } else if name == utils.WATCHER_SVC_NAME { if k, ok := c.conn.AppInstanceWatcherCache.Load(appId); ok { if k.(string) == id { // Watcher still exists as expected, record its current id currAppInstanceWatcherIds.Store(appId, id) } } } } // Collect deleted services c.serviceCache.Range(func(key, value interface{}) bool { if _, ok := currServices.Load(key); !ok { changes = append(changes, event.ChangeEvent{ Action: discovery.EVT_DELETE, Event: value.(*event.MicroserviceEntry), }) } return true }) // Initial sync-up for newly created services; retrieve and start watching their instances c.initNewServices(newServices) // Update service ID cache with current services c.refreshServiceCache(currServices) // Check for app instance watcher changes c.checkAppInstanceWatchers(currAppInstanceWatcherIds) return changes } // Save MicroService(s) retrieved from service center registry func (c *Controller) refreshServiceCache(services sync.Map) { c.serviceCache = services } // Detect missing watcher services in registry. If a watcher service was expected but is missing, flag it to be re-registered. func (c *Controller) checkAppInstanceWatchers(currAppInstanceWatcherIds sync.Map) { c.conn.AppInstanceWatcherCache.Range(func(appId, _ interface{}) bool { if _, ok := currAppInstanceWatcherIds.Load(appId); !ok { log.Warnf("instance watcher for appId %s is invalid, invalidating its cache entries", appId) // Watcher is missing for this app, remove all app's services from cache newServiceCache := sync.Map{} c.serviceCache.Range(func(key, value interface{}) bool { microServiceValue := value.(*event.MicroserviceEntry) if microServiceValue.MicroService.AppId != appId { newServiceCache.Store(key, value) } return true }) c.refreshServiceCache(newServiceCache) } return true }) // Cache current watcher ids (if any are missing, will be re-registered on next sync) c.conn.RefreshAppInstanceWatcherCache(currAppInstanceWatcherIds) } // Watch services, has side effect of adding instances to MicroserviceEntry(s) func (c *Controller) initNewServices(newServices []*event.MicroserviceEntry) { serviceInstanceMap := c.conn.GetServiceInstances(newServices) if serviceInstanceMap == nil { return } for _, s := range newServices { if instances, ok := serviceInstanceMap[s.MicroService.ServiceId]; ok { for _, instance := range instances { s.Instances = append(s.Instances, &event.InstanceEntry{ MicroServiceInstance: instance, }) } } } }