services/thumbnails/go/main.go (153 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "encoding/json" "fmt" "io" "io/ioutil" "log" "net/http" "os" "path/filepath" "cloud.google.com/go/pubsub" "cloud.google.com/go/storage" "github.com/kelseyhightower/envconfig" "gopkg.in/gographics/imagick.v2/imagick" ) type config struct { BucketThumbnails string `split_words:"true"` Port int `default:"8080"` } const ( originalDir = "/tmp/original" thumbnailDir = "/tmp/thumbnail" ) func main() { var env config envconfig.Process("", &env) http.HandleFunc("/", handleRequest(env)) os.MkdirAll(originalDir, 0755) os.MkdirAll(thumbnailDir, 0755) log.Printf("Start listening on port %d", env.Port) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", env.Port), nil)) } func handleRequest(env config) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() if r.URL.Path != "/" { http.Error(w, "404 not found.", http.StatusNotFound) return } if r.Method != "POST" { http.Error(w, "404 Not Found.", http.StatusNotFound) return } log.Printf("Got POST on /") defer r.Body.Close() pubSubMessage, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Error reading body", http.StatusInternalServerError) return } log.Printf("Body: %s", string(pubSubMessage)) var m struct { Message pubsub.Message `json:"message"` } err = json.Unmarshal(pubSubMessage, &m) if err != nil { log.Printf("Error reading pub/sub message: %v", err) http.Error(w, "Error reading pub/sub message", http.StatusInternalServerError) return } var obj struct { Bucket string `json:"bucket"` Name string `json:"name"` } err = json.Unmarshal(m.Message.Data, &obj) if err != nil { log.Printf("Error reading pub/sub data: %v", err) http.Error(w, "Error reading pub/sub data", http.StatusInternalServerError) return } storageClient, err := storage.NewClient(ctx) if err != nil { log.Printf("Error getting storage client: %v", err) http.Error(w, "Error getting storage client", http.StatusInternalServerError) return } originalFile := filepath.Join(originalDir, obj.Name) err = downloadObject(ctx, storageClient, obj.Bucket, obj.Name, originalFile) if err != nil { http.Error(w, "Error downloading image from bucket", http.StatusInternalServerError) return } defer os.Remove(originalFile) log.Printf("Downloaded picture into %s", originalFile) imagick.Initialize() defer imagick.Terminate() mw := imagick.NewMagickWand() defer mw.Destroy() err = mw.ReadImage(originalFile) if err != nil { http.Error(w, "Error reading image from Image Magick", http.StatusInternalServerError) return } err = mw.ThumbnailImage(400, 400) if err != nil { http.Error(w, "Error creating thumbnail with Image Magick", http.StatusInternalServerError) return } thumbFile := filepath.Join(thumbnailDir, obj.Name) err = mw.WriteImage(thumbFile) if err != nil { http.Error(w, "Error saving thumbnail", http.StatusInternalServerError) return } defer os.Remove(thumbFile) log.Printf("Created local thumbnail in %s", thumbFile) err = uploadObject(ctx, storageClient, thumbFile, env.BucketThumbnails, obj.Name) if err != nil { http.Error(w, "Error creating thumbnail on bucket", http.StatusInternalServerError) return } log.Printf("Uploaded thumbnail to Cloud Storage bucket %s", env.BucketThumbnails) w.WriteHeader(http.StatusNoContent) fmt.Fprintf(w, "%s processed", obj.Name) } } // downloadObject copies an object of a given bucket to a local file func downloadObject(ctx context.Context, client *storage.Client, bucket string, name string, filename string) error { src, err := client.Bucket(bucket).Object(name).NewReader(ctx) if err != nil { log.Printf("Error creating reader on object: %v", err) return err } defer src.Close() dest, err := os.Create(filename) if err != nil { log.Printf("Error creating destination file: %v", err) return err } defer dest.Close() _, err = io.Copy(dest, src) if err != nil { log.Printf("Error copying object to file: %v", err) return err } return nil } // uploadObject copies a local file into an object of a given bucket func uploadObject(ctx context.Context, client *storage.Client, filename string, bucket string, name string) error { src, err := os.Open(filename) if err != nil { log.Printf("Error opening source file: %v", err) return err } defer src.Close() dest := client.Bucket(bucket).Object(name).NewWriter(ctx) defer dest.Close() _, err = io.Copy(dest, src) if err != nil { log.Printf("Error copying file to object: %v", err) return err } return nil }