in tools/gke-billing-export/billing.go [114:194]
// backgroundThread is a worker loop: it receives clusters from ch until the
// channel is closed and, for each cluster, uploads one usage snapshot to the
// given BigQuery table.
//
// For every cluster it:
//  1. lists pods in all namespaces and sums the CPU (millicores) and memory
//     requests of their containers,
//  2. reads the metrics.k8s.io/v1beta1 pod metrics and sums the reported CPU
//     and memory usage per pod,
//  3. uploads one row per pod to table.
//
// A failure in any step is logged and the whole snapshot for that cluster is
// dropped (nothing partial is uploaded); the loop then moves on to the next
// cluster. The one exception is an "unauthorized" error while listing pods,
// which terminates the process with exit status 0.
//
// ctx is passed to the BigQuery upload only; the loop itself ends when ch is
// closed.
func backgroundThread(ctx context.Context, table *bigquery.Table, ch <-chan *Cluster) {
	for c := range ch {
		st := time.Now()
		// Per-pod accumulator for this snapshot, filled in through getPod.
		usage := make(map[string]*Usage)

		// Fetch pods. The empty namespace selects pods across all namespaces.
		pods, err := c.clientset.CoreV1().Pods("").List(metav1.ListOptions{})
		if err != nil {
			log.Printf("Error fetching pods for %s/%s: %v", c.project, c.cluster, err)
			if errors.IsUnauthorized(err) {
				// Credentials are no longer accepted; give up on the whole
				// process rather than retrying.
				log.Printf("Authorization has expired, exiting")
				os.Exit(0)
			}
			continue
		}
		// Index instead of ranging by value to avoid copying each Pod.
		for i := range pods.Items {
			pod := &pods.Items[i]
			u := getPod(usage, c.project, c.cluster, pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
			u.ServiceAccount = pod.Spec.ServiceAccountName
			for _, container := range pod.Spec.Containers {
				if value, ok := container.Resources.Requests["cpu"]; ok {
					u.ReservedCPU += value.MilliValue()
				}
				if value, ok := container.Resources.Requests["memory"]; ok {
					u.ReservedRAM += value.Value()
				}
			}
		}

		// Fetch current usage from metrics-server.
		metrics := &PodMetricsList{}
		res := c.clientset.RESTClient().Get().Prefix("/apis/metrics.k8s.io/v1beta1/pods").Do()
		obj, err := res.Raw()
		if err != nil {
			log.Printf("Error fetching metrics for %s/%s: %v", c.project, c.cluster, err)
			continue
		}
		if err := json.Unmarshal(obj, metrics); err != nil {
			// Distinct message so a malformed response is not mistaken for a
			// transport failure.
			log.Printf("Error decoding metrics for %s/%s: %v", c.project, c.cluster, err)
			continue
		}
		for _, item := range metrics.Items {
			u := getPod(usage, c.project, c.cluster, item.Metadata.Namespace, item.Metadata.Name)
			for _, container := range item.Containers {
				// An unparsable quantity is logged and skipped; the other
				// containers still contribute to the pod's totals.
				mem, err := resource.ParseQuantity(container.Usage.Memory)
				if err != nil {
					log.Printf("Error parsing memory %q: %v", container.Usage.Memory, err)
				} else {
					u.UsedRAM += mem.Value()
				}
				// Named cpu (not c) so the cluster variable is not shadowed.
				cpu, err := resource.ParseQuantity(container.Usage.CPU)
				if err != nil {
					log.Printf("Error parsing CPU %q: %v", container.Usage.CPU, err)
				} else {
					u.UsedCPU += cpu.MilliValue()
				}
			}
		}

		// Write rows to BigQuery.
		if len(usage) == 0 {
			continue
		}
		rows := make([]*Usage, 0, len(usage))
		for _, row := range usage {
			rows = append(rows, row)
		}
		u := table.Uploader()
		if err := u.Put(ctx, rows); err != nil {
			log.Printf("Error writing to BigQuery: %v", err)
			continue
		}
		log.Printf("Sent %d rows to bigquery for project %q cluster %q in %s", len(rows), c.project, c.cluster, time.Since(st))
	}
}