istio/pkg/controllers/servicecenter/connector.go (139 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package servicecenter
import (
"sync"
"time"
"github.com/apache/servicecomb-service-center/istio/pkg/event"
"github.com/go-chassis/cari/discovery"
"github.com/go-chassis/sc-client"
"istio.io/pkg/log"
)
const (
	// REREGISTER_INTERVAL is the pause (5 seconds) between unregistering an existing
	// Servicecomb Service Center Watcher service and registering it again.
	REREGISTER_INTERVAL = 5 * time.Second
)
// Connector wraps the Servicecomb Service Center go-chassis client together with
// a cache of the instance watcher services registered through it.
type Connector struct {
	// client is the go-chassis service center client used for all registry calls.
	client *sc.Client

	// AppInstanceWatcherCache maps an appId to the id of its instance watcher service.
	// Watchers are app-specific to avoid cross-app errors.
	AppInstanceWatcherCache sync.Map
}
// NewConnector builds a Connector whose service center client targets addr as
// its only endpoint.
//
// A client creation failure is logged but not returned: the Connector is still
// handed back, holding whatever client value sc.NewClient produced.
func NewConnector(addr string) *Connector {
	opts := sc.Options{Endpoints: []string{addr}}
	registryClient, err := sc.NewClient(opts)
	if err != nil {
		log.Errorf("failed to create service center client, err[%v]\n", err)
	}
	// The watcher cache needs no explicit initialisation; its zero value is used.
	return &Connector{client: registryClient}
}
// GetServiceExistence reports whether the registry lookup for microServiceId
// yields a MicroService.
//
// The lookup error is discarded, so a failed lookup that yields no service is
// reported as false, the same as a missing service.
func (c *Connector) GetServiceExistence(microServiceId string) bool {
	found, _ := c.client.GetMicroService(microServiceId, sc.WithGlobal())
	return found != nil
}
// GetAllServices retrieves all service center MicroServices, without their
// instances, from the registry.
//
// On failure the result slice is nil and the client error is returned as-is.
func (c *Connector) GetAllServices() ([]*discovery.MicroService, error) {
	services, err := c.client.GetAllMicroServices(sc.WithGlobal())
	if err == nil {
		return services, nil
	}
	return nil, err
}
// RegisterAppInstanceWatcher registers a service center Watcher service that watches
// instance-level change events for all service center services sharing appId.
//
// name is the service name of the watcher, appId the application it is
// responsible for, and callback receives every instance change event, converted
// to an event.ChangeEvent.
//
// If a watcher with the same appId/name/version/environment is already
// registered, it is unregistered first and the call blocks for
// REREGISTER_INTERVAL before registering again.
//
// On success the new watcher id is stored in AppInstanceWatcherCache under appId
// and returned. If any step fails (id lookup, unregister, register, or starting
// the watch) the error is returned, the id is empty and the cache is left
// untouched, so a watcher whose watch never started is not advertised as active.
func (c *Connector) RegisterAppInstanceWatcher(name string, appId string, callback func(event event.ChangeEvent)) (string, error) {
	watcherService := &discovery.MicroService{
		AppId:       appId,
		ServiceName: name,
		Environment: "",
		Version:     "0.0.1",
	}

	prevId, err := c.client.GetMicroServiceID(watcherService.AppId, watcherService.ServiceName, watcherService.Version, watcherService.Environment, sc.WithGlobal())
	if err != nil {
		log.Errorf("failed to get microservice id from service center registry with service name %s, err[%v]\n", name, err)
		return "", err
	}

	if prevId != "" {
		// Need to reregister existing watcher for this app to reestablish the websocket connection
		log.Warnf("instance watcher already exists in service center registry with id %s, attempting to unregister...\n", prevId)
		if err := c.UnregisterInstanceWatcher(prevId); err != nil {
			log.Errorf("failed to unregister exising instance watcher in service center registry with id %s, err[%v]\n", prevId, err)
			return "", err
		}
		log.Infof("successfully unregistered instance watcher, sleeping for %s before re-registering...\n", REREGISTER_INTERVAL)
		// Sleep allows time for service center to unregister watcher consumer/producer relationships
		// Re-registering too early will cause race conditions
		time.Sleep(REREGISTER_INTERVAL)
	}

	id, err := c.client.RegisterService(watcherService)
	if err != nil {
		log.Errorf("failed to register instance watcher in service center registry with service name %s, err[%v]\n", name, err)
		return "", err
	}

	err = c.client.WatchMicroService(id, func(e *sc.MicroServiceInstanceChangedEvent) {
		callback(event.ChangeEvent{Action: discovery.EventType(e.Action), Event: &event.InstanceEntry{MicroServiceInstance: e.Instance}})
	})
	if err != nil {
		log.Errorf("failed to watch service center instances using watcher service %s, err[%v]\n", name, err)
		// Do not cache or report success for a watcher that is not delivering events.
		// The registered service is left in place; the next call finds it via
		// GetMicroServiceID and goes through the unregister/re-register path above.
		return "", err
	}

	// Cache the id of the app instance watcher service
	c.AppInstanceWatcherCache.Store(appId, id)
	log.Debugf("registered instance watcher with service name %s and id %s for appId %s\n", name, id, appId)
	return id, nil
}
// UnregisterInstanceWatcher removes a service center Watcher service from the registry.
//
// It is a no-op returning nil when serviceId is empty or when
// GetServiceExistence reports that the service is not in the registry.
// Otherwise it returns the error produced by the unregister call.
func (c *Connector) UnregisterInstanceWatcher(serviceId string) error {
	// Checked first so that no registry lookup is issued with an empty id.
	if serviceId == "" {
		return nil
	}
	if !c.GetServiceExistence(serviceId) {
		log.Debug("instance watcher no longer exists in registry, skipping unregister...")
		return nil
	}

	ok, err := c.client.UnregisterMicroService(serviceId)
	if !ok {
		log.Warnf("failed to unregister instance watcher in service center registry with id %s, err[%v]\n", serviceId, err)
		return err
	}
	log.Debug("instance watcher successfully unregistered")
	return err
}
// GetServiceInstances fetches the current instances of newly received services.
//
// Entries are grouped by appId and one BatchFindInstances call is issued per
// app, using the id cached in AppInstanceWatcherCache for that app as the first
// argument. Entries whose appId has no cached watcher id are skipped and logged.
//
// Returns a map from service id to its instances, containing only the services
// the registry returned an update for; every entry missing from the map is
// logged as an error. Returns nil when entries is empty.
func (c *Connector) GetServiceInstances(entries []*event.MicroserviceEntry) map[string][]*discovery.MicroServiceInstance {
	if len(entries) == 0 {
		return nil
	}

	// appBatch holds one app's request. keys[i] is built from entries[i], so an
	// index into keys identifies the entry that produced it.
	type appBatch struct {
		watcherId string
		keys      []*discovery.FindService
		entries   []*event.MicroserviceEntry
	}

	batches := make(map[string]*appBatch, len(entries))
	for _, e := range entries {
		s := e.MicroService
		appId := s.AppId

		batch, ok := batches[appId]
		if !ok {
			// A missing entry and a non-string entry both leave isId false.
			cached, _ := c.AppInstanceWatcherCache.Load(appId)
			watcherId, isId := cached.(string)
			if !isId {
				log.Errorf("failed to watch new microservices for appId %s, watcher service failed to start\n", appId)
				continue
			}
			batch = &appBatch{watcherId: watcherId}
			batches[appId] = batch
		}

		batch.keys = append(batch.keys, &discovery.FindService{
			Service: &discovery.MicroServiceKey{
				ServiceName: s.ServiceName,
				AppId:       appId,
				Environment: s.Environment,
				Version:     s.Version,
				Alias:       s.Alias,
			},
		})
		batch.entries = append(batch.entries, e)
	}

	serviceInstanceMap := make(map[string][]*discovery.MicroServiceInstance, len(entries))
	for appId, batch := range batches {
		// Initial instance sync of services with same appId, will use app instance watcher for future instance updates
		res, err := c.client.BatchFindInstances(batch.watcherId, batch.keys, sc.WithGlobal(), sc.WithoutRevision())
		if err != nil {
			log.Errorf("failed to watch new microservices, unable to get instances, err[%v]\n", err)
			// res must not be dereferenced after a failed call.
			continue
		}
		if res == nil || res.Services == nil {
			continue
		}

		for _, r := range res.Services.Updated {
			// Per the batch-find API, Index is the position within the request list
			// sent for this app, so it is resolved against this batch rather than
			// against the full entries slice.
			idx := int(r.Index)
			if idx < 0 || idx >= len(batch.entries) {
				continue
			}
			e := batch.entries[idx]
			serviceInstanceMap[e.MicroService.ServiceId] = r.Instances
		}
		log.Infof("Started watching instances of services with appId %s\n", appId)
	}

	for _, e := range entries {
		if _, ok := serviceInstanceMap[e.MicroService.ServiceId]; !ok {
			log.Errorf("failed to watch instances of service %s with id %s\n", e.MicroService.ServiceName, e.MicroService.ServiceId)
		}
	}
	return serviceInstanceMap
}
// RefreshAppInstanceWatcherCache makes AppInstanceWatcherCache hold exactly the
// entries of appWatcherIds: the ids of currently active service center Watcher
// services, keyed by the appIds they are responsible for.
//
// The cache is updated in place instead of being overwritten by assignment,
// because a sync.Map must not be copied after first use and replacing the field
// would race with concurrent Load/Store calls on it.
func (c *Connector) RefreshAppInstanceWatcherCache(appWatcherIds sync.Map) {
	// Drop cached apps that are absent from the new set.
	c.AppInstanceWatcherCache.Range(func(appId, _ interface{}) bool {
		if _, ok := appWatcherIds.Load(appId); !ok {
			c.AppInstanceWatcherCache.Delete(appId)
		}
		return true
	})
	// Add new apps and overwrite the ids of existing ones.
	appWatcherIds.Range(func(appId, watcherId interface{}) bool {
		c.AppInstanceWatcherCache.Store(appId, watcherId)
		return true
	})
}