// tools/gke-billing-export/billing.go
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"time"
autoconfig "github.com/dparrish/go-autoconfig"
"cloud.google.com/go/bigquery"
"golang.org/x/oauth2/google"
"google.golang.org/api/container/v1"
"google.golang.org/api/googleapi"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/rest"
)
// PodMetricsList mirrors the JSON response of the metrics-server endpoint
// /apis/metrics.k8s.io/v1beta1/pods. Only the fields this exporter reads are
// declared; unmarshalling relies on encoding/json's case-insensitive field
// matching, so no struct tags are needed.
type PodMetricsList struct {
Kind string
APIVersion string
// Items holds one entry per pod reported by metrics-server.
Items []struct {
Timestamp string
Window string
Metadata struct {
Namespace string
Name string
SelfLink string
}
// Containers lists per-container usage; quantities are Kubernetes
// resource strings (e.g. "250m", "128Mi") parsed later with
// resource.ParseQuantity.
Containers []struct {
Name string
Usage struct {
CPU string
Memory string
}
}
}
}
// Usage is one BigQuery row: a snapshot of a single pod's reserved
// (requested) and actually-used CPU/RAM at a point in time. CPU values are
// millicores; RAM values are bytes. The bigquery tags define the column
// names used when the schema is inferred in createTable.
type Usage struct {
Timestamp time.Time `bigquery:"timestamp"`
Project string `bigquery:"project"`
Cluster string `bigquery:"cluster"`
Namespace string `bigquery:"namespace"`
ServiceAccount string `bigquery:"serviceaccount"`
Pod string `bigquery:"pod"`
ReservedCPU int64 `bigquery:"reserved_cpu"`
ReservedRAM int64 `bigquery:"reserved_ram"`
UsedCPU int64 `bigquery:"used_cpu"`
UsedRAM int64 `bigquery:"used_ram"`
}
// Command-line flags.
var (
// configFile is the path to the JSON configuration loaded via autoconfig.
configFile = flag.String("config", "config.json", "Configuration file")
// numGoroutines sizes both the worker pool and the cluster channel buffer.
numGoroutines = flag.Int("goroutines", 10, "Number of parallel goroutines")
)
// createDataset ensures the BigQuery dataset exists. An HTTP 409 ("already
// exists") response from the API is treated as success so the exporter can
// be restarted without manual cleanup.
func createDataset(ctx context.Context, dataset *bigquery.Dataset) error {
	if err := dataset.Create(ctx, nil); err != nil {
		if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == 409 {
			// Dataset already exists. This is fine.
			return nil
		}
		return err
	}
	log.Println("Created dataset")
	return nil
}
// createTable ensures the BigQuery table exists, inferring its schema from
// the Usage struct's bigquery tags. An HTTP 409 ("already exists") response
// is treated as success so the exporter can be restarted safely.
func createTable(ctx context.Context, table *bigquery.Table) error {
	schema, err := bigquery.InferSchema(Usage{})
	if err != nil {
		// Include the underlying error: the original message dropped it,
		// making schema problems impossible to diagnose from the log.
		return fmt.Errorf("unable to infer schema: %v", err)
	}
	err = table.Create(ctx, &bigquery.TableMetadata{Schema: schema})
	if err == nil {
		log.Println("Created table")
		return nil
	}
	if e, ok := err.(*googleapi.Error); ok && e.Code == 409 {
		// Table already exists. This is fine.
		return nil
	}
	return err
}
// backgroundThread is a worker: it receives clusters from ch and, for each
// one, collects per-pod resource reservations (from the Kubernetes API) and
// current usage (from metrics-server), then uploads the combined rows to the
// given BigQuery table. It returns when ch is closed.
func backgroundThread(ctx context.Context, table *bigquery.Table, ch <-chan *Cluster) {
	// Idiomatic receive loop; ranging over the channel replaces the manual
	// `c, ok := <-ch` / `if !ok { return }` pattern.
	for c := range ch {
		st := time.Now()
		usage := make(map[string]*Usage)

		// Fetch pods to record reserved (requested) CPU and memory.
		pods, err := c.clientset.CoreV1().Pods("").List(metav1.ListOptions{})
		if err != nil {
			log.Printf("Error fetching pods for %s/%s: %v", c.project, c.cluster, err)
			if errors.IsUnauthorized(err) {
				// The token fetched at startup has expired; exit so the
				// supervisor restarts the process with fresh credentials.
				log.Printf("Authorization has expired, exiting")
				os.Exit(0)
			}
			continue
		}
		for _, pod := range pods.Items {
			u := getPod(usage, c.project, c.cluster, pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
			u.ServiceAccount = pod.Spec.ServiceAccountName
			for _, container := range pod.Spec.Containers {
				if value, ok := container.Resources.Requests["cpu"]; ok {
					u.ReservedCPU += value.MilliValue()
				}
				if value, ok := container.Resources.Requests["memory"]; ok {
					u.ReservedRAM += value.Value()
				}
			}
		}

		// Fetch current usage from metrics-server.
		metrics := &PodMetricsList{}
		res := c.clientset.RESTClient().Get().Prefix("/apis/metrics.k8s.io/v1beta1/pods").Do()
		obj, err := res.Raw()
		if err != nil {
			log.Printf("Error fetching metrics for %s/%s: %v", c.project, c.cluster, err)
			continue
		}
		if err := json.Unmarshal(obj, metrics); err != nil {
			log.Printf("Error fetching metrics for %s/%s: %v", c.project, c.cluster, err)
			continue
		}
		for _, item := range metrics.Items {
			u := getPod(usage, c.project, c.cluster, item.Metadata.Namespace, item.Metadata.Name)
			for _, container := range item.Containers {
				mem, err := resource.ParseQuantity(container.Usage.Memory)
				if err != nil {
					log.Printf("Error parsing memory %q: %v", container.Usage.Memory, err)
				} else {
					u.UsedRAM += mem.Value()
				}
				// Named "cpu" rather than "c" so it no longer shadows the
				// cluster variable of the outer loop.
				cpu, err := resource.ParseQuantity(container.Usage.CPU)
				if err != nil {
					log.Printf("Error parsing CPU %q: %v", container.Usage.CPU, err)
				} else {
					u.UsedCPU += cpu.MilliValue()
				}
			}
		}

		// Write rows to BigQuery.
		rows := make([]*Usage, 0, len(usage))
		for _, row := range usage {
			rows = append(rows, row)
		}
		if len(rows) == 0 {
			continue
		}
		if err := table.Uploader().Put(ctx, rows); err != nil {
			log.Printf("Error writing to BigQuery: %v", err)
			continue
		}
		log.Printf("Sent %d rows to bigquery for project %q cluster %q in %s", len(rows), c.project, c.cluster, time.Since(st))
	}
}
// main loads the configuration, ensures the BigQuery dataset and table
// exist, discovers all GKE clusters, and then loops forever dispatching each
// cluster to a pool of background workers for usage collection.
func main() {
	flag.Parse()
	// Load the configuration file from disk.
	ctx := context.Background()
	config, err := autoconfig.Load(ctx, *configFile)
	if err != nil {
		// Fatal: the original only logged here and then dereferenced the nil
		// config below, crashing with a confusing panic.
		log.Fatalf("Unable to load configuration %q: %v", *configFile, err)
	}
	if err := config.Watch(ctx); err != nil {
		// The original format string had no verb for err, so the reason was
		// never printed (a go vet error).
		log.Printf("Unable to watch configuration, auto reload is disabled: %v", err)
	}
	// Create the BigQuery client.
	bq, err := bigquery.NewClient(ctx, config.Get("bigquery.project"))
	if err != nil {
		log.Fatal(err)
	}
	dataset := bq.DatasetInProject(config.Get("bigquery.project"), config.Get("bigquery.dataset"))
	if err := createDataset(ctx, dataset); err != nil {
		log.Fatalf("Error creating dataset %s/%s: %v", config.Get("bigquery.project"), config.Get("bigquery.dataset"), err)
	}
	table := dataset.Table(config.Get("bigquery.table"))
	if err := createTable(ctx, table); err != nil {
		log.Fatalf("Error creating table %s/%s.%s: %v", config.Get("bigquery.project"), config.Get("bigquery.dataset"), config.Get("bigquery.table"), err)
	}
	clusters := getAllClusters(ctx, config)
	// Refresh the cluster list whenever the configuration is reloaded.
	config.AddValidator(func(old, new *autoconfig.Config) error {
		clusters = getAllClusters(ctx, new)
		return nil
	})
	// Start all background goroutines.
	ch := make(chan *Cluster, *numGoroutines)
	for i := 0; i < *numGoroutines; i++ {
		go backgroundThread(ctx, table, ch)
	}
	// Loop forever, pushing each cluster to the background workers for processing, then sleep.
	for {
		for _, c := range clusters {
			ch <- c
		}
		// Fall back to a 60s poll interval if "interval" is unset/invalid.
		d, err := time.ParseDuration(config.Get("interval"))
		if err != nil {
			time.Sleep(60 * time.Second)
		} else {
			time.Sleep(d)
		}
	}
}
// getPod returns the Usage record for the given pod from the usage map,
// creating (and caching) a new timestamped record on first sight. Records
// are keyed by "project/cluster/namespace/pod".
func getPod(usage map[string]*Usage, project, cluster, namespace, pod string) *Usage {
	key := project + "/" + cluster + "/" + namespace + "/" + pod
	if existing, ok := usage[key]; ok {
		return existing
	}
	entry := &Usage{
		Timestamp: time.Now(),
		Project:   project,
		Cluster:   cluster,
		Namespace: namespace,
		Pod:       pod,
	}
	usage[key] = entry
	return entry
}
// getClientset builds a Kubernetes clientset for a GKE cluster from its
// master auth material plus the supplied OAuth bearer token. Clusters not in
// the RUNNING state are rejected.
func getClientset(ctx context.Context, cluster *container.Cluster, token string) (*kubernetes.Clientset, error) {
	if cluster.Status != "RUNNING" {
		return nil, fmt.Errorf("cluster is not running")
	}
	caCert, err := base64.StdEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate)
	if err != nil {
		return nil, fmt.Errorf("error decoding cluster masterAuth: %v", err)
	}
	// Configure TLS, with certificates if basic auth is enabled.
	tlsCfg := rest.TLSClientConfig{CAData: caCert}
	if cluster.MasterAuth.Username != "" {
		certData, err := base64.StdEncoding.DecodeString(cluster.MasterAuth.ClientCertificate)
		if err != nil {
			return nil, fmt.Errorf("error decoding cluster masterAuth: %v", err)
		}
		keyData, err := base64.StdEncoding.DecodeString(cluster.MasterAuth.ClientKey)
		if err != nil {
			return nil, fmt.Errorf("error decoding cluster masterAuth: %v", err)
		}
		tlsCfg.CertData = certData
		tlsCfg.KeyData = keyData
	}
	restCfg := &rest.Config{
		Host:            fmt.Sprintf("https://%s/", cluster.Endpoint),
		TLSClientConfig: tlsCfg,
		Username:        cluster.MasterAuth.Username,
		Password:        cluster.MasterAuth.Password,
		BearerToken:     token,
	}
	if err := rest.SetKubernetesDefaults(restCfg); err != nil {
		return nil, fmt.Errorf("error setting Kubernetes config: %v", err)
	}
	return kubernetes.NewForConfig(restCfg)
}
// Cluster pairs a GKE cluster's identity (project and cluster name, used for
// labelling exported rows and log messages) with a ready-to-use Kubernetes
// clientset for querying it.
type Cluster struct {
project string
cluster string
clientset *kubernetes.Clientset
}
// getAllClusters authenticates to the Google Cloud APIs and returns a
// Cluster (with a working Kubernetes clientset) for every GKE cluster in
// every project listed under the "projects" configuration key. Auth or
// cluster-listing failures are fatal; per-cluster clientset failures are
// logged and that cluster is skipped.
func getAllClusters(ctx context.Context, config *autoconfig.Config) []*Cluster {
	// Authenticate to the Google Cloud APIs.
	httpClient, err := google.DefaultClient(ctx, container.CloudPlatformScope)
	if err != nil {
		log.Fatalf("Could not get authenticated client: %v", err)
	}
	gke, err := container.New(httpClient)
	if err != nil {
		log.Fatalf("Could not initialize GKE client: %v", err)
	}
	// Get a token source suitable for querying GKE clusters.
	tokenSource, err := google.DefaultTokenSource(ctx, container.CloudPlatformScope)
	if err != nil {
		log.Fatalf("could not get Google token source: %v", err)
	}
	tok, err := tokenSource.Token()
	if err != nil {
		log.Fatalf("could not get Google token: %v", err)
	}
	log.Printf("Fetching a list of all clusters")
	var found []*Cluster
	for _, project := range config.GetAll("projects") {
		// "-" lists clusters across all zones of the project.
		resp, err := gke.Projects.Zones.Clusters.List(project, "-").Do()
		if err != nil {
			log.Fatalf("Could not get the list of GKE clusters: %v", err)
		}
		for _, cluster := range resp.Clusters {
			clientset, err := getClientset(ctx, cluster, tok.AccessToken)
			if err != nil {
				log.Printf("Error getting clientset for %s/%s: %v", project, cluster.Name, err)
				continue
			}
			found = append(found, &Cluster{
				project:   project,
				cluster:   cluster.Name,
				clientset: clientset,
			})
			log.Printf("  %s/%s", project, cluster.Name)
		}
	}
	return found
}