istio/pkg/bootstrap/server.go (109 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bootstrap
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/servicecomb-service-center/istio/pkg/controllers/istioconnector"
"github.com/apache/servicecomb-service-center/istio/pkg/controllers/servicecenter"
"github.com/apache/servicecomb-service-center/istio/pkg/event"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"istio.io/pkg/log"
)
// Args holds the command-line (cli) arguments used to configure the server.
type Args struct {
// ServiceCenterAddr is the servicecomb-service-center address.
ServiceCenterAddr string
// Kubeconfig is the kubeconfig file path.
Kubeconfig string
// HA enables leader election for high availability when true.
HA bool
}
const (
// lockNameSpace is the namespace of the resource lock used for leader election.
lockNameSpace = "istio-system"
// resourceName is the name of the resource lock used for leader election.
resourceName = "servicecenter2mesh"
// defaultLeaseDuration is the LeaseDuration passed to the leader elector.
//
// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against the time of
// the last observed ack.
//
// A client needs to wait a full LeaseDuration without observing a change to
// the record before it can attempt to take over. When all clients are
// shut down and a new set of clients is started with different names against
// the same leader record, they must wait the full LeaseDuration before
// attempting to acquire the lease. Thus LeaseDuration should be as short as
// possible (within your tolerance for clock skew rate) to avoid possibly
// long waits in that scenario.
//
// Core clients default this value to 15 seconds.
defaultLeaseDuration = 15 * time.Second
// defaultRenewDeadline is the RenewDeadline passed to the leader elector.
//
// RenewDeadline is the duration that the acting master will retry
// refreshing leadership before giving up.
//
// Core clients default this value to 10 seconds.
defaultRenewDeadline = 10 * time.Second
// defaultRetryPeriod is the RetryPeriod passed to the leader elector.
//
// RetryPeriod is the duration the LeaderElector clients should wait
// between tries of actions.
//
// Core clients default this value to 2 seconds.
defaultRetryPeriod = 2 * time.Second
)
// Server bundles the service center controller, the istio controller and the
// event channel that is handed to both of them at construction time.
type Server struct {
// serviceCenterController watches service center updates and pushes them to the istio controller.
serviceCenterController *servicecenter.Controller
// istioController receives updates from the service center controller and pushes them to the k8s api server.
istioController *istioconnector.Controller
// serviceCenterEvent is the channel for passing service center events from the
// service center controller to the istio controller.
serviceCenterEvent chan []event.ChangeEvent
}
// NewServer builds a Server from args.
//
// A single event channel with capacity 1 is created and handed to both the
// istio controller and the service center controller. The istio controller is
// constructed first; if that fails, its error is returned unchanged and no
// Server is created.
func NewServer(args *Args) (*Server, error) {
	// Capacity 1: only one event list is allowed to be queued at a time.
	events := make(chan []event.ChangeEvent, 1)

	// The istio controller is built from the kubeconfig path.
	istioCtl, err := istioconnector.NewController(args.Kubeconfig, events)
	if err != nil {
		return nil, err
	}

	// The service center controller is built from the service center address.
	scCtl := servicecenter.NewController(args.ServiceCenterAddr, events)

	return &Server{
		serviceCenterController: scCtl,
		istioController:         istioCtl,
		serviceCenterEvent:      events,
	}, nil
}
// Start launches the server, which means starting both the service center
// controller and the istio controller.
//
// When args.HA is set, startup is delegated to doLeaderElectionRun and its
// result is returned. Otherwise (the default) doRun is called directly and
// Start returns nil.
func (s *Server) Start(ctx context.Context, args *Args) error {
	if args.HA {
		return s.doLeaderElectionRun(ctx)
	}

	// Leader election is disabled: just do a regular start.
	s.doRun(ctx)
	return nil
}
// doLeaderElectionRun enables leader election using the k8s client-go API.
// It builds an in-cluster kube client, creates a Lease-based resource lock
// (lockNameSpace/resourceName) identified by this host's name, runs the
// leader election, and calls doRun once the leader lease is acquired.
//
// Any failure while preparing the election (hostname lookup, in-cluster
// config, client creation, lock creation) is returned to the caller as a
// wrapped error instead of terminating the process. nil is returned once
// leaderelection.RunOrDie returns.
//
// For the k8s client-go API:
// DISCLAIMER: this is an alpha API. This library will likely change significantly or even be removed entirely in subsequent releases.
// Depend on this API at your own risk.
//
// Note: this API is also used by K8S controller and Cluster auto scaler.
func (s *Server) doLeaderElectionRun(ctx context.Context) error {
	// The hostname is used as this candidate's identity in the lock record.
	id, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("get hostname for leader election identity failed: %w", err)
	}
	// creates the in-cluster config
	kubeConf, err := rest.InClusterConfig()
	if err != nil {
		return fmt.Errorf("build default in cluster kube config failed: %w", err)
	}
	client, err := kubernetes.NewForConfig(kubeConf)
	if err != nil {
		return fmt.Errorf("build kube client failed: %w", err)
	}
	rl, err := resourcelock.New(resourcelock.LeasesResourceLock,
		lockNameSpace,
		resourceName,
		client.CoreV1(),
		client.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity: id,
		})
	if err != nil {
		return fmt.Errorf("error creating lock: %w", err)
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            rl,
		ReleaseOnCancel: true,
		LeaseDuration:   defaultLeaseDuration,
		RenewDeadline:   defaultRenewDeadline,
		RetryPeriod:     defaultRetryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(leaderCtx context.Context) {
				// Run with the context supplied by the elector rather than the
				// outer ctx, so the controllers are shut down when leadership
				// ends and not only when the outer ctx is cancelled.
				s.doRun(leaderCtx)
			},
			OnStoppedLeading: func() {
				log.Infof("%s: stopped leading", id)
			},
		},
	})
	return nil
}
// doRun starts the service center controller and the istio controller, each
// in its own goroutine with ctx, logs that the server has started, and then
// registers the shutdown handling via waitForShutdown.
func (s *Server) doRun(ctx context.Context) {
	scCtl, istioCtl := s.serviceCenterController, s.istioController

	go scCtl.Run(ctx)
	go istioCtl.Run(ctx)

	log.Info("servicecenter2mesh Server Started !!!")
	s.waitForShutdown(ctx)
}
// waitForShutdown installs the server's stop handling. Despite its name it
// does not block: it spawns a goroutine that waits for ctx to be done, then
// stops the service center controller and closes the event channel.
func (s *Server) waitForShutdown(ctx context.Context) {
	onStop := func() {
		<-ctx.Done()
		// Stop the service center controller before closing the channel.
		// NOTE(review): presumably Stop ends all sends on the channel before
		// it is closed — confirm against the controller implementation.
		s.serviceCenterController.Stop()
		close(s.serviceCenterEvent)
	}
	go onStop()
}