pkg/authority/security/server.go (208 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package security import ( "crypto/tls" "crypto/x509" "log" "math" "net" "os" "strconv" "time" "github.com/apache/dubbo-admin/pkg/authority/election" cert2 "github.com/apache/dubbo-admin/pkg/authority/cert" "github.com/apache/dubbo-admin/pkg/authority/config" "github.com/apache/dubbo-admin/pkg/authority/k8s" "github.com/apache/dubbo-admin/pkg/authority/patch" "github.com/apache/dubbo-admin/pkg/authority/rule/authentication" "github.com/apache/dubbo-admin/pkg/authority/rule/authorization" "github.com/apache/dubbo-admin/pkg/authority/rule/connection" "github.com/apache/dubbo-admin/pkg/authority/v1alpha1" "github.com/apache/dubbo-admin/pkg/authority/webhook" "github.com/apache/dubbo-admin/pkg/logger" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/reflection" ) type Server struct { StopChan chan os.Signal Options *config.Options CertStorage cert2.Storage ConnectionStorage *connection.Storage AuthenticationHandler authentication.Handler AuthorizationHandler authorization.Handler KubeClient k8s.Client CertificateServer *v1alpha1.AuthorityServiceImpl ObserveServer *v1alpha1.RuleServiceImpl PlainServer *grpc.Server SecureServer *grpc.Server WebhookServer *webhook.Webhook JavaInjector *patch.JavaSdk Elec election.LeaderElection } func NewServer(options *config.Options) *Server { return &Server{ Options: options, StopChan: make(chan os.Signal, 1), } } func (s *Server) Init() { // TODO bypass k8s work if s.KubeClient == nil { s.KubeClient = k8s.NewClient() } if !s.KubeClient.Init(s.Options) { logger.Sugar().Warnf("Failed to connect to Kubernetes cluster. Will ignore OpenID Connect check.") s.Options.IsKubernetesConnected = false } else { s.Options.IsKubernetesConnected = true } if s.CertStorage == nil { s.CertStorage = cert2.NewStorage(s.Options) } if s.Elec == nil { s.Elec = election.NewleaderElection() } go s.CertStorage.RefreshServerCert() s.LoadRootCert() s.LoadAuthorityCert() s.PlainServer = grpc.NewServer() reflection.Register(s.PlainServer) pool := x509.NewCertPool() tlsConfig := &tls.Config{ GetCertificate: func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { for _, cert := range s.CertStorage.GetTrustedCerts() { pool.AddCert(cert.Cert) } return s.CertStorage.GetServerCert(info.ServerName), nil }, ClientCAs: pool, ClientAuth: tls.VerifyClientCertIfGiven, } s.CertStorage.GetServerCert("localhost") s.CertStorage.GetServerCert("dubbo-ca." + s.Options.Namespace + ".svc") s.CertStorage.GetServerCert("dubbo-ca." + s.Options.Namespace + ".svc.cluster.local") s.SecureServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))) s.initRuleHandler() s.registerCertificateService() s.registerObserveService() reflection.Register(s.SecureServer) if s.Options.InPodEnv { s.WebhookServer = webhook.NewWebhook( func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { return s.CertStorage.GetServerCert(info.ServerName), nil }) s.WebhookServer.Init(s.Options) s.JavaInjector = patch.NewJavaSdk(s.Options, s.KubeClient) s.WebhookServer.Patches = append(s.WebhookServer.Patches, s.JavaInjector.NewPod) } } func (s *Server) registerObserveService() { ruleImpl := &v1alpha1.RuleServiceImpl{ Storage: s.ConnectionStorage, KubeClient: s.KubeClient, Options: s.Options, CertStorage: s.CertStorage, } v1alpha1.RegisterRuleServiceServer(s.SecureServer, ruleImpl) v1alpha1.RegisterRuleServiceServer(s.PlainServer, ruleImpl) } func (s *Server) initRuleHandler() { s.ConnectionStorage = connection.NewStorage() s.AuthenticationHandler = authentication.NewHandler(s.ConnectionStorage) s.AuthorizationHandler = authorization.NewHandler(s.ConnectionStorage) } func (s *Server) registerCertificateService() { impl := &v1alpha1.AuthorityServiceImpl{ Options: s.Options, CertStorage: s.CertStorage, KubeClient: s.KubeClient, } v1alpha1.RegisterAuthorityServiceServer(s.PlainServer, impl) v1alpha1.RegisterAuthorityServiceServer(s.SecureServer, impl) } func (s *Server) LoadRootCert() { // todo } func (s *Server) LoadAuthorityCert() { if s.Options.IsKubernetesConnected { certStr, priStr := s.KubeClient.GetAuthorityCert(s.Options.Namespace) if certStr != "" && priStr != "" { s.CertStorage.GetAuthorityCert().Cert = cert2.DecodeCert(certStr) s.CertStorage.GetAuthorityCert().CertPem = certStr s.CertStorage.GetAuthorityCert().PrivateKey = cert2.DecodePrivateKey(priStr) } } s.RefreshAuthorityCert() go s.ScheduleRefreshAuthorityCert() } func (s *Server) ScheduleRefreshAuthorityCert() { interval := math.Min(math.Floor(float64(s.Options.CaValidity)/100), 10_000) for { time.Sleep(time.Duration(interval) * time.Millisecond) if s.CertStorage.GetAuthorityCert().NeedRefresh() { logger.Sugar().Infof("Authority cert is invalid, refresh it.") // TODO lock if multi server // TODO refresh signed cert s.Elec.Election(s.CertStorage, s.Options, s.KubeClient.GetKubClient()) if s.Options.IsKubernetesConnected { s.KubeClient.UpdateAuthorityCert(s.CertStorage.GetAuthorityCert().CertPem, cert2.EncodePrivateKey(s.CertStorage.GetAuthorityCert().PrivateKey), s.Options.Namespace) s.KubeClient.UpdateWebhookConfig(s.Options, s.CertStorage) if s.KubeClient.UpdateAuthorityPublicKey(s.CertStorage.GetAuthorityCert().CertPem) { logger.Sugar().Infof("Write ca to config maps success.") } else { logger.Sugar().Warnf("Write ca to config maps failed.") } } } select { case <-s.StopChan: return default: continue } } } func (s *Server) RefreshAuthorityCert() { if s.CertStorage.GetAuthorityCert().IsValid() { logger.Sugar().Infof("Load authority cert from kubernetes secrect success.") } else { logger.Sugar().Warnf("Load authority cert from kubernetes secrect failed.") s.CertStorage.SetAuthorityCert(cert2.GenerateAuthorityCert(s.CertStorage.GetRootCert(), s.Options.CaValidity)) // TODO lock if multi server if s.Options.IsKubernetesConnected { s.KubeClient.UpdateAuthorityCert(s.CertStorage.GetAuthorityCert().CertPem, cert2.EncodePrivateKey(s.CertStorage.GetAuthorityCert().PrivateKey), s.Options.Namespace) } } if s.Options.IsKubernetesConnected { logger.Sugar().Info("Writing ca to config maps.") if s.KubeClient.UpdateAuthorityPublicKey(s.CertStorage.GetAuthorityCert().CertPem) { logger.Sugar().Info("Write ca to config maps success.") } else { logger.Sugar().Warnf("Write ca to config maps failed.") } } s.CertStorage.AddTrustedCert(s.CertStorage.GetAuthorityCert()) } func (s *Server) Start() { go func() { lis, err := net.Listen("tcp", ":"+strconv.Itoa(s.Options.PlainServerPort)) if err != nil { log.Fatal(err) } err = s.PlainServer.Serve(lis) if err != nil { log.Fatal(err) } }() go func() { lis, err := net.Listen("tcp", ":"+strconv.Itoa(s.Options.SecureServerPort)) if err != nil { log.Fatal(err) } err = s.SecureServer.Serve(lis) if err != nil { log.Fatal(err) } }() if s.Options.InPodEnv { go s.WebhookServer.Serve() s.KubeClient.UpdateWebhookConfig(s.Options, s.CertStorage) } s.KubeClient.InitController(s.AuthenticationHandler, s.AuthorizationHandler) logger.Sugar().Info("Server started.") }