cmd/json2pubsub/function.go (364 lines of code) (raw):

// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package function implements the Json2Pubsub HTTP handler: it accepts POST
// requests, gates them with a CEL "control" expression, derives a JSON
// message with a second CEL expression and publishes it to a Pub/Sub topic.
package function

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"mime"
	"net"
	"net/http"
	"net/url"
	"os"
	"reflect"
	"strings"
	"time"

	"cloud.google.com/go/pubsub"
	secretmanager "cloud.google.com/go/secretmanager/apiv1"
	"cloud.google.com/go/secretmanager/apiv1/secretmanagerpb"
	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"github.com/google/cel-go/cel"
	celtypes "github.com/google/cel-go/common/types"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
	"google.golang.org/api/option"
)

// VERSION is the Json2Pubsub version; it is logged at startup and embedded in
// the user agent string.
var VERSION string = "0.2.0"

// Compiled CEL programs and the flag selecting the default (empty) response.
var requestControlExpr cel.Program
var extractMessage cel.Program
var responseBody cel.Program
var defaultResponseBody = true

// Runtime configuration populated by Startup.
var pubsubTopic string
var cloudProjectId string
var userAgent string

// secretPrefix marks an environment variable value as a reference to a
// Secret Manager secret version rather than a literal value.
const secretPrefix = "gsm:"

// GetPubsubTopicFunc is the signature of the function used by RequestHandler
// to obtain the Pub/Sub topic it publishes to.
type GetPubsubTopicFunc func(ctx context.Context, cloudProjectId string, userAgent string, pubsubTopic string) (*pubsub.Topic, error)

// getPubsubTopicReal creates a Pub/Sub client for cloudProjectId (using the
// given user agent) and returns a handle to pubsubTopic.
//
// NOTE(review): a new client is created on every call and is never closed;
// RequestHandler calls this once per request. Consider caching the client —
// left unchanged here because callers may substitute GetPubsubTopic.
func getPubsubTopicReal(ctx context.Context, cloudProjectId string, userAgent string, pubsubTopic string) (*pubsub.Topic, error) {
	client, err := pubsub.NewClient(ctx, cloudProjectId, option.WithUserAgent(userAgent))
	if err != nil {
		return nil, err
	}
	return client.Topic(pubsubTopic), nil
}

// GetPubsubTopic is the topic factory used by RequestHandler. It is a
// variable (presumably so tests can replace it) and defaults to the real
// Pub/Sub implementation.
var GetPubsubTopic GetPubsubTopicFunc = getPubsubTopicReal

// getEnvironmentVariable returns the value of the environment variable name.
//
// If the variable is empty and required is true, an error is returned. If the
// value starts with "gsm:", the remainder is used as a Secret Manager secret
// version name and the secret payload is returned instead of the raw value.
func getEnvironmentVariable(name string, required bool) (string, error) {
	envVar := os.Getenv(name)
	if envVar == "" && required {
		return "", fmt.Errorf("environment variable %s is required", name)
	}
	if !strings.HasPrefix(envVar, secretPrefix) {
		return envVar, nil
	}

	// The value references a Secret Manager secret: resolve it.
	secretName := strings.TrimPrefix(envVar, secretPrefix)
	ctx := context.Background()
	client, err := secretmanager.NewClient(ctx, option.WithUserAgent(userAgent))
	if err != nil {
		return "", fmt.Errorf("setting up Secret Manager client: %w", err)
	}
	defer client.Close()

	result, err := client.AccessSecretVersion(ctx, &secretmanagerpb.AccessSecretVersionRequest{
		Name: secretName,
	})
	if err != nil {
		return "", fmt.Errorf("accessing secret %s: %w", secretName, err)
	}
	// The generated getters are nil-safe, so a response without a payload
	// yields an empty string instead of a nil dereference.
	return string(result.GetPayload().GetData()), nil
}

// setControlCel compiles controlCel and installs it as the request control
// program.
func setControlCel(celEnv *cel.Env, controlCel string) (err error) {
	requestControlExpr, err = GetCelProgram(celEnv, controlCel, true)
	return err
}

// setMessageCel compiles messageCel and installs it as the message extraction
// program.
func setMessageCel(celEnv *cel.Env, messageCel string) (err error) {
	extractMessage, err = GetCelProgram(celEnv, messageCel, false)
	return err
}

// setResponseBodyCel compiles bodyCel and installs it as the response body
// program. An empty bodyCel selects the default response (200 with no body).
func setResponseBodyCel(celEnv *cel.Env, bodyCel string) (err error) {
	if bodyCel == "" {
		defaultResponseBody = true
		return nil
	}
	responseBody, err = GetCelProgram(celEnv, bodyCel, false)
	if err != nil {
		return err
	}
	defaultResponseBody = false
	return nil
}

// Startup reads the configuration from the environment, compiles the CEL
// programs and registers RequestHandler on the default HTTP mux. Any
// configuration problem is fatal. It returns the port to listen on (the PORT
// environment variable, or "8080" when it is unset).
func Startup() (port string) {
	port = os.Getenv("PORT")
	if port == "" {
		// Probably not running as function, configure a more console friendly logger
		log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
		port = "8080"
	}
	log.Info().Msgf("Starting Json2Pubsub server, version %s...", VERSION)

	userAgent = fmt.Sprintf("google-pso-tool/json2pubsub/%s", VERSION)

	cloudProjectId = os.Getenv("GOOGLE_CLOUD_PROJECT")
	if cloudProjectId == "" {
		log.Fatal().Msg("Environment variable GOOGLE_CLOUD_PROJECT is not set.")
	}
	log.Info().Msgf("Using cloud project: %s", cloudProjectId)

	var err error
	pubsubTopic, err = getEnvironmentVariable("PUBSUB_TOPIC", true)
	if err != nil {
		log.Fatal().Msgf("Failed to get PUBSUB_TOPIC: %v", err)
	}
	log.Info().Msgf("Using Pub/Sub topic: %s", pubsubTopic)

	handlerUrl, err := getEnvironmentVariable("CUSTOM_HANDLER", false)
	if err != nil {
		log.Fatal().Err(err).Msgf("Failed to get CUSTOM_HANDLER: %v", err)
	}
	if handlerUrl == "" {
		handlerUrl = "/"
	} else {
		log.Info().Msgf("Using custom handler location: %s", handlerUrl)
	}
	// Register on the configured location; previously "/" was hard-coded and
	// CUSTOM_HANDLER had no effect.
	http.HandleFunc(handlerUrl, RequestHandler)

	celEnv, err := GetCelEnv()
	if err != nil {
		log.Fatal().Err(err).Msgf("Failed to setup CEL environment: %v", err)
	}

	controlCel, err := getEnvironmentVariable("CONTROL_CEL", true)
	if err != nil {
		log.Fatal().Err(err).Msgf("Failed to get CONTROL_CEL: %v", err)
	}
	if err = setControlCel(celEnv, controlCel); err != nil {
		log.Fatal().Err(err).Msgf("Failed to compile request control CEL program (%s): %v", controlCel, err)
	}

	messageCel, err := getEnvironmentVariable("MESSAGE_CEL", true)
	if err != nil {
		log.Fatal().Err(err).Msgf("Failed to get MESSAGE_CEL: %v", err)
	}
	if err = setMessageCel(celEnv, messageCel); err != nil {
		log.Fatal().Err(err).Msgf("Failed to compile message extract CEL program (%s): %v", messageCel, err)
	}

	responseCel, err := getEnvironmentVariable("RESPONSE_CEL", false)
	if err != nil {
		log.Fatal().Err(err).Msgf("Failed to get RESPONSE_CEL: %v", err)
	}
	if err = setResponseBodyCel(celEnv, responseCel); err != nil {
		log.Fatal().Err(err).Msgf("Failed to compile response body CEL program (%s): %v", responseCel, err)
	}
	if responseCel == "" {
		log.Info().Msg("No response body CEL specified, using empty responses.")
	}

	return port
}

func init() {
	// Slightly ugly hack for testing: skip the environment-driven startup
	// (and silence logging) when running from a test binary.
	if !strings.HasSuffix(os.Args[0], ".test") {
		Startup()
	} else {
		zerolog.SetGlobalLevel(zerolog.Disabled)
	}
	functions.HTTP("Json2Pubsub", RequestHandler)
}

// requestMediaType extracts the lower-cased media type from a Content-Type
// header value, dropping parameters such as "; charset=utf-8". Values that
// mime.ParseMediaType rejects fall back to the trimmed, lower-cased text
// before the first ';'.
func requestMediaType(header string) string {
	if mediaType, _, err := mime.ParseMediaType(header); err == nil {
		return mediaType
	}
	if i := strings.IndexByte(header, ';'); i >= 0 {
		header = header[:i]
	}
	return strings.ToLower(strings.TrimSpace(header))
}

// requestCelParams builds the variable bindings ("origin" and "request")
// handed to every CEL program evaluated for a request.
func requestCelParams(r *http.Request, originIp string, postBody []byte, jsonBody interface{}, postValues url.Values, headers map[string]string) map[string]interface{} {
	currentTime := time.Now()
	currentTimeUTC := currentTime.UTC()
	return map[string]interface{}{
		"origin": map[string]interface{}{
			"ip": originIp,
		},
		"request": map[string]interface{}{
			"body":   string(postBody),
			"method": r.Method,
			// EscapedPath returns RawPath when it is set and valid, and the
			// escaped form of Path otherwise; RawPath alone is usually empty.
			"path":     r.URL.EscapedPath(),
			"scheme":   r.URL.Scheme,
			"query":    r.URL.RawQuery,
			"json":     jsonBody,
			"post":     postValues,
			"headers":  headers,
			"unixtime": currentTime.Unix(),
			"time": map[string]int{
				"year":   currentTimeUTC.Year(),
				"month":  int(currentTimeUTC.Month()),
				"day":    currentTimeUTC.Day(),
				"hour":   currentTimeUTC.Hour(),
				"minute": currentTimeUTC.Minute(),
				"second": currentTimeUTC.Second(),
			},
		},
	}
}

// writeJSONBody marshals body to JSON and writes it with a 200 status, or
// responds with a 500 if marshalling fails. kind ("map" or "list") is only
// used in the log message.
func writeJSONBody(w http.ResponseWriter, logger *zerolog.Logger, body interface{}, kind string) {
	encoded, err := json.Marshal(body)
	if err != nil {
		logger.Error().Err(err).Msgf("Failed to convert response body %s to JSON: %v", kind, err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	if _, err := w.Write(encoded); err != nil {
		logger.Warn().Err(err).Msg("Failed to write response body")
	}
}

// RequestHandler handles one incoming HTTP request.
//
// Only POST requests with a non-empty body are accepted (400 otherwise). The
// body is additionally parsed as JSON or as a URL-encoded form depending on
// the Content-Type. The request control CEL program must evaluate to true
// (403 otherwise); the message CEL program must yield a JSON string, a map or
// a list, which is marshalled to JSON and published to the configured Pub/Sub
// topic (503 if publishing fails). On success the response is 200, with a
// body produced by the response CEL program when one is configured.
func RequestHandler(w http.ResponseWriter, r *http.Request) {
	// Every log line for this request carries the remote address.
	logger := log.With().Str("RemoteAddr", r.RemoteAddr).Logger()

	originIp, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		logger.Warn().Err(err).Msg("Unable to parse remote address")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Only accept POST requests
	if r.Method != http.MethodPost {
		logger.Warn().Msg("Not a POST request.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	postBody, err := ioutil.ReadAll(r.Body)
	if err != nil {
		logger.Warn().Err(err).Msgf("Failed to read request body: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	if len(postBody) == 0 {
		logger.Warn().Msg("Empty request body.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Expose headers under lower-cased names, keeping the first value of each.
	requestHeaders := make(map[string]string, len(r.Header))
	for key := range r.Header {
		requestHeaders[strings.ToLower(key)] = r.Header.Get(key)
	}

	// Compare on the bare media type so that parameters (e.g. a charset) or
	// different casing do not defeat body parsing.
	contentType := requestMediaType(r.Header.Get("Content-Type"))

	var postValues url.Values
	if contentType == "application/x-www-form-urlencoded" {
		postValues, err = url.ParseQuery(string(postBody))
		if err != nil {
			logger.Warn().Err(err).Msg("Failed to parse post body for content type application/x-www-form-urlencoded")
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	}

	var jsonBody interface{}
	if contentType == "application/json" || contentType == "text/json" {
		if err := json.Unmarshal(postBody, &jsonBody); err != nil {
			logger.Warn().Err(err).Msg("Invalid JSON body.")
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	} else {
		// Non-JSON requests expose the parsed form values (possibly nil) as
		// request.json as well.
		jsonBody = postValues
	}

	// Set up request structure
	celParams := requestCelParams(r, originIp, postBody, jsonBody, postValues, requestHeaders)

	out, _, err := requestControlExpr.Eval(celParams)
	if err != nil {
		logger.Error().Err(err).Msgf("Failed to evaluate request control check: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	// Anything other than a boolean true rejects the request.
	if result, ok := out.(celtypes.Bool); !ok || result == celtypes.False {
		logger.Error().Msgf("Request control check failed: %v", out)
		w.WriteHeader(http.StatusForbidden)
		return
	}

	messageOut, _, err := extractMessage.Eval(celParams)
	if err != nil || messageOut == nil {
		logger.Error().Err(err).Msgf("Failed to evaluate message extraction: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// Convert the CEL result into a value json.Marshal understands. Result
	// types other than string, map and list leave messageJson nil.
	var messageJson interface{}
	switch messageOut.Type() {
	case celtypes.StringType:
		native, err := messageOut.ConvertToNative(reflect.TypeOf(""))
		if err != nil {
			logger.Error().Err(err).Msgf("Failed to convert message output to string: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		// A string result must itself be a JSON document.
		messageString := native.(string)
		if err := json.Unmarshal([]byte(messageString), &messageJson); err != nil {
			logger.Error().Str("json", messageString).Msg("Invalid JSON string from message CEL.")
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	case celtypes.MapType:
		native, err := messageOut.ConvertToNative(reflect.TypeOf(map[string]interface{}{}))
		if err != nil {
			logger.Error().Err(err).Msgf("Failed to convert message output to map: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		messageJson = native.(map[string]interface{})
	case celtypes.ListType:
		native, err := messageOut.ConvertToNative(reflect.TypeOf([]interface{}{}))
		if err != nil {
			logger.Error().Err(err).Msgf("Failed to convert message output to list: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		messageJson = native.([]interface{})
	}
	if messageJson == nil {
		logger.Error().Msg("Failed to turn request into a Pub/Sub message.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	messageData, err := json.Marshal(messageJson)
	if err != nil {
		logger.Error().Err(err).Msgf("Unable to marshal message data to JSON: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	msg := &pubsub.Message{
		Data: messageData,
	}

	topic, err := GetPubsubTopic(r.Context(), cloudProjectId, userAgent, pubsubTopic)
	if err != nil {
		logger.Error().Err(err).Msg("Unable to get Pub/Sub client for topic.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	logger.Info().Msgf("Publishing a message to topic %s, len=%d bytes", topic.String(), len(msg.Data))
	// Get blocks until the publish has completed (or failed), so the response
	// status reflects the outcome.
	if _, err := topic.Publish(r.Context(), msg).Get(r.Context()); err != nil {
		logger.Error().Err(err).Msgf("Unable to publish message to Pub/Sub topic: %v", err)
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}

	if defaultResponseBody {
		w.WriteHeader(http.StatusOK)
		return
	}

	bodyOut, _, err := responseBody.Eval(celParams)
	if err != nil || bodyOut == nil {
		logger.Error().Err(err).Msgf("Failed to evaluate response body: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	switch bodyOut.Type() {
	case celtypes.StringType:
		native, err := bodyOut.ConvertToNative(reflect.TypeOf(""))
		if err != nil {
			logger.Error().Err(err).Msgf("Failed to convert response body to string: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		// String results are written verbatim.
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write([]byte(native.(string))); err != nil {
			logger.Warn().Err(err).Msg("Failed to write response body")
		}
	case celtypes.MapType:
		native, err := bodyOut.ConvertToNative(reflect.TypeOf(map[string]interface{}{}))
		if err != nil {
			logger.Error().Err(err).Msgf("Failed to convert response body to map: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		writeJSONBody(w, &logger, native.(map[string]interface{}), "map")
	case celtypes.ListType:
		native, err := bodyOut.ConvertToNative(reflect.TypeOf([]interface{}{}))
		if err != nil {
			logger.Error().Err(err).Msgf("Failed to convert response body to list: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		writeJSONBody(w, &logger, native.([]interface{}), "list")
	default:
		// Other result types produce an empty 200 response, matching the
		// status net/http would send implicitly.
		w.WriteHeader(http.StatusOK)
	}
}