bigquery/main.go (317 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
"os"
"regexp"
"text/template"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
cbpb "cloud.google.com/go/cloudbuild/apiv1/v2/cloudbuildpb"
"github.com/GoogleCloudPlatform/cloud-build-notifiers/lib/notifiers"
log "github.com/golang/glog"
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/google"
"github.com/google/go-containerregistry/pkg/v1/remote"
"google.golang.org/protobuf/types/known/timestamppb"
)
// tableResource pulls the dataset ID (group 1) and the table ID (group 2) out
// of a table resource name such as
// "projects/my-project/datasets/my_dataset/tables/my_table". At least five
// '/' separators are required; because every ".*" is greedy, the captures
// are taken from the right-hand end (group 2 is the last path segment and
// group 1 the third-to-last), so a leading host or scheme prefix is tolerated.
var tableResource = regexp.MustCompile(`.*/.*/.*/(.*)/.*/(.*)`)
// terminalStatusCodes is the set of build statuses that SendNotification
// treats as final. Builds in any other status are skipped without writing a
// row. The entries are listed alphabetically.
var terminalStatusCodes = map[cbpb.Build_Status]bool{
	cbpb.Build_CANCELLED:      true,
	cbpb.Build_EXPIRED:        true,
	cbpb.Build_FAILURE:        true,
	cbpb.Build_INTERNAL_ERROR: true,
	cbpb.Build_SUCCESS:        true,
	cbpb.Build_TIMEOUT:        true,
}
// megaByte is the number of bytes in one decimal (SI) megabyte, 10^6.
// getImageSize divides the summed layer sizes by it to report
// buildImage.ContainerSizeMB.
// TODO(aricz)
const megaByte int64 = 1e6
func main() {
if err := notifiers.Main(&bqNotifier{bqf: &actualBQFactory{}}); err != nil {
log.Fatalf("fatal error: %v", err)
}
}
// bqNotifier writes a row to a BigQuery table for each matching Cloud Build
// notification. main passes it to notifiers.Main. SetUp must run before
// SendNotification.
type bqNotifier struct {
	// bqf creates the BigQuery client during SetUp.
	bqf bqFactory
	// filter is the CEL predicate built from the notification filter.
	// SendNotification skips builds it rejects.
	filter notifiers.EventFilter
	// tmpl is parsed from the JSON template given to SetUp. Its output fills
	// the row's JSON column.
	tmpl *template.Template
	// client is the handle returned by bqf.Make. It is used for dataset/table
	// setup and for row inserts.
	client bq
	// br resolves template parameter bindings for each build. It may be nil,
	// in which case no bindings are resolved.
	br notifiers.BindingResolver
	// tmplView is the view most recently passed to tmpl.Execute. Each
	// SendNotification call overwrites it.
	tmplView *notifiers.TemplateView
}
// bqRow is the BigQuery row written for one build. EnsureTable infers the
// table schema from this struct with bigquery.InferSchema. Renaming, retyping,
// adding, or removing fields therefore changes the schema.
type bqRow struct {
	// ProjectID is the build's project ID. It is always non-empty, because
	// SendNotification rejects builds without one.
	ProjectID string
	// ID is the build ID.
	ID string
	// BuildTriggerID is the build's trigger ID. It may be empty, in which case
	// SendNotification logs a warning and still writes the row.
	BuildTriggerID string
	// Status is the name of the build's status enum value.
	Status string
	// Images holds one entry per distinct image digest. It is only populated
	// for SUCCESS builds.
	Images []*buildImage
	// Steps holds one entry per build step.
	Steps []*buildStep
	// CreateTime, StartTime and FinishTime are the build timestamps,
	// converted by parsePBTime.
	CreateTime civil.DateTime
	StartTime  civil.DateTime
	FinishTime civil.DateTime
	// Tags are the build's tags.
	Tags []string
	// Env is the environment list from the build options.
	Env []string
	// LogURL is the build's log URL with UTM parameters added.
	LogURL string
	// Substitutions holds the build's substitution map as key/value pairs.
	Substitutions []*substitution
	// JSON is the output of the user-supplied template for this build.
	JSON string
}
// substitution is one key/value entry from a build's substitutions map. It is
// stored as a repeated record in bqRow. Some code builds it positionally
// ({key, value}), so keep the field order.
type substitution struct {
	Key   string
	Value string
}
// buildImage describes one image produced by a successful build.
type buildImage struct {
	// SHA is the image digest in string form, as reported by the registry.
	SHA string
	// ContainerSizeMB is the sum of the image's compressed layer sizes,
	// divided by megaByte (10^6 bytes). It is held as an exact rational.
	ContainerSizeMB *big.Rat
}
// buildStep describes one step of a build in a bqRow.
type buildStep struct {
	Name string
	ID   string
	// Status is the name of the step's status enum value.
	Status string
	Args   []string
	// StartTime and EndTime are the step timing. Either is set to the Unix
	// epoch when the build did not report that timing.
	StartTime civil.DateTime
	EndTime   civil.DateTime
}
// actualBQ is the bq implementation returned by actualBQFactory.Make. It is
// backed by a real BigQuery client.
type actualBQ struct {
	client *bigquery.Client
	// dataset is set by EnsureDataset. EnsureTable requires it.
	dataset *bigquery.Dataset
	// table is set by EnsureTable. WriteRow requires it.
	table *bigquery.Table
}
// actualBQFactory creates actualBQ clients for the project named by the
// PROJECT_ID environment variable. It has no state.
type actualBQFactory struct {
}
// Make returns a BigQuery-backed bq for the project named by the PROJECT_ID
// environment variable. It fails if that variable is unset or empty, or if
// the BigQuery client cannot be created. The returned value has neither a
// dataset nor a table selected yet.
func (bqf *actualBQFactory) Make(ctx context.Context) (bq, error) {
	project := os.Getenv("PROJECT_ID")
	if len(project) == 0 {
		return nil, errors.New("PROJECT_ID environment variable must be set")
	}
	c, err := bigquery.NewClient(ctx, project)
	if err != nil {
		return nil, fmt.Errorf("error initializing bigquery client: %v", err)
	}
	return &actualBQ{client: c}, nil
}
// getImageSize adds up the sizes reported by each layer and returns the total
// in megabytes (bytes / megaByte) as an exact rational. It stops at the first
// layer whose size cannot be read and returns that error.
func getImageSize(layers []v1.Layer) (*big.Rat, error) {
	var total int64
	for i := range layers {
		sz, err := layers[i].Size()
		if err != nil {
			return nil, fmt.Errorf("error parsing layer %v: %v", layers[i], err)
		}
		total += sz
	}
	return big.NewRat(total, megaByte), nil
}
// imageManifestToBuildImage resolves image (a registry reference such as
// "gcr.io/proj/app@sha256:...") against its remote registry, authenticating
// with google.Keychain. It returns the image's digest and compressed size.
//
// Any failure (parsing the reference, fetching the image, computing its
// digest, or listing or sizing its layers) is returned as an error. Before
// this fix, digest and layer errors were silently dropped, which produced rows
// with an empty digest or a zero size.
func imageManifestToBuildImage(image string) (*buildImage, error) {
	ref, err := name.ParseReference(image)
	if err != nil {
		return nil, fmt.Errorf("error parsing image reference: %w", err)
	}
	img, err := remote.Image(ref, remote.WithAuthFromKeychain(google.Keychain))
	if err != nil {
		return nil, fmt.Errorf("error obtaining image reference: %w", err)
	}
	sha, err := img.Digest()
	if err != nil {
		return nil, fmt.Errorf("error computing digest of image %q: %w", image, err)
	}
	layers, err := img.Layers()
	if err != nil {
		return nil, fmt.Errorf("error listing layers of image %q: %w", image, err)
	}
	// Layer sizes are compressed sizes, so their sum is the compressed image size.
	containerSize, err := getImageSize(layers)
	if err != nil {
		return nil, err
	}
	return &buildImage{SHA: sha.String(), ContainerSizeMB: containerSize}, nil
}
// SetUp configures the notifier from cfg. The steps are:
//   - Build the CEL filter from cfg.Spec.Notification.Filter.
//   - Read the target table URI from cfg.Spec.Notification.Delivery["table"],
//     which must be a string matching tableResource.
//   - Parse bigQueryJson as the text/template that renders each row's JSON
//     column.
//   - Create the BigQuery client, then ensure the dataset and table exist.
//
// Config validation (table URI, template) happens before any client or
// network work, so a malformed config fails fast. Previously a template parse
// error was ignored, which left n.tmpl nil. Every later SendNotification then
// panicked in Execute.
func (n *bqNotifier) SetUp(ctx context.Context, cfg *notifiers.Config, bigQueryJson string, _ notifiers.SecretGetter, br notifiers.BindingResolver) error {
	prd, err := notifiers.MakeCELPredicate(cfg.Spec.Notification.Filter)
	if err != nil {
		return fmt.Errorf("failed to make a CEL predicate: %w", err)
	}
	parsed, ok := cfg.Spec.Notification.Delivery["table"].(string)
	if !ok {
		return fmt.Errorf("expected table string: %v", cfg.Spec.Notification.Delivery)
	}
	// rs[1] is the dataset ID, rs[2] the table ID.
	rs := tableResource.FindStringSubmatch(parsed)
	if len(rs) != 3 {
		return fmt.Errorf("failed to parse valid table URI: %v", parsed)
	}
	tmpl, err := template.New("bq_json_template").Parse(bigQueryJson)
	if err != nil {
		return fmt.Errorf("failed to parse BigQuery JSON template: %w", err)
	}
	n.filter = prd
	n.tmpl = tmpl
	n.br = br

	n.client, err = n.bqf.Make(ctx)
	if err != nil {
		return fmt.Errorf("failed to initialize bigquery client: %w", err)
	}
	if err := n.client.EnsureDataset(ctx, rs[1]); err != nil {
		return err
	}
	if err := n.client.EnsureTable(ctx, rs[2]); err != nil {
		return err
	}
	return nil
}
// parsePBTime converts a protobuf timestamp to a civil.DateTime, using the
// wall-clock value of ts.AsTime(). A nil timestamp is reported as an error
// instead of being mapped to a zero value.
func parsePBTime(ts *timestamppb.Timestamp) (civil.DateTime, error) {
	if ts != nil {
		return civil.DateTimeOf(ts.AsTime()), nil
	}
	return civil.DateTime{}, fmt.Errorf("timestamp is nil")
}
func (n *bqNotifier) SendNotification(ctx context.Context, build *cbpb.Build) error {
if !n.filter.Apply(ctx, build) {
log.V(2).Infof("not doing BQ write for build %v", build.Id)
return nil
}
if build.BuildTriggerId == "" {
log.Warningf("build passes filter but does not have a trigger ID. Build id: %q, status: %v", build.Id, build.GetStatus())
}
if !terminalStatusCodes[build.Status] {
log.Infof("not writing to BigQuery for non-terminal build status %v", build.Status.String())
return nil
}
log.Infof("sending Big Query write for build %q (status: %q)", build.Id, build.Status)
if build.ProjectId == "" {
return fmt.Errorf("build missing project id")
}
buildImages := []*buildImage{}
shaSet := make(map[string]bool)
if build.Status == cbpb.Build_SUCCESS {
for _, image := range build.GetImages() {
buildImage, err := imageManifestToBuildImage(image)
if err != nil {
return fmt.Errorf("error parsing image manifest: %v", err)
}
if shaSet[buildImage.SHA] {
continue
}
shaSet[buildImage.SHA] = true
buildImages = append(buildImages, buildImage)
}
}
buildSteps := []*buildStep{}
createTime, err := parsePBTime(build.CreateTime)
if err != nil {
return fmt.Errorf("error parsing CreateTime: %v", err)
}
startTime, err := parsePBTime(build.StartTime)
if err != nil {
return fmt.Errorf("error parsing StartTime: %v", err)
}
finishTime, err := parsePBTime(build.FinishTime)
if err != nil {
return fmt.Errorf("error parsing FinishTime: %v", err)
}
unixZeroTimestamp := timestamppb.New(time.Unix(0, 0))
for _, step := range build.GetSteps() {
st := step.GetTiming().GetStartTime()
et := step.GetTiming().GetEndTime()
if st == nil {
st = unixZeroTimestamp
}
if et == nil {
et = unixZeroTimestamp
}
startTime, err := parsePBTime(st)
if err != nil {
return fmt.Errorf("error parsing StartTime: %v", err)
}
endTime, err := parsePBTime(et)
if err != nil {
return fmt.Errorf("error parsing EndTime: %v", err)
}
newStep := &buildStep{
Name: step.Name,
ID: step.Id,
Status: step.GetStatus().String(),
Args: step.Args,
StartTime: startTime,
EndTime: endTime,
}
buildSteps = append(buildSteps, newStep)
}
logURL, err := notifiers.AddUTMParams(build.LogUrl, notifiers.StorageMedium)
if err != nil {
return fmt.Errorf("error generating UTM params: %v", err)
}
substitutions := []*substitution{}
for key, value := range build.Substitutions {
substitutions = append(substitutions, &substitution{key, value})
}
var bindings map[string]string
if n.br != nil {
bindings, err = n.br.Resolve(ctx, nil, build)
if err != nil {
return fmt.Errorf("failed to resolve bindings: %w", err)
}
}
n.tmplView = ¬ifiers.TemplateView{
Build: ¬ifiers.BuildView{Build: build},
Params: bindings,
}
var buf bytes.Buffer
if err := n.tmpl.Execute(&buf, n.tmplView); err != nil {
return err
}
newRow := &bqRow{
ProjectID: build.ProjectId,
ID: build.Id,
BuildTriggerID: build.BuildTriggerId,
Status: build.Status.String(),
Images: buildImages,
Steps: buildSteps,
CreateTime: createTime,
StartTime: startTime,
FinishTime: finishTime,
Tags: build.Tags,
Env: build.Options.Env,
LogURL: logURL,
Substitutions: substitutions,
JSON: buf.String(),
}
return n.client.WriteRow(ctx, newRow)
}
// EnsureDataset selects datasetName on bq's client and creates the dataset
// when its metadata cannot be read. Any metadata error, not only "not found",
// is treated as a missing dataset: a warning is logged and creation is
// attempted. The selected dataset is kept in bq.dataset for EnsureTable.
func (bq *actualBQ) EnsureDataset(ctx context.Context, datasetName string) error {
	ds := bq.client.Dataset(datasetName)
	bq.dataset = ds
	_, metaErr := ds.Metadata(ctx)
	if metaErr == nil {
		return nil
	}
	log.Warningf("error obtaining dataset metadata: %v;Creating new BigQuery dataset: %q", metaErr, datasetName)
	meta := &bigquery.DatasetMetadata{
		Name:        datasetName,
		Description: "BigQuery Notifier Build Data",
	}
	if err := ds.Create(ctx, meta); err != nil {
		return fmt.Errorf("error creating dataset: %v", err)
	}
	return nil
}
// EnsureTable selects tableName inside the dataset chosen by EnsureDataset,
// which must be called first. It then makes sure the table can accept bqRow
// values, using a schema inferred from bqRow:
//   - If the table's metadata cannot be read (any error, not only "not
//     found"), the table is created with the inferred schema.
//   - If the table exists but has no schema, the inferred schema is written
//     to it. The update is conditional on the table's ETag.
//   - An existing non-empty schema is left untouched.
func (bq *actualBQ) EnsureTable(ctx context.Context, tableName string) error {
	if bq.dataset == nil {
		// Without this check, bq.dataset.Table would panic on a nil dataset.
		return errors.New("dataset not initialized: EnsureDataset must be called before EnsureTable")
	}
	bq.table = bq.dataset.Table(tableName)
	schema, err := bigquery.InferSchema(bqRow{})
	if err != nil {
		return fmt.Errorf("failed to infer schema: %w", err)
	}
	metadata, err := bq.table.Metadata(ctx)
	if err != nil {
		log.Warningf("Error obtaining table metadata: %q;Creating new BigQuery table: %q", err, tableName)
		tm := &bigquery.TableMetadata{
			Name:        tableName,
			Description: "BigQuery Notifier Build Data Table",
			Schema:      schema,
		}
		if err := bq.table.Create(ctx, tm); err != nil {
			return fmt.Errorf("failed to initialize table %q: %w", tableName, err)
		}
		return nil
	}
	if len(metadata.Schema) == 0 {
		log.Warningf("No schema found for table, writing new schema for table: %v", tableName)
		update := bigquery.TableMetadataToUpdate{
			Schema: schema,
		}
		if _, err := bq.table.Update(ctx, update, metadata.ETag); err != nil {
			return fmt.Errorf("error: unable to update schema of table: %w", err)
		}
	}
	return nil
}
// WriteRow inserts row into the table selected by EnsureTable, using that
// table's Inserter. EnsureTable must have been called first. At verbosity 2
// the row is logged before the insert.
func (bq *actualBQ) WriteRow(ctx context.Context, row *bqRow) error {
	inserter := bq.table.Inserter()
	log.V(2).Infof("Writing row: %v", row)
	putErr := inserter.Put(ctx, row)
	if putErr == nil {
		return nil
	}
	return fmt.Errorf("error inserting row into BQ: %v", putErr)
}