istio/pkg/controllers/istioconnector/controller.go (258 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package istioconnector
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/servicecomb-service-center/istio/pkg/event"
"github.com/apache/servicecomb-service-center/istio/pkg/utils"
"github.com/go-chassis/cari/discovery"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
"istio.io/client-go/pkg/clientset/versioned"
"istio.io/pkg/log"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
k8s "sigs.k8s.io/controller-runtime/pkg/client/config"
)
// Controller receives service center updates and pushes converted Istio ServiceEntry(s) to k8s api server
type Controller struct {
// Istio istioClient for k8s API
istioClient *versioned.Clientset
// Channel used to send and receive service center change events from the service center controller
events chan []event.ChangeEvent
// Cache of converted service entries, mapped to original service center service id
convertedServiceCache sync.Map
}
func NewController(kubeconfigPath string, e chan []event.ChangeEvent) (*Controller, error) {
controller := &Controller{
events: e,
convertedServiceCache: sync.Map{},
}
// get kubernetes config info, used for creating k8s client
client, err := newKubeClient(kubeconfigPath)
if err != nil {
log.Errorf("failed to create istio client: %v\n", err)
return nil, err
}
controller.istioClient = client
return controller, nil
}
// Return a debounced version of a function `fn` that will not run until `wait` seconds have passed
// after it was last called or until `maxWait` seconds have passed since its first call.
// Once `fn` is executed, the max wait timer is reset.
func debounce(fn func(), wait time.Duration, maxWait time.Duration) func() {
// Main timer, time seconds elapsed since last execution
var timer *time.Timer
// Max wait timer, time seconds elapsed since first call
var maxTimer *time.Timer
return func() {
if maxTimer == nil {
// First debounced event, start max wait timer
// will only run target func if not called again after `maxWait` duration
maxTimer = time.AfterFunc(maxWait, func() {
// Reset all timers when max wait time is reached
log.Debugf("debounce: maximum wait time reached, running target fn\n")
if timer.Stop() {
// Only run target func if main timer hasn't already
fn()
}
timer = nil
maxTimer = nil
})
log.Debugf("debounce: max timer started, will wait max time of %s\n", maxWait)
}
if timer != nil {
// Timer already started; function was called within `wait` duration, debounce this event by resetting timer
timer.Stop()
}
// Start timer, will only run target func if not called again after `wait` duration
timer = time.AfterFunc(wait, func() {
log.Debugf("debounce: timer completed, running target fn\n")
// Reset all timers and run target func when wait time is reached
fn()
maxTimer.Stop()
maxTimer = nil
timer = nil
})
log.Debugf("debounce: timer started, will wait %s\n", wait)
}
}
// Run until a signal is received, this function won't block
func (c *Controller) Run(ctx context.Context) {
go c.watchServiceCenterUpdate(ctx)
}
// Return a debounced version of the push2istio method that merges the passed events on each call.
func (c *Controller) getIstioPushDebouncer(wait time.Duration, maxWait time.Duration, maxEvents int) func([]event.ChangeEvent) {
var eventQueue []event.ChangeEvent // Queue of events merged from arguments of each call to debounced function
// Make a debounced version of push2istio, with provided wait and maxWait times
debouncedFn := debounce(func() {
log.Debugf("debounce: push callback fired, pushing events to Istio: %v\n", eventQueue)
// Timeout reached, push events to istio and reset queue
c.push2Istio(eventQueue)
eventQueue = nil
}, wait, maxWait)
return func(newEvents []event.ChangeEvent) {
log.Debugf("debounce: received and merged %d new events\n", len(newEvents))
// Merge new events with existing event queue for each received call
eventQueue = append(eventQueue, newEvents...)
log.Debugf("debounce: new total number of events in queue is %d\n", len(eventQueue))
if len(eventQueue) > maxEvents {
c.push2Istio(eventQueue)
eventQueue = nil
} else {
// Make call to debounced push2istio
debouncedFn()
}
}
}
// Watch the Service Center controller for service and instance change events.
func (c *Controller) watchServiceCenterUpdate(ctx context.Context) {
// Make a debounced push2istio function
debouncedPush := c.getIstioPushDebouncer(utils.PUSH_DEBOUNCE_INTERVAL, utils.PUSH_DEBOUNCE_MAX_INTERVAL, utils.PUSH_DEBOUNCE_MAX_EVENTS)
for {
select {
case <-ctx.Done():
return
case events := <-c.events:
// Received service center change event, use debounced push to Istio.
// Debouncing introduces latency between the time when change events are received from service center, and when they are pushed to Istio.
// Debounce latency will take on the range [PUSH_DEBOUNCE_INTERVAL, PUSH_DEBOUNCE_MAX_INTERVAL] in seconds.
debouncedPush(events)
}
}
}
// Push the received service center service/instance change events to Istio.
func (c *Controller) push2Istio(events []event.ChangeEvent) {
cachedServiceEntries := deepCopyCache(c.convertedServiceCache) // Get cached serviceentries
for _, e := range events {
switch ev := e.Event.(type) {
case *event.MicroserviceEntry:
// service center service-level change events
err := c.pushServiceEvent(e.Event.(*event.MicroserviceEntry), e.Action, cachedServiceEntries)
if err != nil {
log.Errorf("failed to push a service center service event to Istio, err[%v]\n", err)
}
case *event.InstanceEntry:
// service center instance-level change events
err := c.pushEndpointEvents(e.Event.(*event.InstanceEntry), e.Action, cachedServiceEntries)
if err != nil {
log.Errorf("failed to push a service center instance event to Istio, err[%v]\n", err)
}
default:
log.Errorf("failed to push service center event, event type %T is invalid\n", ev)
}
}
// Save updates to ServiceEntry cache
c.refreshCache(cachedServiceEntries)
}
// Convert and push service center service-level change events to Istio.
func (c *Controller) pushServiceEvent(e *event.MicroserviceEntry, action discovery.EventType, svcCache sync.Map) error {
serviceId := e.MicroService.ServiceId
var se *event.ServiceEntry
// Convert the service center MicroService to an Istio ServiceEntry
if res := e.Convert(); res == nil {
return fmt.Errorf("failed to convert service center Service event to Istio ServiceEntry")
} else {
se = res
}
name := se.ServiceEntry.GetName()
log.Debugf("syncing %s SERVICE event for service center service id %s...\n", string(action), serviceId)
switch action {
case discovery.EVT_CREATE:
// CREATE still requires check to determine whether the service already exists; UPDATE is used in this case.
// e.g. ServiceEntry fell out of local cache due to controller restart, but in fact already exists in Istio registry.
fallthrough
case discovery.EVT_UPDATE:
existingSe, err := c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Get(context.TODO(), name, v1.GetOptions{})
var returnedSe *v1alpha3.ServiceEntry
if err != nil {
returnedSe, err = c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Create(context.TODO(), se.ServiceEntry, v1.CreateOptions{})
if err != nil {
return err
}
} else {
se.ServiceEntry.Spec.Endpoints = existingSe.Spec.Endpoints // Restore endpoints, only the service itself is being updated
se.ServiceEntry.Spec.Ports = existingSe.Spec.Ports
returnedSe, err = c.pushServiceEntryUpdate(existingSe, se.ServiceEntry)
if err != nil {
return err
}
}
svcCache.Store(serviceId, returnedSe)
case discovery.EVT_DELETE:
err := c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Delete(context.TODO(), name, v1.DeleteOptions{})
if err != nil {
return err
}
svcCache.Delete(serviceId)
}
log.Infof("synced %s SERVICE event to Istio\n", string(action))
return nil
}
// Push an update for an existing ServiceEntry to Istio.
func (c *Controller) pushServiceEntryUpdate(oldServiceEntry, newServiceEntry *v1alpha3.ServiceEntry) (*v1alpha3.ServiceEntry, error) {
newServiceEntry.SetResourceVersion(oldServiceEntry.GetResourceVersion())
returnedSe, err := c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Update(context.TODO(), newServiceEntry, v1.UpdateOptions{})
if err != nil {
return nil, err
}
return returnedSe, nil
}
// Convert and push service center instance-level change events to Istio.
func (c *Controller) pushEndpointEvents(e *event.InstanceEntry, action discovery.EventType, svcCache sync.Map) error {
serviceId := e.ServiceId
log.Debugf("syncing %s INSTANCE event for instance %s of service center service %s...\n", string(action), e.InstanceId, serviceId)
value, ok := svcCache.Load(serviceId)
if !ok {
return fmt.Errorf("serviceEntry for service center Service with id %s was not found", e.ServiceId)
}
se := value.(*v1alpha3.ServiceEntry)
newSe := se.DeepCopy()
// Apply changes to the ServiceEntry's endpoints
err := updateIstioServiceEndpoints(newSe, action, e)
if err != nil {
return err
}
// Pushed updated ServiceEntry to Istio
updatedSe, err := c.pushServiceEntryUpdate(se, newSe)
if err != nil {
return err
}
log.Infof("pushed %s INSTANCE event to Istio\n", string(action))
svcCache.Store(serviceId, updatedSe)
return nil
}
// Apply an update event to a ServiceEntry's endpoint(s).
func updateIstioServiceEndpoints(se *v1alpha3.ServiceEntry, action discovery.EventType, targetInst *event.InstanceEntry) error {
targetInstanceId := targetInst.InstanceId
newInsts := []*event.InstanceEntry{}
var seAsMSE *event.MicroserviceEntry
// Convert ServiceEntry back to service center service to apply changes to its service center instances
if res := event.NewServiceEntry(se).Convert(); res == nil {
return fmt.Errorf("failed to parse existing Istio ServiceEntry")
} else {
seAsMSE = res
}
switch discovery.EventType(action) {
case discovery.EVT_DELETE:
// Filter out the deleted instance
for _, existingInst := range seAsMSE.Instances {
if existingInst.InstanceId != targetInstanceId {
newInsts = append(newInsts, existingInst)
}
}
if len(seAsMSE.Instances) == len(newInsts) {
log.Warnf("could not push delete for target Service Center instance id %s, instance was not found\n", targetInstanceId)
}
seAsMSE.Instances = newInsts
case discovery.EVT_CREATE:
// CREATE still requires check to determine whether the endpoint already exists; UPDATE is used in this case.
fallthrough
case discovery.EVT_UPDATE:
updated := false
for i, existingInst := range seAsMSE.Instances {
if existingInst.InstanceId == targetInstanceId {
// Found existing instance, update with new instance
seAsMSE.Instances[i] = targetInst
updated = true
break
}
}
if !updated {
// Instance does not already exist, add as new instance
seAsMSE.Instances = append(seAsMSE.Instances, targetInst)
}
}
// Convert the microservice entry back to istio service entry; the serviceports for the changed endpoints will be regenerated appropriately by conversion logic
var regenedSe *v1alpha3.ServiceEntry
if res := seAsMSE.Convert(); res == nil {
return fmt.Errorf("failed to parse changes for Istio ServiceEntry")
} else {
regenedSe = res.ServiceEntry
}
// Only take regened ports and new workloadentries, preserves rest of original serviceentry
se.Spec.Endpoints = regenedSe.Spec.Endpoints
se.Spec.Ports = regenedSe.Spec.Ports
return nil
}
// Save Istio ServiceEntry(s) converted from service center updates.
func (c *Controller) refreshCache(serviceEntries sync.Map) {
c.convertedServiceCache = serviceEntries
}
// Get a deep copy of the converted Istio ServiceEntry(s) pushed from service center.
func deepCopyCache(m sync.Map) sync.Map {
newMap := sync.Map{}
m.Range(func(key, value interface{}) bool {
sn := value.(*v1alpha3.ServiceEntry)
newMap.Store(key, sn.DeepCopy())
return true
})
return newMap
}
// newKubeClient creates new kube client
func newKubeClient(kubeconfigPath string) (*versioned.Clientset, error) {
var err error
var kubeConf *rest.Config
if kubeconfigPath == "" {
// creates the in-cluster config
kubeConf, err = k8s.GetConfig()
if err != nil {
return nil, fmt.Errorf("build default in cluster kube config failed: %w", err)
}
} else {
kubeConf, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("build kube client config from config file failed: %w", err)
}
}
return versioned.NewForConfig(kubeConf)
}