meshcon/meshconnectord/snigate.go (119 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package meshconnectord import ( "context" "log" "net" "strings" "time" "github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/hbone" "github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/k8s" "github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/mesh" "github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/sts" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/client-go/kubernetes" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type MeshConnector struct { SNIListener net.Listener HBone *hbone.HBone Mesh *mesh.KRun Namespace string ConfigMapName string CAPool string CASRoots string // Primary client is the k8s client to use. If not set will be created based on // the config. Client *kubernetes.Clientset stop chan struct{} Services map[string]*corev1.Service EP map[string]*discoveryv1.EndpointSlice } func New(kr *mesh.KRun) *MeshConnector { return &MeshConnector{ Mesh: kr, Namespace: "istio-system", ConfigMapName: "mesh-env", EP: map[string]*discoveryv1.EndpointSlice{}, Services: map[string]*corev1.Service{}, stop: make(chan struct{}), } } // InitSNIGate will start the mesh gateway, with a special SNI router port. // The h2rPort is experimental, for dev/debug, for users running/debugging apps locally. func (sg *MeshConnector) InitSNIGate(ctx context.Context, sniPort string, h2rPort string) error { kr := sg.Mesh sg.Client = k8s.K8SClient(kr) // Locate a k8s cluster, load configs from env and from existing mesh-env. // This will load the existing mesh-env, if it exists. err := kr.LoadConfig(ctx) if err != nil { log.Println("Failed to load config", "err", err) return err } sg.InitMeshEnv(ctx) sg.InitMeshEnvGCP(ctx) // Default the XDSAddr for the envoy we start to the service created by the hgate install. // istiod.istio-system may not be created if 'revision install' is used. // This is only used if we operate in 'proxyless' mode in GCP, or as a sidecar to istiod if kr.XDSAddr == "" && (kr.MeshTenant == "" || kr.MeshTenant == "-") { // Explicitly set XDSAddr, the gate should run in the same cluster // with istiod (to forward to istiod), but will use the local in-cluster address. kr.XDSAddr = "hgate-istiod.istio-system.svc:15012" log.Println("MCP not detected, using hgate-istiod service", kr.MeshTenant) } if kr.MeshConnectorAddr == "" { // We'll need to wait for it - is used when updating the config kr.MeshConnectorAddr, err = sg.WaitService(ctx, "hgate") if err != nil { return err } } if kr.MeshConnectorInternalAddr == "" { kr.MeshConnectorInternalAddr, err = sg.WaitService(ctx, "internal-hgate") if err != nil { return err } } sg.NewWatcher() if kr.Gateway == "" { kr.Gateway = "hgate" } err = kr.StartIstioAgent() if err != nil { log.Fatal("Failed to start istio agent and envoy", err) } h2r := hbone.New() sg.HBone = h2r stsc, err := sts.NewSTS(kr) if err != nil { return err } tcache := sts.NewTokenCache(kr, stsc) h2r.TokenCallback = tcache.Token sg.updateMeshEnv(ctx) h2r.EndpointResolver = func(sni string) *hbone.Endpoint { // Current Istio SNI looks like: // // outbound_.9090_._.prometheus-1-prometheus.mon.svc.cluster.local // We need to map it to a cloudrun external address, add token based on the audience, and make the call using // the tunnel. // // Also supports the 'natural' form // // parts := strings.Split(sni, ".") remoteService := parts[0] if parts[0] == "outbound_" { remoteService = parts[3] // TODO: extract 'version' from URL, convert it to cloudrun revision ? // TODO: watcher on Service or ServiceEntry ( k8s or XDS ) to get annotation, allowing service name to be different } base := remoteService + ".a.run.app" h2c := h2r.NewClient() ep := h2c.NewEndpoint("https://" + base + "/_hbone/15003") ep.SNI = base return ep } sg.SNIListener, err = hbone.ListenAndServeTCP(sniPort, h2r.HandleSNIConn) if err != nil { return err } return nil } // Wait for the hgate and internal hgate service, set the config func (sg *MeshConnector) WaitService(ctx context.Context, name string) (string, error) { for { if ctx.Err() != nil { return "", ctx.Err() } ts, err := sg.Client.CoreV1().Services("istio-system").Get(ctx, name, metav1.GetOptions{}) if err != nil { if !Is404(err) { log.Println("Error getting service", name, err) return "", err } } if ts != nil && len(ts.Status.LoadBalancer.Ingress) > 0 { return ts.Status.LoadBalancer.Ingress[0].IP, nil } time.Sleep(200 * time.Millisecond) } }