in pilot/pkg/bootstrap/server.go [191:374]
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
e := &model.Environment{
PushContext: model.NewPushContext(),
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
}
e.SetLedger(buildLedger(args.RegistryOptions))
ac := aggregate.NewController(aggregate.Options{
MeshHolder: e,
})
e.ServiceDiscovery = ac
s := &Server{
clusterID: getClusterID(args),
environment: e,
fileWatcher: filewatcher.NewWatcher(),
httpMux: http.NewServeMux(),
monitoringMux: http.NewServeMux(),
readinessProbes: make(map[string]readinessProbe),
workloadTrustBundle: tb.NewTrustBundle(nil),
server: server.New(),
shutdownDuration: args.ShutdownDuration,
internalStop: make(chan struct{}),
istiodCertBundleWatcher: keycertbundle.NewWatcher(),
}
// Apply custom initialization functions.
for _, fn := range initFuncs {
fn(s)
}
// Initialize workload Trust Bundle before XDS Server
e.TrustBundle = s.workloadTrustBundle
s.XDSServer = xds.NewDiscoveryServer(e, args.PodName, args.RegistryOptions.KubeOptions.ClusterAliases)
prometheus.EnableHandlingTimeHistogram()
// Apply the arguments to the configuration.
if err := s.initKubeClient(args); err != nil {
return nil, fmt.Errorf("error initializing kube client: %v", err)
}
// used for both initKubeRegistry and initClusterRegistries
args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)
s.initMeshConfiguration(args, s.fileWatcher)
spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
s.initMeshNetworks(args, s.fileWatcher)
s.initMeshHandlers()
s.environment.Init()
if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {
return nil, err
}
// Options based on the current 'defaults' in istio.
caOpts := &caOptions{
TrustDomain: s.environment.Mesh().TrustDomain,
Namespace: args.Namespace,
ExternalCAType: ra.CaExternalType(externalCaType),
CertSignerDomain: features.CertSignerDomain,
}
if caOpts.ExternalCAType == ra.ExtCAK8s {
// Older environment variable preserved for backward compatibility
caOpts.ExternalCASigner = k8sSigner
}
// CA signing certificate must be created first if needed.
if err := s.maybeCreateCA(caOpts); err != nil {
return nil, err
}
if err := s.initControllers(args); err != nil {
return nil, err
}
s.XDSServer.InitGenerators(e, args.Namespace)
// Initialize workloadTrustBundle after CA has been initialized
if err := s.initWorkloadTrustBundle(args); err != nil {
return nil, err
}
// Parse and validate Istiod Address.
istiodHost, _, err := e.GetDiscoveryAddress()
if err != nil {
return nil, err
}
// Create Istiod certs and setup watches.
if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {
return nil, err
}
if s.kubeClient != nil {
s.metadataServer = dubbov1alpha1.NewServiceMetadataServer(s.environment, s.kubeClient)
}
// Create Service Name mapping server
if s.kubeClient != nil {
s.snpServer = dubbov1alpha1.NewSnp(s.kubeClient)
}
// Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert.
if err := s.initSecureDiscoveryService(args); err != nil {
return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
}
var wh *inject.Webhook
// common https server for webhooks (e.g. injection, validation)
if s.kubeClient != nil {
s.initSecureWebhookServer(args)
wh, err = s.initSidecarInjector(args)
if err != nil {
return nil, fmt.Errorf("error initializing sidecar injector: %v", err)
}
if err := s.initConfigValidation(args); err != nil {
return nil, fmt.Errorf("error initializing config validator: %v", err)
}
}
whc := func() map[string]string {
if wh != nil {
return wh.Config.RawTemplates
}
return map[string]string{}
}
// Used for readiness, monitoring and debug handlers.
if err := s.initIstiodAdminServer(args, whc); err != nil {
return nil, fmt.Errorf("error initializing debug server: %v", err)
}
// This should be called only after controllers are initialized.
s.initRegistryEventHandlers()
s.initDiscoveryService(args)
s.initSDSServer()
// Notice that the order of authenticators matters, since at runtime
// authenticators are activated sequentially and the first successful attempt
// is used as the authentication result.
authenticators := []security.Authenticator{
&authenticate.ClientCertAuthenticator{},
}
if args.JwtRule != "" {
jwtAuthn, err := initOIDC(args, s.environment.Mesh().TrustDomain)
if err != nil {
return nil, fmt.Errorf("error initializing OIDC: %v", err)
}
if jwtAuthn == nil {
return nil, fmt.Errorf("JWT authenticator is nil")
}
authenticators = append(authenticators, jwtAuthn)
}
// The k8s JWT authenticator requires the multicluster registry to be initialized,
// so we build it later.
authenticators = append(authenticators,
kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient, s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))
if features.XDSAuth {
s.XDSServer.Authenticators = authenticators
}
caOpts.Authenticators = authenticators
// Start CA or RA server. This should be called after CA and Istiod certs have been created.
s.startCA(caOpts)
// TODO: don't run this if galley is started, one ctlz is enough
if args.CtrlZOptions != nil {
_, _ = ctrlz.Run(args.CtrlZOptions, nil)
}
// This must be last, otherwise we will not know which informers to register
if s.kubeClient != nil {
s.addStartFunc(func(stop <-chan struct{}) error {
s.kubeClient.RunAndWait(stop)
return nil
})
}
s.addReadinessProbe("discovery", func() (bool, error) {
return s.XDSServer.IsServerReady(), nil
})
return s, nil
}