pkg/mesh/krun.go (376 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesh
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"
"github.com/google/uuid"
"gopkg.in/yaml.v2"
)
// Cfg abstracts read access to Secrets and ConfigMaps, addressed by namespace
// and name, so that KRun does not depend on a specific k8s client.
type Cfg interface {
// GetSecret returns the content of the secret `name` in namespace `ns` as a
// map of byte values, or an error.
GetSecret(ctx context.Context, ns string, name string) (map[string][]byte, error)
// GetCM returns the content of the config map `name` in namespace `ns` as a
// map of strings, or an error. loadMeshEnv uses it to read "mesh-env" from
// the "istio-system" namespace.
GetCM(ctx context.Context, ns string, name string) (map[string]string, error)
}
// TokenProvider abstracts the source of tokens. saveTokenToFile calls it once
// per audience and writes the returned string to a file.
type TokenProvider interface {
// GetToken returns a token for the audience `aud`, or an error.
GetToken(ctx context.Context, aud string) (string, error)
}
// KRun allows running an app in an Istio and K8S environment.
//
// Instances are normally created with New, which allocates the maps and
// applies the environment-variable based configuration (see initFromEnv).
type KRun struct {
// BaseDir is the root directory for all created files and all lookups.
// If empty, initFromEnv uses MESH_BASE_DIR; if that is empty too it is left
// as "" (paths rooted at "/") when running as uid 0 and set to "." otherwise.
BaseDir string
// Config maps to 'mount'. Key is the config map name, value is a path.
// Config mounts are optional (for now)
CM2Dirs map[string]string
// Audience to files. For each key, a k8s token with the given audience
// will be created. Files should be under /var/run/secrets
// RefreshAndSaveTokens writes one token file per entry.
Aud2File map[string]string
// ProxyConfig is a subset of istio ProxyConfig.
// initFromEnv fills it from the PROXY_CONFIG env variable (YAML) when set.
ProxyConfig *ProxyConfig
// Address of the XDS server. Taken from XDS_ADDR, from
// ProxyConfig.DiscoveryAddress or from the mesh-env XDS_ADDR entry.
// When empty, FindXDSAddr selects MCP (if MeshTenant is set) or the
// internal mesh connector address.
XDSAddr string
// MeshTenant. Only set if using MCP or external Istiod.
// Opaque, internal string that identifies the mesh to the XDS server.
// Different from meshID - which is the user-visible form.
// FindXDSAddr treats "-" the same as empty (no tenant).
MeshTenant string
// External address of the mesh connector
// Not used for internal workloads.
MeshConnectorAddr string
// Internal (ILB) address of the mesh connector. FindXDSAddr appends
// ":15012" to it when no mesh tenant is configured.
MeshConnectorInternalAddr string
// Canonical name for the application.
// Will be set as "app" and "service.istio.io/canonical-name" labels
//
// If not set, initFromEnv tries WORKLOAD_NAME and K_SERVICE; setDefaults
// finally falls back to the value of Namespace.
// TODO: use service name as default
Name string
// Revision
Rev string
// If not empty, will run Istio-agent as a gateway (router instead of sidecar)
// with the "istio: $Gateway" label.
// Defaults to the GATEWAY_NAME env variable.
Gateway string
// Agent debug config (example dns:debug).
// Based on ISTIO_DEBUG
// initFromEnv sets it from the XDS_AGENT_DEBUG setting.
AgentDebug string
// Namespace for the application. The user running the command must have
// the appropriate Token, Secret, ConfigMap permissions in the namespace.
//
// If not set, WORKLOAD_NAMESPACE is used, then "default".
// TODO: use the GSA name as default namespace.
Namespace string
// KSA is the k8s service account for getting tokens.
//
// If not set, WORKLOAD_SERVICE_ACCOUNT is used, then "default".
// TODO: use service name as default
KSA string
// ProjectId is the name of the project where config cluster is running
// The workload may be in a different project.
ProjectId string
// ProjectNumber is used for GCP federated token exchange.
// It is populated from the mesh-env PROJECT_NUMBER setting to construct the federated P4SA
// "service-" + s.kr.ProjectNumber + "@gcp-sa-meshdataplane.iam.gserviceaccount.com"
// This is used for MeshCA and Stackdriver access.
ProjectNumber string
// ClusterName is the config cluster name (mesh-env CLUSTER_NAME).
//
// Deprecated: use ClusterAddress instead.
ClusterName string
// ClusterLocation is the config cluster location (mesh-env CLUSTER_LOCATION).
// Region derives the region from it.
// TODO: replace with Workloadlocation. Config cluster location not used.
ClusterLocation string
// Children are additional child processes; Signals forwards SIGTERM to
// each of them.
Children []*exec.Cmd
// agentCmd and appCmd receive SIGINT and SIGTERM from Signals when non-nil.
// NOTE(review): presumably the istio-agent and the application process;
// they are assigned outside this file - confirm.
agentCmd *exec.Cmd
appCmd *exec.Cmd
// TrustDomain defaults to the TRUST_DOMAIN env variable. If still empty and
// ProjectId is known, it becomes ProjectId + ".svc.id.goog".
TrustDomain string
// StartTime is set by New; Signals logs the time elapsed since then.
StartTime time.Time
// Timestamps that are not written in this file.
// NOTE(review): presumably startup milestones recorded by the launcher - confirm.
EnvoyStartTime time.Time
EnvoyReadyTime time.Time
AppReadyTime time.Time
// Labels for the workload. initFromEnv stores the "ver" label here when
// K_SERVICE has the form VERSION--NAME.
Labels map[string]string
// VendorInit is an optional hook; it is not invoked in this file.
// NOTE(review): confirm where and when it is called.
VendorInit func(context.Context, *KRun) error
// WhiteboxMode indicates no iptables capture
WhiteboxMode bool
// NOTE(review): presumably true when running inside a k8s cluster; not
// read or written in this file - confirm.
InCluster bool
// PEM cert roots detected in the cluster - Citadel, custom CAs from mesh config.
// Will be saved to a file.
CARoots []string
// Citadel root(s) - PEM format, may have multiple roots.
// Loaded from the mesh-env CAROOT_ISTIOD entry and appended to CARoots.
CitadelRoot string
// MeshAddr is the location of the mesh environment file, parsed from the
// MESH setting by initFromEnv.
// This will be loaded at startup (TODO: and periodically or on demand for dynamic changes - XDS may also
// push configs)
//
// Supported formats:
// - https://.... - regular URL, using system certificates. Will return the mesh env directly.
// - file://... - load from file
// - gke://CONFIG_PROJECT_ID[/CLUSTER_LOCATION/CLUSTER_NAME/WORKLOAD_NAMESPACE] - GKE Container API.
MeshAddr *url.URL
// Config cluster address - https://container.googleapis.com/v1/projects/%s/locations/%s/clusters/%s
// Used in the identitynamespace config for STS exchange.
// When empty, LoadConfig builds it from ProjectId, ClusterLocation and ClusterName.
ClusterAddress string
// NOTE(review): not read or written in this file - confirm meaning with callers.
InstanceID string
// Content of the 'mesh environment' - loaded from the config file in istio-system (or the address of the mesh).
// Additional entries may be merged from env or app specific config file.
// initFromEnv copies PORT_* env variables into it; initFromMeshEnv replaces
// the whole map with the loaded mesh-env.
MeshEnv map[string]string
// NOTE(review): presumably signs workload CSRs; the type is declared
// elsewhere in the package - confirm.
CSRSigner CSRSigner
// Interface to abstract k8s implementation
// TokenProvider is used by saveTokenToFile to obtain tokens.
TokenProvider TokenProvider
// Cfg is used by loadMeshEnv to read the mesh-env config map; when nil,
// loading mesh-env is skipped.
Cfg Cfg
// TransportWrapper optionally wraps an http.RoundTripper; it is not
// invoked in this file.
TransportWrapper func(transport http.RoundTripper) http.RoundTripper
// Function to call after config has been loaded, before init certs.
PostConfigLoad func(ctx context.Context, kr *KRun) error
// NOTE(review): presumably the workload certificate and key; not set in
// this file - confirm.
X509KeyPair *tls.Certificate
// TrustedCertPool is initialized to an empty pool by New.
TrustedCertPool *x509.CertPool
// Holds Traffic Director sidecar environment.
TdSidecarEnv *TdSidecarEnv
// Network Name for which the envoy configs will be requested. For TD, this refers to VPC network name
// in the forwarding rule.
NetworkName string
}
// Debug is a package-level debug switch, off by default.
// NOTE(review): nothing in this file reads it - presumably consumed elsewhere
// in the package; confirm.
var Debug = false
// New returns a KRun with its maps, certificate pool, proxy config and
// Traffic Director environment allocated and StartTime set to now. The
// environment-variable based settings are applied before returning; nothing
// is loaded from the cluster yet.
func New() *KRun {
	kr := new(KRun)

	kr.MeshEnv = make(map[string]string)
	kr.TrustedCertPool = x509.NewCertPool()
	kr.StartTime = time.Now()
	kr.Aud2File = make(map[string]string)
	kr.Labels = make(map[string]string)
	kr.ProxyConfig = new(ProxyConfig)
	kr.TdSidecarEnv = NewTdSidecarEnv()

	kr.initFromEnv()
	return kr
}
// InitForTD fills in the Traffic Director specific settings.
//
// The project number is only looked up when it is not already set; a failed
// lookup is logged and leaves it empty. The node ID and the envoy zone are
// always refreshed: when a lookup fails, a random "<uuid>~127.0.0.1" node ID
// or the "cloud-run-cluster" zone is used instead and the fallback is logged.
func (kr *KRun) InitForTD() {
	td := kr.TdSidecarEnv

	if kr.ProjectNumber == "" {
		projectNumber, err := td.fetchProjectNumber()
		if err == nil {
			kr.ProjectNumber = projectNumber
		} else {
			log.Println("Unable to auto-generate project_number: ", err)
		}
	}

	nodeID, err := td.fetchNodeID()
	if err == nil {
		td.NodeID = nodeID
	} else {
		td.NodeID = fmt.Sprintf("%s~%s", uuid.New().String(), "127.0.0.1")
		log.Println("Unable to generate proper nodeID, using: ", td.NodeID)
	}

	zone, err := td.fetchZone()
	if err == nil {
		td.EnvoyZone = zone
	} else {
		td.EnvoyZone = "cloud-run-cluster"
		log.Println("Unable to generate proper zone info, using: ", td.EnvoyZone)
	}
}
// InitForTDFromMeshEnv reports whether the MESH env variable selects a
// Traffic Director mesh, i.e. parses as a URL with the "td" scheme.
//
// Accepted forms:
// * td:
// * td:projects={PROJECT_NUMBER}
// * td:scopes={SCOPE_NAME}
// * td:projects={PROJECT_NUMBER}&scopes={SCOPE_NAME}
//
// Non-empty "projects" and "scopes" values are copied to ProjectNumber and
// TdSidecarEnv.Scope. If the part after "td:" is not a valid query string the
// method still returns true, but nothing is copied.
func (kr *KRun) InitForTDFromMeshEnv() bool {
	u, err := url.Parse(os.Getenv("MESH"))
	if err != nil || u.Scheme != "td" {
		return false
	}

	params, err := url.ParseQuery(u.Opaque)
	if err != nil {
		return true
	}
	if pn := params.Get("projects"); pn != "" {
		kr.ProjectNumber = pn
	}
	if sc := params.Get("scopes"); sc != "" {
		kr.TdSidecarEnv.Scope = sc
	}
	return true
}
// Region derives the region from ClusterLocation.
//
// A location with at least three dash-separated parts (for example a zone
// such as "us-central1-a") yields its first two parts ("us-central1"); any
// shorter location is returned unchanged.
func (kr *KRun) Region() string {
	location := kr.ClusterLocation
	// Only the first two parts matter, so stop splitting after the third.
	parts := strings.SplitN(location, "-", 3)
	if len(parts) == 3 {
		return parts[0] + "-" + parts[1]
	}
	return location
}
// initFromEnv seeds the configuration from process environment variables.
//
// Fields that already hold a value are kept, with these exceptions: MeshAddr
// is replaced whenever MESH holds a parseable URL, Aud2File is always reset to
// an empty map, AgentDebug is always taken from XDS_AGENT_DEBUG, and XDSAddr is
// overridden by a non-empty ProxyConfig.DiscoveryAddress.
//
// Variables read: MESH, WORKLOAD_SERVICE_ACCOUNT, WORKLOAD_NAMESPACE,
// WORKLOAD_NAME, GATEWAY_NAME, MESH_TENANT, K_SERVICE, MESH_BASE_DIR,
// TRUST_DOMAIN, XDS_ADDR, PROXY_CONFIG, XDS_AGENT_DEBUG and every PORT_*
// variable (copied into MeshEnv).
func (kr *KRun) initFromEnv() {
	if mesh := kr.Config("MESH", ""); mesh != "" {
		meshURL, err := url.Parse(mesh)
		if err != nil {
			// Keep the previous MeshAddr instead of overwriting it with nil.
			log.Println("Ignoring invalid meshURL", mesh, err)
		} else {
			kr.MeshAddr = meshURL
		}
	}
	// TODO: if meshURL is set and is file:// or gke:// - use it directly

	if kr.KSA == "" {
		// Same environment used for VMs
		kr.KSA = os.Getenv("WORKLOAD_SERVICE_ACCOUNT")
	}
	// TODO: on GKE detect KSA from the JWT or workload cert.
	// Same for trust domain if workload certs are enabled
	if kr.KSA == "" {
		kr.KSA = "default"
	}

	if kr.Namespace == "" {
		// Same environment used for VMs
		kr.Namespace = os.Getenv("WORKLOAD_NAMESPACE")
	}
	// TODO: detect the namespace from the JWT token if on GKE

	if kr.Name == "" {
		kr.Name = os.Getenv("WORKLOAD_NAME")
	}
	if kr.Gateway == "" {
		kr.Gateway = os.Getenv("GATEWAY_NAME")
	}
	if kr.MeshTenant == "" {
		kr.MeshTenant = os.Getenv("MESH_TENANT")
	}

	// Fall back to the K_SERVICE name. A "VERSION--NAME" value is split: the
	// version becomes the "ver" label and the remainder the name. Previously
	// the remainder was computed but never stored, leaving Name empty.
	if kr.Name == "" {
		ks := os.Getenv("K_SERVICE")
		if verNsName := strings.SplitN(ks, "--", 2); len(verNsName) > 1 {
			kr.Labels["ver"] = verNsName[0]
			ks = verNsName[1]
		}
		kr.Name = ks
	}

	kr.Aud2File = map[string]string{}

	if kr.BaseDir == "" {
		kr.BaseDir = os.Getenv("MESH_BASE_DIR")
	}
	// Without an explicit base dir, uid 0 keeps the empty prefix (absolute
	// paths under "/"); any other user works relative to the current dir.
	if kr.BaseDir == "" && os.Getuid() != 0 {
		kr.BaseDir = "."
	}

	if kr.TrustDomain == "" {
		kr.TrustDomain = os.Getenv("TRUST_DOMAIN")
	}

	// This can be used to provide a k8s-like environment, for apps that need it.
	// It might be better to just generate a kubeconfig file and not pretend we are inside a cluster.
	//if !kr.InCluster {
	//	kr.Aud2File["api"] = kr.BaseDir + "/var/run/secrets/kubernetes.io/serviceaccount/token"
	//}

	// TODO: stop using this, use ProxyConfig.DiscoveryAddress instead
	if kr.XDSAddr == "" {
		kr.XDSAddr = os.Getenv("XDS_ADDR")
	}

	if pc := os.Getenv("PROXY_CONFIG"); pc != "" {
		if err := yaml.Unmarshal([]byte(pc), &kr.ProxyConfig); err != nil {
			log.Println("Invalid ProxyConfig, ignoring", err)
		}
		// A YAML null document ("null", "~") resets the pointer to nil;
		// restore an empty config so the field stays safe to dereference.
		if kr.ProxyConfig == nil {
			kr.ProxyConfig = &ProxyConfig{}
		}
		if kr.ProxyConfig.DiscoveryAddress != "" {
			kr.XDSAddr = kr.ProxyConfig.DiscoveryAddress
		}
	}

	// Advanced options

	// example dns:debug
	kr.AgentDebug = kr.Config("XDS_AGENT_DEBUG", "")

	// Copy PORT_<something> variables; a bare "PORT_" key is skipped.
	for _, e := range os.Environ() {
		k := strings.SplitN(e, "=", 2)
		if len(k) == 2 && strings.HasPrefix(k[0], "PORT_") && len(k[0]) > 5 {
			kr.MeshEnv[k[0]] = k[1]
		}
	}
}
// setDefaults fills in the settings that are still empty once all config
// sources have been consulted: Namespace and KSA become "default", Name
// becomes the namespace, and TrustDomain is derived from ProjectId (when that
// is known) as "<ProjectId>.svc.id.goog".
func (kr *KRun) setDefaults() {
	const fallback = "default"

	if kr.Namespace == "" {
		kr.Namespace = fallback
	}
	// Must run after the namespace default so Name never stays empty.
	if kr.Name == "" {
		kr.Name = kr.Namespace
	}
	if kr.KSA == "" {
		kr.KSA = fallback
	}
	if kr.ProjectId != "" && kr.TrustDomain == "" {
		kr.TrustDomain = kr.ProjectId + ".svc.id.goog"
	}
}
// LoadConfig completes the configuration and initializes certificates.
//
// Steps, in order:
//  1. If no XDS address was set explicitly, load the mesh-env and derive the
//     trust domain from ProjectId when it is still empty.
//  2. Build ClusterAddress from ProjectId, ClusterLocation and ClusterName
//     when it is empty.
//  3. Run the optional PostConfigLoad hook.
//  4. Apply the remaining defaults (setDefaults).
//  5. InitCertificates and InitRoots with WorkloadCertDir.
//
// The first error from loading mesh-env, the hook, InitCertificates or
// InitRoots is logged and returned; later steps are skipped.
func (kr *KRun) LoadConfig(ctx context.Context) error {
	if kr.XDSAddr == "" { // if the XDS_ADDR is set explicitly, no need to load mesh env.
		if err := kr.loadMeshEnv(ctx); err != nil {
			log.Println("Error loadMeshEnv", "err", err)
			return err
		}
		// Adjust 'derived' values if needed.
		if kr.TrustDomain == "" && kr.ProjectId != "" {
			kr.TrustDomain = kr.ProjectId + ".svc.id.goog"
		}
	}

	if kr.ClusterAddress == "" {
		kr.ClusterAddress = fmt.Sprintf("https://container.googleapis.com/v1/projects/%s/locations/%s/clusters/%s",
			kr.ProjectId, kr.ClusterLocation, kr.ClusterName)
	}

	if kr.PostConfigLoad != nil {
		// The hook reports failures through its error result; do not drop it,
		// otherwise certificates are initialized from a half-applied config.
		if err := kr.PostConfigLoad(ctx, kr); err != nil {
			log.Println("PostConfigLoad", "err", err)
			return err
		}
	}

	kr.setDefaults()

	if err := kr.InitCertificates(ctx, WorkloadCertDir); err != nil {
		log.Println("InitCertificates", "err", err)
		return err
	}
	if err := kr.InitRoots(ctx, WorkloadCertDir); err != nil {
		log.Println("InitRoots", "err", err)
		return err
	}
	return nil
}
// RefreshAndSaveTokens is run periodically to create token, secrets, config map files.
// The primary use is istio token expected by pilot agent.
// This should not be called unless pilot-agent/envoy or proxyless gRPC without library are used.
// pilot-agent is currently refreshing the certificates - WIP to move that here.
//
// Certs for 'direct' (library) use can be created without saving the tokens.
// 'library' means linking this or a similar package with the application.
//
// Each run writes one token file per Aud2File entry, calls InitCertificates
// with WorkloadCertDir, and re-schedules itself to run again in 30 minutes.
// All work of one run shares a single 10 second timeout. Failures are logged
// and do not stop the refresh cycle.
func (kr *KRun) RefreshAndSaveTokens() {
	// TODO: trace on errors
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	// Release the timeout's timer as soon as this run is done instead of
	// leaving it pending until the deadline.
	defer cancel()

	for aud, f := range kr.Aud2File {
		// saveTokenToFile logs its own failures; keep refreshing the others.
		_ = kr.saveTokenToFile(ctx, kr.Namespace, aud, f)
	}

	if err := kr.InitCertificates(ctx, WorkloadCertDir); err != nil {
		log.Println("InitCertificates", "err", err)
	}

	// TODO: we may want to reload mesh-env, and adjust behavior ( log levels, etc)
	// Then we can also call kr.InitRoots(ctx, certBase).

	time.AfterFunc(30*time.Minute, kr.RefreshAndSaveTokens)
}
// saveTokenToFile requests a token for audience from the TokenProvider and
// writes it to destFile with mode 0644, creating the parent directory
// (mode 0755) first when destFile names one.
//
// ns is only used in log messages. The error from the token provider or from
// writing the file is logged and returned. A failure to create the directory
// is logged only; the subsequent write reports the resulting error.
func (kr *KRun) saveTokenToFile(ctx context.Context, ns string, audience string, destFile string) error {
	t, err := kr.TokenProvider.GetToken(ctx, audience)
	if err != nil {
		log.Println("Error creating ", ns, kr.KSA, audience, err)
		return err
	}

	// Only create a directory when the path has one: a bare file name has no
	// "/" (slicing with the -1 index would panic) and for "/name" the parent
	// is the root, which already exists.
	if lastSlash := strings.LastIndex(destFile, "/"); lastSlash > 0 {
		dir := destFile[:lastSlash]
		if err := os.MkdirAll(dir, 0755); err != nil {
			log.Println("Error creating dir", ns, kr.KSA, dir, err)
		}
	}

	// Save the token, readable by app. Little value to have istio token as different user,
	// for this separate container/sandbox is needed.
	err = ioutil.WriteFile(destFile, []byte(t), 0644)
	if err != nil {
		log.Println("Error creating ", ns, kr.KSA, audience, destFile, err)
		return err
	}

	return nil
}
// FindXDSAddr determines which discovery address to use.
//
// The rules, in order:
// - an explicitly configured XDSAddr wins, unless it points at meshconfig on
//   googleapis.com while no mesh tenant is set (MCP cannot be used without a
//   tenant); in that case it is ignored with a log message.
// - without a mesh tenant (empty or "-"), the internal mesh connector address
//   on port 15012 is used (ASM/OSS).
// - with a mesh tenant, MCP at meshconfig.googleapis.com:443 is used. This is
//   the main case.
func (kr *KRun) FindXDSAddr() string {
	noTenant := kr.MeshTenant == "" || kr.MeshTenant == "-"

	if kr.XDSAddr != "" {
		pointsAtMCP := strings.Contains(kr.XDSAddr, "googleapis.com") &&
			strings.Contains(kr.XDSAddr, "meshconfig")
		if !noTenant || !pointsAtMCP {
			return kr.XDSAddr
		}
		log.Println("Ignoring meshconfig XDS address without tenant, using mesh connector")
	}

	if noTenant {
		// Explicitly in-cluster
		return kr.MeshConnectorInternalAddr + ":15012"
	}

	// we have a mesh tenant - use MCP
	// For staging: explicitly set XDS_ADDR in mesh-env
	// To force use of in-cluster: set tenant to "-" in mesh-env
	return "meshconfig.googleapis.com:443"
}
// loadMeshEnv fetches 'mesh-env', an opaque config for the mesh, and applies
// it through initFromMeshEnv.
//
// It is currently read from the "mesh-env" config map in the "istio-system"
// namespace using Cfg. Without a Cfg (no k8s) there is nothing to load and nil
// is returned.
// TODO: URL, like 'konfig' ( including gcp pseudo-URL like gcp://cluster.location.project/.... )
func (kr *KRun) loadMeshEnv(ctx context.Context) error {
	if kr.Cfg == nil {
		// no k8s, skip loading.
		return nil
	}

	meshEnv, err := kr.Cfg.GetCM(ctx, "istio-system", "mesh-env")
	if err == nil {
		return kr.initFromMeshEnv(meshEnv)
	}
	return err
}
// initFromMeshEnv applies a loaded mesh-env to the KRun.
//
// The map itself replaces MeshEnv. The individual settings listed below are
// copied only into fields that are still empty, so values set explicitly
// (for example through env variables) win. A non-empty CitadelRoot is
// appended to CARoots on every call. The returned error is always nil.
func (kr *KRun) initFromMeshEnv(d map[string]string) error {
	kr.MeshEnv = d

	// See connector for supported values
	settings := []struct {
		key  string
		dest *string
	}{
		{"PROJECT_NUMBER", &kr.ProjectNumber},
		{"MESH_TENANT", &kr.MeshTenant},
		{"XDS_ADDR", &kr.XDSAddr},
		{"CLUSTER_NAME", &kr.ClusterName},
		{"CLUSTER_LOCATION", &kr.ClusterLocation},
		{"PROJECT_ID", &kr.ProjectId},
		{"MCON_ADDR", &kr.MeshConnectorAddr},
		{"IMCON_ADDR", &kr.MeshConnectorInternalAddr},
		{"CAROOT_ISTIOD", &kr.CitadelRoot},
	}
	for _, s := range settings {
		kr.updateFromMap(d, s.key, s.dest)
	}

	if root := kr.CitadelRoot; root != "" {
		kr.CARoots = append(kr.CARoots, root)
	}
	return nil
}
// updateFromMap copies d[key] into *dest when the map holds a non-empty value
// for key and *dest is still empty. An already set destination is never
// overwritten.
func (kr *KRun) updateFromMap(d map[string]string, key string, dest *string) {
	value := d[key]
	if value == "" {
		return
	}
	if *dest == "" {
		*dest = value
	}
}
// Config looks up a mesh setting by name. A non-empty env variable takes
// precedence, then a non-empty entry in the loaded mesh-env (MeshEnv);
// otherwise def is returned.
func (kr *KRun) Config(name, def string) string {
	if fromEnv := os.Getenv(name); fromEnv != "" {
		return fromEnv
	}
	// Indexing a nil map yields "", so an unloaded MeshEnv needs no special case.
	if fromMeshEnv := kr.MeshEnv[name]; fromMeshEnv != "" {
		return fromMeshEnv
	}
	return def
}
// Signals handles the special signals by forwarding them to the child
// processes. It starts two goroutines and returns immediately.
//
// SIGINT - forwarded to the agent and the app commands.
//
// SIGTERM - sent by docker on 'docker stop'; forwarded to the agent (which
// starts draining envoy), the app and every entry of Children.
// See https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace
//
// Each goroutine handles only the first signal of its kind and then exits.
// Commands that are nil or have no running process are skipped.
func (kr *KRun) Signals() {
	// forward relays sig to cmd. Cmd.Process is nil until the command has been
	// started successfully, so both levels are checked to avoid a panic when a
	// signal arrives early.
	forward := func(cmd *exec.Cmd, sig os.Signal) {
		if cmd == nil || cmd.Process == nil {
			return
		}
		// Best effort: the process may already have exited.
		_ = cmd.Process.Signal(sig)
	}

	go func() {
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT)

		s := <-sigs
		log.Println("Received SIGINT", "total_time", time.Since(kr.StartTime))
		forward(kr.agentCmd, s)
		forward(kr.appCmd, s)
	}()

	go func() {
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGTERM)

		s := <-sigs
		log.Println("Received SIGTERM", "total_time", time.Since(kr.StartTime))
		// Will start draining envoy
		forward(kr.agentCmd, s)
		forward(kr.appCmd, s)
		for _, child := range kr.Children {
			forward(child, s)
		}
	}()
}
// GetTrafficDirectorIPTablesEnvVars returns the env vars needed for iptables
// interception with Traffic Director, as produced by the TD sidecar
// environment.
func (kr *KRun) GetTrafficDirectorIPTablesEnvVars() []string {
	envVars := kr.TdSidecarEnv.getIPTablesInterceptionEnvVars()
	return envVars
}
// PrepareTrafficDirectorBootstrap is the exported entry point for
// prepareTrafficDirectorBootstrap: it passes templatePath and outputPath
// through unchanged and returns that call's error.
func (kr *KRun) PrepareTrafficDirectorBootstrap(templatePath string, outputPath string) error {
	err := kr.prepareTrafficDirectorBootstrap(templatePath, outputPath)
	return err
}