appengine_flexible/go115_and_earlier/pubsub/pubsub.go (124 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Sample pubsub demonstrates use of the cloud.google.com/go/pubsub package from App Engine flexible environment. package main import ( "context" "encoding/json" "fmt" "html/template" "log" "net/http" "os" "sync" "cloud.google.com/go/pubsub" ) var ( topic *pubsub.Topic // Messages received by this instance. messagesMu sync.Mutex messages []string // token is used to verify push requests. token = mustGetenv("PUBSUB_VERIFICATION_TOKEN") ) const maxMessages = 10 func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, mustGetenv("GOOGLE_CLOUD_PROJECT")) if err != nil { log.Fatal(err) } defer client.Close() topicName := mustGetenv("PUBSUB_TOPIC") topic = client.Topic(topicName) // Create the topic if it doesn't exist. exists, err := topic.Exists(ctx) if err != nil { log.Fatal(err) } if !exists { log.Printf("Topic %v doesn't exist - creating it", topicName) _, err = client.CreateTopic(ctx, topicName) if err != nil { log.Fatal(err) } } http.HandleFunc("/", listHandler) http.HandleFunc("/pubsub/publish", publishHandler) http.HandleFunc("/pubsub/push", pushHandler) port := os.Getenv("PORT") if port == "" { port = "8080" log.Printf("Defaulting to port %s", port) } log.Printf("Listening on port %s", port) if err := http.ListenAndServe(":"+port, nil); err != nil { log.Fatal(err) } } func mustGetenv(k string) string { v := os.Getenv(k) if v == "" { log.Fatalf("%s environment variable not set.", k) } return v } type pushRequest struct { Message struct { Attributes map[string]string Data []byte ID string `json:"message_id"` } Subscription string } func pushHandler(w http.ResponseWriter, r *http.Request) { // Verify the token. if r.URL.Query().Get("token") != token { http.Error(w, "Bad token", http.StatusBadRequest) return } msg := &pushRequest{} if err := json.NewDecoder(r.Body).Decode(msg); err != nil { http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest) return } messagesMu.Lock() defer messagesMu.Unlock() // Limit to ten. messages = append(messages, string(msg.Message.Data)) if len(messages) > maxMessages { messages = messages[len(messages)-maxMessages:] } } func listHandler(w http.ResponseWriter, r *http.Request) { messagesMu.Lock() defer messagesMu.Unlock() if err := tmpl.Execute(w, messages); err != nil { log.Printf("Could not execute template: %v", err) } } func publishHandler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() msg := &pubsub.Message{ Data: []byte(r.FormValue("payload")), } if _, err := topic.Publish(ctx, msg).Get(ctx); err != nil { http.Error(w, fmt.Sprintf("Could not publish message: %v", err), 500) return } fmt.Fprint(w, "Message published.") } var tmpl = template.Must(template.New("").Parse(`<!DOCTYPE html> <html> <head> <title>Pub/Sub</title> </head> <body> <div> <p>Last ten messages received by this instance:</p> <ul> {{ range . }} <li>{{ . }}</li> {{ end }} </ul> </div> <form method="post" action="/pubsub/publish"> <textarea name="payload" placeholder="Enter message here"></textarea> <input type="submit"> </form> <p>Note: if the application is running across multiple instances, each instance will have its own list of messages.</p> </body> </html>`))