func backgroundThread()

in tools/gke-billing-export/billing.go [114:194]


func backgroundThread(ctx context.Context, table *bigquery.Table, ch <-chan *Cluster) {
	for {
		c, ok := <-ch
		if !ok {
			return
		}
		st := time.Now()

		usage := make(map[string]*Usage)

		// Fetch pods.
		pods, err := c.clientset.CoreV1().Pods("").List(metav1.ListOptions{})
		if err != nil {
			log.Printf("Error fetching pods for %s/%s: %v", c.project, c.cluster, err)
			if errors.IsUnauthorized(err) {
				log.Printf("Authorization has expired, exiting")
				os.Exit(0)
			}
			continue
		}
		for _, pod := range pods.Items {
			u := getPod(usage, c.project, c.cluster, pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
			u.ServiceAccount = pod.Spec.ServiceAccountName

			for _, container := range pod.Spec.Containers {
				if value, ok := container.Resources.Requests["cpu"]; ok {
					u.ReservedCPU += value.MilliValue()
				}
				if value, ok := container.Resources.Requests["memory"]; ok {
					u.ReservedRAM += value.Value()
				}
			}
		}

		// Fetch current usage from metrics-server.
		metrics := &PodMetricsList{}
		res := c.clientset.RESTClient().Get().Prefix("/apis/metrics.k8s.io/v1beta1/pods").Do()
		obj, err := res.Raw()
		if err != nil {
			log.Printf("Error fetching metrics for %s/%s: %v", c.project, c.cluster, err)
			continue
		}
		if err := json.Unmarshal(obj, metrics); err != nil {
			log.Printf("Error fetching metrics for %s/%s: %v", c.project, c.cluster, err)
			continue
		}
		for _, item := range metrics.Items {
			u := getPod(usage, c.project, c.cluster, item.Metadata.Namespace, item.Metadata.Name)
			for _, container := range item.Containers {
				m, err := resource.ParseQuantity(container.Usage.Memory)
				if err != nil {
					log.Printf("Error parsing memory %q: %v", container.Usage.Memory, err)
				} else {
					u.UsedRAM += m.Value()
				}

				c, err := resource.ParseQuantity(container.Usage.CPU)
				if err != nil {
					log.Printf("Error parsing CPU %q: %v", container.Usage.CPU, err)
				} else {
					u.UsedCPU += c.MilliValue()
				}
			}
		}

		// Write rows to BigQuery.
		var rows []*Usage
		for _, row := range usage {
			rows = append(rows, row)
		}
		if len(rows) == 0 {
			continue
		}
		u := table.Uploader()
		if err := u.Put(ctx, rows); err != nil {
			log.Printf("Error writing to BigQuery: %v", err)
			continue
		}
		log.Printf("Sent %d rows to bigquery for project %q cluster %q in %s", len(rows), c.project, c.cluster, time.Since(st))
	}
}