src/main.go (138 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"main/statequery"
"net/http"
"os"
"strconv"
"time"
)
// Type for global configuration data
type config struct {
port string
project string
locations []string
threshold float64
bucket string
}
func main() {
cfg := config{}
cfg.configure()
// Initialize empty resource manager cache with maxTTL
// Note that the cache is unlikely to live this long when executed in some environments.
// e.g. Cloud Run (with --min-instances 0) is likely to regularly dispose the serving
// instances. The maxTTL setting helps with executing this service on long-lived
// infrastructure, where it is required to eventually refreshed the cache entries.
cache := &statequery.Cache{}
cache.Initialize(time.Hour)
ctx := context.Background()
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
// Always respond with JSON
w.Header().Set("Content-Type", "application/json")
log.Println("starting analysis")
// Start from a clean slate and track state
state := statequery.State{}
// Retrieve all BQ reservations from current (admin) project
err := state.RetrieveReservations(ctx, cfg.project, cfg.locations)
if err != nil {
log.Fatalf("failed to retrieve reservations: %v\n", err)
}
// Abort if no reservations have been found
if len(state.Reservations) == 0 {
json.NewEncoder(w).Encode(state)
return
}
// Retrieve all assignments for each reservation
err = state.RetrieveAssignments(ctx, cfg.project, cache)
if err != nil {
log.Fatalf("failed to retrieve assignments: %v\n", err)
}
// Retrieve info for all running jobs from projects with reservations
err = state.RetrieveJobs(ctx)
if err != nil {
log.Fatalf("failed to retrieve jobs: %v\n", err)
}
// Compute utilization totals
state.ComputeUtilization(cfg.threshold)
err = state.DumpState(ctx, cfg.bucket)
if err != nil {
log.Fatalf("failed to dump state: %v\n", err)
}
// Abort if no reservation is breaching its threshold
alert := false
for _, reservation := range state.Reservations {
if reservation.ThresholdBreached {
alert = true
}
}
if !alert {
json.NewEncoder(w).Encode(state)
return
}
// Render message from template
message, err := state.RenderMessage()
if err != nil {
log.Fatalf("failed to render message: %v\n", err)
}
// Push rendered message to all defined chat webhooks
err = statequery.SendMessage(cfg.hooks(), message)
if err != nil {
log.Fatalf("failed to send message: %v\n", err)
}
// Encode state to HTTP response
json.NewEncoder(w).Encode(state)
})
log.Println("listening for connections")
http.ListenAndServe(fmt.Sprintf(":%s", cfg.port), nil)
}
func (cfg *config) configure() {
// Read PORT to listen on
cfg.port = "8080"
port := os.Getenv("PORT")
if port != "" {
cfg.port = port
}
// Set admin GCP project, which holds BQ reservations/assignments
cfg.project = os.Getenv("GOOGLE_CLOUD_PROJECT")
// Configure locations for resolution of BQ reservations
cfg.locations = []string{
"US",
"EU",
"asia-east1",
"asia-east2",
"asia-northeast1",
"asia-northeast2",
"asia-northeast3",
"asia-south1",
"asia-south2",
"asia-southeast1",
"asia-southeast2",
"australia-southeast1",
"australia-southeast2",
"europe-central2",
"europe-north1",
"europe-west1",
"europe-west2",
"europe-west3",
"europe-west4",
"europe-west5",
"europe-west6",
"northamerica-northeast1",
"northamerica-northeast2",
"southamerica-east1",
// "southamerica-west1", # Endpoint unavailable
"us-central1",
"us-east1",
"us-east4",
"us-west1",
"us-west2",
"us-west3",
"us-west4",
}
// Sets alerting threshold for utilization alarms
thres, err := strconv.ParseFloat(os.Getenv("USAGE_THRESHOLD"), 64)
if err != nil {
log.Println("failed to parse threshold from USAGE_THRESHOLD, defaulting to 0.8")
thres = 0.8
}
cfg.threshold = thres
//Bucket to dump state to
cfg.bucket = os.Getenv("STATE_BUCKET")
}
// Get a webhook url for a given chat service.
// Unlike the rest of the configuration, this is kept dynamic to ensure that always
// the latest version of the secret webhook are loaded and used.
func (cfg *config) hooks() map[string]string {
hooks := make(map[string]string)
// Read secret webhooks from ENV vars
hooks["slack"] = os.Getenv("SLACK_WEBHOOK_URL")
hooks["gchat"] = os.Getenv("GCHAT_WEBHOOK_URL")
//Override secret webhooks from secret volume mounts, if available
for service := range hooks {
file := fmt.Sprintf("/%s/webhook", service)
_, err := os.Stat(file)
if err == nil {
secret, err := os.ReadFile(file)
if err != nil {
log.Fatalf("failed to read secret from volume mount: %s\n", file)
}
hooks[service] = string(secret)
}
}
return hooks
}