lib/notifiers/notifiers.go (464 lines of code) (raw):

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifiers

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"strings"
	"time"

	cbpb "cloud.google.com/go/cloudbuild/apiv1/v2/cloudbuildpb"
	secretmanager "cloud.google.com/go/secretmanager/apiv1"
	smpb "cloud.google.com/go/secretmanager/apiv1/secretmanagerpb"
	"cloud.google.com/go/storage"
	log "github.com/golang/glog"
	"github.com/google/cel-go/cel"
	"github.com/google/cel-go/checker/decls"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/encoding/prototext"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/protoadapt"
	"gopkg.in/yaml.v2"
)

const (
	// cloudBuildProtoPkg is the proto package used to qualify the `Build` type in CEL filters.
	cloudBuildProtoPkg = "google.devtools.cloudbuild.v1"
	cloudBuildTopic    = "cloud-builds"
	// defaultHTTPPort is the port Main listens on when the PORT environment variable is empty.
	defaultHTTPPort = "8080"
	// secretRef is the map key that GetSecretRef looks up inside a delivery field.
	secretRef = "secretRef"
)

var (
	// Set of allowed notifier Config `apiVersions`.
	allowedYAMLAPIVersions = map[string]bool{
		"cloud-build-notifiers/v1": true,
	}
	// Set of allowed Template `type` values; parseTemplate rejects any other type.
	allowedTemplateTypes = map[string]bool{
		"golang": true,
	}
)

// Flags.
var (
	smoketest  = flag.Bool("smoketest", false, "If true, Main will simply log the notifier type and exit.")
	setupCheck = flag.Bool("setup_check", false, "If true, the configuration YAML is read from stdin and notifier.SetUp is called in a faked-out way. The smoketest flag takes priority over this one.")
)

var (
	// gcsConfigPattern matches `gs://<bucket>/<object>` config paths; submatch 1 is the
	// bucket and submatch 2 is the object (which may not contain a backslash).
	// NOTE(review): the bucket character class starts with `[[`, so a literal `[` is
	// accepted in bucket names. This looks unintentional - confirm before tightening.
	gcsConfigPattern = regexp.MustCompile(`^gs://([[\w-_.]+)/([^\\]+$)`)
)

// Config is the common type for (YAML-based) configuration files for notifications.
type Config struct {
	APIVersion string    `yaml:"apiVersion"`
	Kind       string    `yaml:"kind"`
	Metadata   *Metadata `yaml:"metadata"`
	Spec       *Spec     `yaml:"spec"`
}

// Metadata is a KRD-compliant data container used for metadata references.
type Metadata struct {
	Name string `yaml:"name"`
}

// Spec is the data container for the fields that are relevant to the functionality of the notifier.
type Spec struct {
	Notification *Notification `yaml:"notification"`
	Secrets      []*Secret     `yaml:"secrets"`
}

// Notification is the data container for the fields that are relevant to the configuration of sending the notification.
type Notification struct {
	Filter   string                 `yaml:"filter"`
	Delivery map[string]interface{} `yaml:"delivery"`
	Params   map[string]string      `yaml:"params"`
	Template *Template              `yaml:"template"`
}

// Template is the data container describing the notification template.
// Type must be one of allowedTemplateTypes. When URI is non-empty, parseTemplate
// fetches the template text from that GCS location; otherwise Content is used
// as the inline template text.
type Template struct {
	Type    string `yaml:"type"`
	URI     string `yaml:"uri"`
	Content string `yaml:"content"`
}

// TemplateView is the data container for the fields relevant to rendering a template
type TemplateView struct {
	Build  *BuildView        `json:"Build"`
	Params map[string]string `json:"Params"`
}

// BuildView is the data container that contains the build
type BuildView struct {
	*cbpb.Build
}

// SecretConfig is the data container used in a Spec.Notification config for referencing a secret in the Spec.Secrets list.
type SecretConfig struct {
	LocalName string `yaml:"secretRef"`
}

// Secret is a data container matching the local name of a secret to its GCP SecretManager resource name.
type Secret struct {
	LocalName    string `yaml:"name"`
	ResourceName string `yaml:"value"`
}

// Copied from https://cloud.google.com/run/docs/tutorials/pubsub#looking_at_the_code.
type pubSubPushMessage struct { Data []byte `json:"data,omitempty"` ID string `json:"id"` PublishTime string `json:"publishTime"` } type pubSubPushWrapper struct { Message pubSubPushMessage Subscription string `json:"subscription"` } // Notifier is the interface type that users should implement for usage in Cloud Build notifiers. type Notifier interface { SetUp(context.Context, *Config, string, SecretGetter, BindingResolver) error SendNotification(context.Context, *cbpb.Build) error } // SecretGetter allows for fetching secrets from some key store. type SecretGetter interface { GetSecret(context.Context, string) (string, error) } // EventFilter is a type that can be used to filter Builds for notifications. type EventFilter interface { // Apply returns true iff the EventFilter is able to execute successfully and matches the given Build. Apply(context.Context, *cbpb.Build) bool } // CELPredicate is an EventFilter that uses a CEL program to determine if // notifications should be sent for a given Pub/Sub message. type CELPredicate struct { prg cel.Program } // Apply returns true iff the underlying CEL program returns true for the given Build. func (c *CELPredicate) Apply(_ context.Context, build *cbpb.Build) bool { out, _, err := c.prg.Eval(map[string]interface{}{"build": build}) if err != nil { log.Errorf("failed to evaluate the CEL filter: %v", err) return false } match, ok := out.Value().(bool) if !ok { log.Errorf("failed to convert output %v of CEL filter program to a boolean: %v", out, err) return false } return match } // Main is a function that can be called by `main()` functions in notifier binaries. func Main(notifier Notifier) error { // TODO(ljr): Refactor/separate this flagged logic from the main logic via a Main/doMain refactor. 
ctx := context.Background() if !flag.Parsed() { flag.Parse() } if *smoketest { log.V(0).Infof("notifier smoketest: %T", notifier) return nil } if *setupCheck { log.V(2).Info("starting setup check") cfg, err := decodeConfig(os.Stdin) if err != nil { return fmt.Errorf("failed to decode YAML config from stdin: %w", err) } if out, err := yaml.Marshal(cfg); err != nil { log.Warningf("failed to re-encode config YAML: %v", err) } else { log.V(2).Infof("got re-encoded YAML from stdin:\n%s", string(out)) } if err := validateConfig(cfg); err != nil { return fmt.Errorf("failed to validate config during setup check: %w", err) } br, err := newResolver(cfg) if err != nil { return fmt.Errorf("failed to create BindingResolver during setup check: %w", err) } if err := notifier.SetUp(ctx, cfg, "", new(setupCheckSecretGetter), br); err != nil { return fmt.Errorf("failed to run notifier.SetUp during setup check: %w", err) } log.V(2).Infof("setup check successful") return nil } cfgPath, ok := GetEnv("CONFIG_PATH") if !ok { return errors.New("expected CONFIG_PATH to be non-empty") } sc, err := storage.NewClient(ctx) if err != nil { return fmt.Errorf("failed to create new GCS client: %w", err) } defer sc.Close() smc, err := secretmanager.NewClient(ctx) if err != nil { return fmt.Errorf("failed to create new SecretManager client: %w", err) } defer smc.Close() cfg, err := getGCSConfig(ctx, &actualGCSReaderFactory{sc}, cfgPath) if err != nil { return fmt.Errorf("failed to get config from GCS: %w", err) } if err := validateConfig(cfg); err != nil { return fmt.Errorf("got invalid config from path %q: %w", cfgPath, err) } log.V(2).Infof("got config from GCS (%q): %+v\n", cfgPath, cfg) tmpl, err := parseTemplate(ctx, cfg.Spec.Notification.Template, &actualGCSReaderFactory{sc}) if err != nil { return fmt.Errorf("failed to parse template from notiifer spec %q: %w", cfg.Spec.Notification.Template, err) } sm := &actualSecretManager{client: smc} br, err := newResolver(cfg) if err != nil { return 
fmt.Errorf("failed to construct a binding resolver: %v", err) } if err := notifier.SetUp(ctx, cfg, tmpl, sm, br); err != nil { return fmt.Errorf("failed to call SetUp on notifier: %w", err) } _, ignoreBadMessages := GetEnv("IGNORE_BAD_MESSAGES") log.V(2).Infoln("starting HTTP server...") // Our Pub/Sub push receiver. http.HandleFunc("/", newReceiver(notifier, &receiverParams{ignoreBadMessages})) // An auxilliary, healthz-style receiver. // You can call this endpoint using the curl command here: // https://cloud.google.com/run/docs/triggering/https-request#creating_private_services. startTime := time.Now() http.HandleFunc("/helloz", func(w http.ResponseWriter, _ *http.Request) { fmt.Fprintf(w, "Greetings from a Google Cloud Build notifier: %T!\nStart Time: %s\nCurrent Time: %s\n", notifier, startTime.Format(time.RFC1123), time.Now().Format(time.RFC1123)) }) var port string if p, ok := GetEnv("PORT"); ok { port = p } else { log.Warningf("PORT environment variable was not present, using %s instead", defaultHTTPPort) port = defaultHTTPPort } // Block on the HTTP's health. 
return http.ListenAndServe(":"+port, nil) } func parseTemplate(ctx context.Context, tmpl *Template, grf gcsReaderFactory) (string, error) { templateString := "" if tmpl != nil { if _, ok := allowedTemplateTypes[tmpl.Type]; !ok { return "", fmt.Errorf("got invalid Template Type: %v", tmpl.Type) } if tmpl.URI != "" { parsed, err := getGCSTemplate(ctx, grf, tmpl.URI) if err != nil { return "", fmt.Errorf("failed to get template from GCS: %w", err) } templateString = parsed } else { templateString = tmpl.Content } if err := validateTemplate(templateString); err != nil { return "", fmt.Errorf("got invalid template from path %q: %w", tmpl.URI, err) } } return templateString, nil } type gcsReaderFactory interface { NewReader(ctx context.Context, bucket, object string) (io.ReadCloser, error) } type actualGCSReaderFactory struct { client *storage.Client } func (a *actualGCSReaderFactory) NewReader(ctx context.Context, bucket, object string) (io.ReadCloser, error) { return a.client.Bucket(bucket).Object(object).NewReader(ctx) } type actualSecretManager struct { client *secretmanager.Client // TODO(ljr): Do we want any sort of timed cache here? } func (a *actualSecretManager) GetSecret(ctx context.Context, name string) (string, error) { // See https://github.com/GoogleCloudPlatform/golang-samples/blob/master/secretmanager/access_secret_version.go# for an example usage. res, err := a.client.AccessSecretVersion(ctx, &smpb.AccessSecretVersionRequest{Name: name}) if err != nil { return "", fmt.Errorf("failed to get secret named %q: %w", name, err) } return string(res.GetPayload().GetData()), nil } // setupCheckSecretGetter is a faked-out SecretGetter that is only used by the setup check functionality in Main. 
type setupCheckSecretGetter struct{} func (c *setupCheckSecretGetter) GetSecret(_ context.Context, name string) (string, error) { return fmt.Sprintf("[SECRET VALUE FOR %q]", name), nil } // getGCSConfig fetches the YAML Config file from the given GCS path and returns the parsed Config. func getGCSConfig(ctx context.Context, grf gcsReaderFactory, path string) (*Config, error) { if !gcsConfigPattern.MatchString(path) { return nil, fmt.Errorf("expected path %q to match pattern %v", path, gcsConfigPattern) } // if trm := strings.TrimPrefix(path, "gs://"); trm != path { // // path started with the prefix // path = trm // } else { // return nil, fmt.Errorf("expected %q to start with `gs://`", path) // } // split := strings.SplitN(path, "/", 2) // log.V(2).Infof("got path split: %+v", split) // if len(split) != 2 { // return nil, fmt.Errorf("path has incorrect format (expected form: `[gs://]bucket/path/to/object`): %q => %s", path, strings.Join(split, ", ")) // } split := gcsConfigPattern.FindStringSubmatch(path) if len(split) != 3 { return nil, fmt.Errorf("path has incorrect format (expected form: `[gs://]bucket/path/to/object`): %q => %s", path, strings.Join(split, ", ")) } bucket, object := split[1], split[2] r, err := grf.NewReader(ctx, bucket, object) if err != nil { return nil, fmt.Errorf("failed to get reader for (bucket=%q, object=%q): %w", bucket, object, err) } defer r.Close() cfg, err := decodeConfig(r) if err != nil { return nil, fmt.Errorf("failed to parse configuration from YAML at %q: %w", path, err) } return cfg, nil } // getGCSConfig fetches the Template file from the given GCS path and returns the parsed Config. 
func getGCSTemplate(ctx context.Context, grf gcsReaderFactory, path string) (string, error) { if trm := strings.TrimPrefix(path, "gs://"); trm != path { // path started with the prefix path = trm } else { return "", fmt.Errorf("expected %q to start with `gs://`", path) } split := strings.SplitN(path, "/", 2) log.V(2).Infof("got path split: %+v", split) if len(split) != 2 { return "", fmt.Errorf("path has incorrect format (expected form: `[gs://]bucket/path/to/object`): %q => %s", path, strings.Join(split, ", ")) } bucket, object := split[0], split[1] r, err := grf.NewReader(ctx, bucket, object) if err != nil { return "", fmt.Errorf("failed to get reader for (bucket=%q, object=%q): %w", bucket, object, err) } defer r.Close() tmpl, err := decodeTemplate(r) if err != nil { return "", fmt.Errorf("failed to parse template at %q: %w", path, err) } return tmpl, nil } func decodeConfig(r io.Reader) (*Config, error) { cfg := new(Config) dcd := yaml.NewDecoder(r) dcd.SetStrict(true) return cfg, dcd.Decode(cfg) } func decodeTemplate(r io.Reader) (string, error) { buf := new(bytes.Buffer) _, err := buf.ReadFrom(r) if err != nil { return "", fmt.Errorf("failed to parse template from Template file: %w", err) } return buf.String(), nil } // validateConfig checks the following (or errors): // - apiVersion is one of allowedYAMLAPIVersions. // - user substitution names match the subNamePattern regexp. 
func validateConfig(cfg *Config) error { if allowed := allowedYAMLAPIVersions[cfg.APIVersion]; !allowed { return fmt.Errorf("expected `apiVersion` %q to be one of the following:\n%v", cfg.APIVersion, allowedYAMLAPIVersions) } if cfg.Spec == nil { return errors.New("expected config.spec to be present") } if cfg.Spec.Notification == nil { return errors.New("expected config.spec.notification to be present") } return nil } func validateTemplate(s string) error { _, err := template.New("").Funcs(template.FuncMap{ "replace": func(s, old, new string) string { return strings.ReplaceAll(s, old, new) }, }).Parse(s) return err } // MakeCELPredicate returns a CELPredicate for the given filter string of CEL code. func MakeCELPredicate(filter string) (*CELPredicate, error) { env, err := cel.NewEnv( // Declare the `build` variable for useage in CEL programs. cel.Declarations(decls.NewIdent("build", decls.NewObjectType(cloudBuildProtoPkg+".Build"), nil)), // Register the `Build` type in the environment. cel.Types(new(cbpb.Build)), // `Container` is necessary for better (enum) scoping // (i.e with this, we don't need to use the fully qualified proto path in our programs). cel.Container(cloudBuildProtoPkg), ) if err != nil { return nil, fmt.Errorf("failed to create a CEL env: %w", err) } ast, issues := env.Compile(filter) if issues != nil && issues.Err() != nil { return nil, fmt.Errorf("failed to compile CEL filter %q: %w", filter, issues.Err()) } if !proto.Equal(ast.ResultType(), decls.Bool) { return nil, fmt.Errorf("expected CEL filter %q to have a boolean result type, but was %v", filter, ast.ResultType()) } prg, err := env.Program(ast, cel.EvalOptions(cel.OptOptimize)) if err != nil { return nil, fmt.Errorf("failed to create CEL program from filter %q: %w", filter, err) } return &CELPredicate{prg}, nil } // GetEnv fetches, logs, and returns the given environment variable. The returned boolean is true iff the value is non-empty. 
func GetEnv(name string) (string, bool) { val := os.Getenv(name) if val == "" { log.V(2).Infof("env var %q is empty", name) } else { log.V(2).Infof("env var %q is %q", name, val) } return val, val != "" } type receiverParams struct { ignoreBadMessages bool } // newReceiver returns a Pub/Sub push HTTP receiving http.HandlerFunc that calls the given notifier. func newReceiver(notifier Notifier, params *receiverParams) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() var pspw pubSubPushWrapper body, err := ioutil.ReadAll(r.Body) if err != nil { log.Errorf("failed to read request message: %v", err) http.Error(w, "Bad request body", http.StatusBadRequest) return } if err := json.Unmarshal(body, &pspw); err != nil { log.Errorf("failed to unmarshal body %q: %v", body, err) http.Error(w, "Bad pubsub.Message JSON", http.StatusBadRequest) return } log.V(2).Infof("got PubSub message with ID %q from subscription %q", pspw.Message.ID, pspw.Subscription) build := new(cbpb.Build) // Be as lenient as possible in unmarshalling. // `Unmarshal` will fail if we get a payload with a field that is unknown to the current proto version unless `DiscardUnknown` is set. 
uo := protojson.UnmarshalOptions{ AllowPartial: true, DiscardUnknown: true, } bv2 := protoadapt.MessageV2Of(build) if err := uo.Unmarshal(pspw.Message.Data, bv2); err != nil { if params.ignoreBadMessages { log.Warningf("not attempting to handle unmarshal-able Pub/Sub message id=%q data=%q publishTime=%q which gave error: %v", pspw.Message.ID, string(pspw.Message.Data), pspw.Message.PublishTime, err) return } log.Errorf("failed to unmarshal PubSub message id=%q data=%q publishTime=%q into a Build: %v", pspw.Message.ID, string(pspw.Message.Data), pspw.Message.PublishTime, err) http.Error(w, "Bad Cloud Build Pub/Sub data", http.StatusBadRequest) return } build = protoadapt.MessageV1Of(bv2).(*cbpb.Build) log.V(2).Infof("got PubSub Build payload:\n%+v\nattempting to send notification", prototext.Format(build)) if err := notifier.SendNotification(ctx, build); err != nil { log.Errorf("failed to run SendNotification: %v", err) http.Error(w, "failed to send notification", http.StatusInternalServerError) return } log.V(2).Infof("acking PubSub message %q with Build payload:\n%v", pspw.Message.ID, prototext.Format(build)) } } // GetSecretRef is a helper function for getting a Secret's local reference name from the given config. 
func GetSecretRef(config map[string]interface{}, fieldName string) (string, error) { field, ok := config[fieldName] if !ok { return "", fmt.Errorf("field name %q not present in notification config %v", fieldName, config) } m, ok := field.(map[interface{}]interface{}) if !ok { return "", fmt.Errorf("expected secret field %q to be a map[interface{}]interface{} object", fieldName) } ref, ok := m[secretRef] if !ok { return "", fmt.Errorf("expected field %q to be of the form `secretRef: <some-ref>`", fieldName) } sRef, ok := ref.(string) if !ok { return "", fmt.Errorf("expected field %q of parent %q to have a string value", secretRef, fieldName) } return sRef, nil } // FindSecretResourceName is a helper function that returns the Secret's resource name that is associated with the given local reference name. func FindSecretResourceName(secrets []*Secret, ref string) (string, error) { for _, s := range secrets { if s.LocalName == ref { return s.ResourceName, nil } } return "", fmt.Errorf("failed to find Secret with reference name %q in the given secret list", ref) } // UTMMedium is an enum that corresponds to a strict set of values for `utm_medium`. type UTMMedium string const ( // EmailMedium is for Build log URLs that are sent via email. EmailMedium UTMMedium = "email" // StorageMedium is for Build log URLS that are sent to a storage medium (i.e. BigQuery). StorageMedium = "storage" // ChatMedium is for Build log URLs that are sent over chat applications. ChatMedium = "chat" // HTTPMedium is for Build log URLs that are sent over HTTP(S) communication (that does not belong to one of the other mediums). HTTPMedium = "http" // OtherMedium is for Build log URLs that sent are over a medium that does not correspond to one of the above mediums. OtherMedium = "other" ) // AddUTMParams adds UTM campaign tracking parameters to the given Build log URL and returns the new version. // The UTM parameters are added to any existing ones, so any existing params will not be ovewritten. 
func AddUTMParams(logURL string, medium UTMMedium) (string, error) { u, err := url.Parse(logURL) if err != nil { return "", fmt.Errorf("failed to parse URL %q: %w", logURL, err) } // Use ParseQuery to fail if we get malformed params to start with, since it should never happen. vals, err := url.ParseQuery(u.RawQuery) if err != nil { return "", fmt.Errorf("failed to parse query from %q: %w", logURL, err) } var m string switch medium { case EmailMedium, StorageMedium, ChatMedium, HTTPMedium, OtherMedium: m = string(medium) default: return "", fmt.Errorf("unknown UTM medium: %q", medium) } // Use `Add` instead of `Set` so we don't override any existing params. vals.Add("utm_campaign", "google-cloud-build-notifiers") vals.Add("utm_medium", m) vals.Add("utm_source", "google-cloud-build") u.RawQuery = vals.Encode() return u.String(), nil }