// pkg/gcp/gcp-kubeconfig.go
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcp
import (
"context"
"encoding/base64"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"cloud.google.com/go/compute/metadata"
"github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/cas"
"github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/k8s"
"github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/sts"
"google.golang.org/api/option"
"google.golang.org/grpc"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
container "cloud.google.com/go/container/apiv1"
"github.com/GoogleCloudPlatform/cloud-run-mesh/pkg/mesh"
"k8s.io/client-go/kubernetes"
kubeconfig "k8s.io/client-go/tools/clientcmd/api"
// Required for k8s client to link in the authenticator
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
crm "google.golang.org/api/cloudresourcemanager/v1"
containerpb "google.golang.org/genproto/googleapis/container/v1"
)
// Integration with GCP - use metadata server or GCP-specific env variables to auto-configure connection to a
// GKE cluster and extract metadata.
// Using the metadata package, which connects to 169.254.169.254, metadata.google.internal or $GCE_METADATA_HOST (http, no prefix)
// Will attempt to guess if running on GCP if env variable is not set.
// Note that requests are using a 2 sec timeout.
// TODO: finish hub.
// Cluster wraps cluster information for a discovered hub or gke cluster.
type Cluster struct {
	// ClusterName is the short name of the GKE cluster.
	ClusterName string
	// ClusterLocation is the zone or region the cluster runs in.
	ClusterLocation string
	// ProjectId is the GCP project the cluster belongs to.
	ProjectId string

	// GKECluster is the raw cluster object returned by the GKE API.
	GKECluster *containerpb.Cluster

	// KubeConfig is a generated kubeconfig with a single context for this
	// cluster, using the "gcp" auth provider (see addClusterConfig).
	KubeConfig *kubeconfig.Config
}
var (
	// GCPInitTime records how long the GKE discovery in initGKE took.
	GCPInitTime time.Duration
)
// configFromEnvAndMD will attempt to configure ProjectId, ClusterName, ClusterLocation, ProjectNumber, used on GCP
// Metadata server will be tried if env variables don't exist.
func configFromEnvAndMD(ctx context.Context, kr *mesh.KRun) error {
	// Explicit env variables take precedence over metadata-server values.
	if kr.ProjectId == "" {
		kr.ProjectId = os.Getenv("PROJECT_ID")
	}
	if kr.ClusterName == "" {
		kr.ClusterName = os.Getenv("CLUSTER_NAME")
	}
	if kr.ClusterLocation == "" {
		kr.ClusterLocation = os.Getenv("CLUSTER_LOCATION")
	}
	if kr.ProjectNumber == "" {
		kr.ProjectNumber = os.Getenv("PROJECT_NUMBER")
	}

	t0 := time.Now()
	var gsa string

	if metadata.OnGCE() {
		// TODO: detect if the cluster is k8s from some env ?

		// If ADC is set, we will only use the env variables. Else attempt to init from metadata server.
		metaProjectId, _ := metadata.ProjectID()
		if kr.ProjectId == "" {
			kr.ProjectId = metaProjectId
		}

		//instanceID, _ := metadata.InstanceID() // 00bf...f23
		//instanceName, _ := metadata.InstanceName()
		//zone, _ := metadata.Zone()
		//pAttr, _ := metadata.ProjectAttributes() //
		//hn, _ := metadata.Hostname() //
		//iAttr, _ := metadata.InstanceAttributes() // zone us-central1-1

		gsa, _ = metadata.Email("default")
		//log.Println("Additional metadata:", "iid", instanceID, "iname", instanceName, "iattr", iAttr,
		// "zone", zone, "hostname", hn, "pAttr", pAttr, "email", email)

		if kr.Namespace == "" {
			// Convention: a GSA named "k8s-NAMESPACE@..." identifies the
			// namespace the workload acts for.
			if strings.HasPrefix(gsa, "k8s-") {
				parts := strings.Split(gsa[4:], "@")
				kr.Namespace = parts[0]
				if mesh.Debug {
					log.Println("Defaulting Namespace based on GSA: ", kr.Namespace, gsa)
				}
			}
		}

		var err error
		// On GKE nodes, cluster name/location are exposed as instance attributes.
		if kr.InCluster && kr.ClusterName == "" {
			kr.ClusterName, err = metadata.Get("instance/attributes/cluster-name")
			if err != nil {
				log.Println("Can't find cluster name")
				// Not a critical problem - it's using in-cluster credentials.
			}
		}
		if kr.InCluster && kr.ClusterLocation == "" {
			kr.ClusterLocation, err = metadata.Get("instance/attributes/cluster-location")
			if err != nil {
				// Not a critical problem.
				log.Println("Can't find cluster location")
			}
		}
		if kr.InstanceID == "" {
			kr.InstanceID, _ = metadata.InstanceID()
		}
	}

	if kr.Namespace == "" {
		// Explicitly set ?
		// Fall back to the "mesh-namespace" project label, if present.
		labels, err := ProjectLabels(ctx, kr.ProjectId)
		if err != nil {
			log.Println("Failed to find labels")
			return err
		}
		kr.Namespace = labels["mesh-namespace"]
	}
	if kr.Namespace == "" {
		// Default convention for project-as-namespace:
		// PREFIX-CONFIG_PROJECT-NAMESPACE-SUFFIX
		// Pid may have lowercase letters/digits/hyphens - mostly DNS conventions.
		pidparts := strings.Split(kr.ProjectId, "-")
		if len(pidparts) > 2 {
			kr.Namespace = pidparts[len(pidparts) - 2]
			if mesh.Debug {
				log.Println("Defaulting Namespace based on project ID: ", kr.Namespace, kr.ProjectId)
			}
		}
	}

	log.Println("GCP config ",
		"gsa", gsa,
		"cluster", kr.ProjectId + "/" + kr.ClusterLocation + "/" + kr.ClusterName,
		"projectNumber", kr.ProjectNumber,
		"iid", kr.InstanceID,
		"location", kr.ClusterLocation,
		"sinceStart", time.Since(t0))
	return nil
}
// RegionFromMetadata returns the region of the current instance, extracted
// from the "instance/region" metadata value, which has the form
// "projects/NUMBER/regions/REGION".
func RegionFromMetadata() (string, error) {
	raw, err := metadata.Get("instance/region")
	if err != nil {
		return "", err
	}
	parts := strings.SplitAfter(raw, "/regions/")
	if len(parts) != 2 {
		return "", fmt.Errorf("malformed region value split into %#v", parts)
	}
	return parts[1], nil
}
// InitGCP uses env variables and the metadata server to discover the config
// GKE cluster, initializes the K8S client against it, and wires the GCP-based
// token provider and post-config hook into the mesh.
//
// Returns an error if the K8S client or GKE discovery fails; env/metadata
// discovery itself is best-effort.
func InitGCP(ctx context.Context, kr *mesh.KRun) error {
	// Avoid direct dependency on GCP libraries - may be replaced by a REST client or different XDS server discovery.
	kc := &k8s.K8S{Mesh: kr}

	err := kc.K8SClient(ctx)
	if err != nil {
		return err
	}

	// Load GCP env variables - will be needed.
	// Best-effort: the original dropped this error silently; log it instead,
	// since explicit/in-cluster config may still make the rest succeed.
	if err := configFromEnvAndMD(ctx, kc.Mesh); err != nil {
		log.Println("Failed to load GCP config from env/metadata", err)
	}

	// Init additional GCP-specific env, and load the k8s cluster using discovery
	if err := initGKE(ctx, kc); err != nil {
		return err
	}

	kr.Cfg = kc
	kr.TokenProvider = kc

	// After the config was loaded.
	kr.PostConfigLoad = PostConfigLoad

	return nil
}
// PostConfigLoad runs after the mesh config has been loaded. If a CA_POOL is
// configured it installs a Google CAS-based CSR signer, authenticated with
// STS-exchanged federated access tokens.
func PostConfigLoad(ctx context.Context, kr *mesh.KRun) error {
	// TODO: Use MeshCA if citadel is not in cluster
	tokenProvider, err := sts.NewSTS(kr)
	if err != nil {
		// Fix: the original dereferenced tokenProvider without checking err,
		// which would panic if NewSTS failed.
		return err
	}

	// This doesn't work
	// Could not use the REFLECTED_SPIFFE subject mode because the caller does not have a SPIFFE identity. Please visit the CA Service documentation to ensure that this is a supported use-case
	// tokenProvider.MDPSA = true

	// The token MUST be the federated access token
	tokenProvider.UseAccessToken = true // even if audience is provided.

	var ol []grpc.DialOption
	ol = append(ol, grpc.WithPerRPCCredentials(tokenProvider))
	//ol = append(ol, OTELGRPCClient()...)

	// TODO: only if mesh_env contains a WorkloadCertificateConfig with endpoint starting with //privateca.googleapis.com
	// Errors results to fallback to pilot-agent and istio.
	caPool := kr.Config("CA_POOL", "") // renamed: "cap" shadowed the builtin
	if caPool != "" {
		kr.CSRSigner, err = cas.NewCASCertProvider(caPool, ol)
	}
	return err
}
// initGKE loads GCP-specific metadata and discovers the config cluster.
// This step is skipped if user has explicit configuration for required settings:
//
//	Namespace,
//	ProjectId, ProjectNumber
//	ClusterName, ClusterLocation
//
// On success kc.Client is connected to the discovered GKE cluster.
func initGKE(ctx context.Context, kc *k8s.K8S) error {
	if kc.Client != nil {
		// Running in-cluster or using kube config
		return nil
	}

	t0 := time.Now()
	var kConfig *kubeconfig.Config
	var err error

	// TODO: attempt to get the config project ID from a label on the workload or project
	// (if metadata server or CR can provide them)
	kr := kc.Mesh

	configProjectID := kr.ProjectId
	configLocation := kr.ClusterLocation
	configClusterName := kr.ClusterName

	if kr.MeshAddr != nil {
		if kr.MeshAddr.Scheme == "gke" {
			// gke://CONFIG_PROJECT - host is the config project ID.
			configProjectID = kr.MeshAddr.Host
		} else if kr.MeshAddr.Host == "container.googleapis.com" {
			// Not using the hub resourceLink format:
			//    container.googleapis.com/projects/wlhe-cr/locations/us-central1-c/clusters/asm-cr
			// or the 'selfLink' from GKE list API
			//    "https://container.googleapis.com/v1/projects/wlhe-cr/locations/us-west1/clusters/istio"
			configProjectID = kr.MeshAddr.Host
			if len(kr.MeshAddr.Path) > 1 {
				parts := strings.Split(kr.MeshAddr.Path, "/")
				for i := 0; i < len(parts); i++ {
					if parts[i] == "projects" && i+1 < len(parts) {
						configProjectID = parts[i+1]
					}
					if parts[i] == "locations" && i+1 < len(parts) {
						configLocation = parts[i+1]
					}
					if parts[i] == "clusters" && i+1 < len(parts) {
						configClusterName = parts[i+1]
					}
				}
			}
		}
	}

	if configProjectID == "" {
		// GCP can't be initialized without a project ID
		return nil
	}

	var cl *Cluster
	if configLocation == "" || configClusterName == "" {
		// Discovery mode: list candidate clusters and pick one. ~500ms
		label := "mesh_id"
		// Try to get the region from metadata server. For Cloudrun, this is not the same with the cluster - it may be zonal
		myRegion, _ := RegionFromMetadata()
		if myRegion == "" {
			myRegion = configLocation
		}
		if kr.MeshAddr != nil && kr.MeshAddr.Scheme == "gke" {
			// Explicit mesh config clusters, no label selector ( used for ASM clusters in current project )
			label = ""
		}
		log.Println("Selecting a GKE cluster ", kr.ProjectId, configProjectID, myRegion)
		cll, err := AllClusters(ctx, kr, configProjectID, label, "")
		if err != nil {
			return err
		}
		if len(cll) == 0 {
			return nil // no cluster to use
		}
		cl = findCluster(kc, cll, myRegion, cl)
		// TODO: connect to cluster, find istiod - and keep trying until a working one is found ( fallback )
	} else {
		// Explicit override - user specified the full path to the cluster. ~400 ms
		if mesh.Debug {
			log.Println("Load GKE cluster explicitly", configProjectID, configLocation, configClusterName)
		}
		cl, err = GKECluster(ctx, kr, configProjectID, configLocation, configClusterName)
		if err != nil {
			// Fix: the original contained a second, unreachable duplicate of
			// this error check; one is enough.
			return err
		}
	}

	kr.ProjectId = configProjectID
	kr.TrustDomain = configProjectID + ".svc.id.goog"
	kConfig = cl.KubeConfig

	if kr.ClusterName == "" {
		kr.ClusterName = cl.ClusterName
	}
	if kr.ClusterLocation == "" {
		kr.ClusterLocation = cl.ClusterLocation
	}
	GCPInitTime = time.Since(t0)

	rc, err := restConfig(kConfig)
	if err != nil {
		return err
	}
	kc.Client, err = kubernetes.NewForConfig(rc)
	if err != nil {
		return err
	}
	return nil
}
// restConfig converts a kubeconfig object into a rest.Config usable by the
// kubernetes client-go library, with no overrides applied.
func restConfig(kc *kubeconfig.Config) (*rest.Config, error) {
	// TODO: set default if not set ?
	clientConfig := clientcmd.NewNonInteractiveClientConfig(*kc, "", &clientcmd.ConfigOverrides{}, nil)
	return clientConfig.ClientConfig()
}
// findCluster selects a config cluster from the candidate list, in decreasing
// order of preference:
//  1. the explicitly requested ClusterName, located in myRegion
//  2. the requested ClusterName, in any location
//  3. an "istio"-prefixed cluster in myRegion
//  4. any cluster in myRegion
//  5. an "istio"-prefixed cluster in any location
//  6. the first cluster in the list
//
// cll must be non-empty. cl is the current selection (nil at the only call
// site, in initGKE); a non-nil value short-circuits steps 3-6.
func findCluster(kr *k8s.K8S, cll []*Cluster, myRegion string, cl *Cluster) *Cluster {
	if kr.Mesh.ClusterName != "" {
		// Prefer the named cluster in the same region. Prefix match lets a
		// region ("us-central1") match zonal locations ("us-central1-c").
		for _, c := range cll {
			if myRegion != "" && !strings.HasPrefix(c.ClusterLocation, myRegion) {
				continue
			}
			if c.ClusterName == kr.Mesh.ClusterName {
				cl = c
				break
			}
		}
		if cl == nil {
			// The named cluster exists, but in a different location.
			for _, c := range cll {
				if c.ClusterName == kr.Mesh.ClusterName {
					cl = c
					break
				}
			}
		}
	}

	// First attempt to find a cluster in same region, with the name prefix istio (TODO: label or other way to identify
	// preferred config clusters)
	if cl == nil {
		for _, c := range cll {
			if myRegion != "" && !strings.HasPrefix(c.ClusterLocation, myRegion) {
				continue
			}
			if strings.HasPrefix(c.ClusterName, "istio") {
				cl = c
				break
			}
		}
	}
	if cl == nil {
		// Any cluster in the same region.
		for _, c := range cll {
			if myRegion != "" && !strings.HasPrefix(c.ClusterLocation, myRegion) {
				continue
			}
			cl = c
			break
		}
	}
	if cl == nil {
		// NOTE(review): unlike the loops above, this one has no break, so it
		// keeps the LAST "istio"-prefixed cluster - confirm whether first-match
		// was intended.
		for _, c := range cll {
			if strings.HasPrefix(c.ClusterName, "istio") {
				cl = c
			}
		}
	}
	// Nothing in same region, pick the first
	if cl == nil {
		cl = cll[0]
	}
	return cl
}
// GKECluster loads a single GKE cluster by explicit project/location/name and
// returns it wrapped with a generated kubeconfig. The lookup is retried up to
// 5 times, one second apart, to smooth over transient startup errors.
func GKECluster(ctx context.Context, kr *mesh.KRun, p, l, clusterName string) (*Cluster, error) {
	opts := []option.ClientOption{}
	if p != kr.ProjectId {
		// Attribute quota to the config project when it differs from ours.
		opts = append(opts, option.WithQuotaProject(p))
	}
	cl, err := container.NewClusterManagerClient(ctx, opts...)
	if err != nil {
		log.Println("Failed NewClusterManagerClient", p, l, clusterName, err)
		return nil, err
	}
	// Fix: the client was never closed, leaking the underlying gRPC connection.
	defer cl.Close()

	for i := 0; i < 5; i++ {
		gcr := &containerpb.GetClusterRequest{
			// Fix: the GKE API resource name is "projects/*/locations/*/clusters/*";
			// the original used singular "cluster/", which never matches.
			Name: fmt.Sprintf("projects/%s/locations/%s/clusters/%s", p, l, clusterName),
		}
		c, e := cl.GetCluster(ctx, gcr)
		if e == nil {
			rc := &Cluster{
				ProjectId:       p,
				ClusterLocation: c.Location,
				ClusterName:     c.Name,
				GKECluster:      c,
				KubeConfig:      addClusterConfig(c, p, l, clusterName),
			}
			return rc, nil
		}
		err = e
		// Fix: log the current attempt's error (e), not the stale err from
		// client creation.
		log.Println("Failed GetCluster, retry", gcr, p, l, clusterName, e)
		time.Sleep(1 * time.Second)
	}
	return nil, err
}
// ProjectLabels returns the labels attached to a GCP project, using the
// Cloud Resource Manager v1 API.
func ProjectLabels(ctx context.Context, p string) (map[string]string, error) {
	svc, err := crm.NewService(ctx)
	if err != nil {
		return nil, err
	}

	proj, err := svc.Projects.Get(p).Context(ctx).Do()
	if err != nil {
		log.Println("Error getting project number", p, err)
		return nil, err
	}

	return proj.Labels, nil
}
// ProjectNumber returns the numeric ID of a GCP project as a decimal string,
// or "" if it can't be determined. Errors are logged and swallowed - callers
// treat the project number as optional.
func ProjectNumber(p string) string {
	ctx := context.Background()
	cr, err := crm.NewService(ctx)
	if err != nil {
		// Fix: the original swallowed this error silently; log it for
		// consistency with the Get error path below.
		log.Println("Error creating cloudresourcemanager service", err)
		return ""
	}
	pdata, err := cr.Projects.Get(p).Context(ctx).Do()
	if err != nil {
		log.Println("Error getting project number", p, err)
		return ""
	}

	// This is in v1 - v3 has it encoded in name.
	return strconv.Itoa(int(pdata.ProjectNumber))
}
// AllClusters lists the GKE clusters in configProjectId (all locations),
// optionally filtered by a resource label:
//   - label == "": no filtering
//   - label set, meshID == "": keep clusters having the label, any value
//   - label set, meshID != "": keep clusters whose label value equals meshID
//
// If configProjectId is empty, kr.ProjectId is used.
func AllClusters(ctx context.Context, kr *mesh.KRun, configProjectId string,
	label string, meshID string) ([]*Cluster, error) {
	clustersL := []*Cluster{}

	if configProjectId == "" {
		configProjectId = kr.ProjectId
	}
	opts := []option.ClientOption{}
	if configProjectId != kr.ProjectId {
		// Attribute quota to the config project when it differs from ours.
		opts = append(opts, option.WithQuotaProject(configProjectId))
	}
	cl, err := container.NewClusterManagerClient(ctx, opts...)
	if err != nil {
		return nil, err
	}
	// Fix: the client was never closed, leaking the underlying gRPC connection.
	defer cl.Close()

	// Note: a redundant second configProjectId == "" check was removed here.
	clcr := &containerpb.ListClustersRequest{
		// "-" as location lists clusters in all zones/regions.
		Parent: "projects/" + configProjectId + "/locations/-",
	}
	clusters, err := cl.ListClusters(ctx, clcr)
	if err != nil {
		return nil, err
	}

	for _, c := range clusters.Clusters {
		if label != "" { // Filtered by label - if the filter is specified, ignore non-labeled clusters
			if meshID == "" { // If a value for label was specified, used it for filtering
				if c.ResourceLabels[label] == "" {
					continue
				}
			} else {
				if c.ResourceLabels[label] != meshID {
					continue
				}
			}
		}
		clustersL = append(clustersL, &Cluster{
			ProjectId:       configProjectId,
			ClusterName:     c.Name,
			ClusterLocation: c.Location,
			GKECluster:      c,
			KubeConfig:      addClusterConfig(c, configProjectId, c.Location, c.Name),
		})
	}

	return clustersL, nil
}
// addClusterConfig builds a minimal kubeconfig for a GKE cluster with a single
// context named "gke_PROJECT_LOCATION_CLUSTER" (matching the gcloud naming
// convention), using the "gcp" auth provider for credentials.
func addClusterConfig(c *containerpb.Cluster, p, l, clusterName string) *kubeconfig.Config {
	kc := kubeconfig.NewConfig()

	// Best-effort: a malformed CA cert leaves the config without CA data.
	caCert, err := base64.StdEncoding.DecodeString(c.MasterAuth.ClusterCaCertificate)
	if err != nil {
		caCert = nil
	}

	ctxName := "gke_" + p + "_" + l + "_" + clusterName

	// We need a KUBECONFIG - tools/clientcmd/api/Config object
	// Fix: the original assigned CurrentContext twice; once is enough.
	kc.CurrentContext = ctxName
	kc.Contexts[ctxName] = &kubeconfig.Context{
		Cluster:  ctxName,
		AuthInfo: ctxName,
	}
	kc.Clusters[ctxName] = &kubeconfig.Cluster{
		Server:                   "https://" + c.Endpoint,
		CertificateAuthorityData: caCert,
	}
	kc.AuthInfos[ctxName] = &kubeconfig.AuthInfo{
		AuthProvider: &kubeconfig.AuthProviderConfig{
			Name: "gcp",
		},
	}

	return kc
}