inventory-service/spanner/main.go (211 lines of code) (raw):

// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Author ghaun@google.com // Simple Cloud Run Service which connects with Spanner package main import ( "context" "encoding/json" "fmt" "log" "net/http" "os" "regexp" "strings" "time" "github.com/google/uuid" "cloud.google.com/go/spanner" database "cloud.google.com/go/spanner/admin/database/apiv1" "google.golang.org/api/iterator" adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" ) // TODO(GHAUN): Create global database variable and clean up code // create Global Variable for Database // Spanner connection ID should look like: // projects/<PROJECT>/instances/<SPANNER INSTANCE>/databases/<database> var databaseName string = os.Getenv("SPANNER_CONNECTION_STRING") var dataClient *spanner.Client func main() { log.Print("Starting server...") log.Print("Creating Database if it doesn't exist") err := createDatabase(databaseName) if err != nil { if !strings.Contains(err.Error(), "AlreadyExists") { log.Fatal(err) } log.Print("Database Already Created") } log.Print("Database setup complete") ctx := context.Background() dataClient, err = spanner.NewClient(ctx, databaseName) if err != nil { log.Fatal(err) } seedDatabase(databaseName) // Setup http Handles http.HandleFunc("/", handler) http.HandleFunc("/getAvailableInventory", getAvailableInventory) http.HandleFunc("/updateInventoryItem", updateInventoryItem) // Determine port for HTTP service. port := os.Getenv("PORT") if port == "" { port = "8080" log.Printf("defaulting to port %s", port) } // Start HTTP server. log.Printf("listening on port %s", port) if err := http.ListenAndServe(":"+port, nil); err != nil { log.Fatal(err) } defer dataClient.Close() } func enableCors(w *http.ResponseWriter) { (*w).Header().Set("Access-Control-Allow-Origin", "*") } func handler(w http.ResponseWriter, r *http.Request) { enableCors(&w) fmt.Fprintf(w, "GoLang Inventory Service is running!") } func getAvailableInventory(w http.ResponseWriter, r *http.Request) { enableCors(&w) response, err := readAvailableInventory(databaseName) if err != nil { log.Fatal(err) } fmt.Fprint(w, response) } func updateInventoryItem(w http.ResponseWriter, r *http.Request) { enableCors(&w) if r.URL.Path != "/updateInventoryItem" { http.NotFound(w, r) return } switch r.Method { case "GET": w.Write([]byte("Please POST the following format for data: [{'itemID': int,'inventoryChange': int}]")) return case "OPTIONS": w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization") w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS") w.WriteHeader((http.StatusOK)) return case "POST": (w).Header().Set("content-type", "application/json") d := json.NewDecoder(r.Body) d.DisallowUnknownFields() il := []struct { ItemID int `json:"itemID"` InventoryChange int `json:"inventoryChange"` }{} err := d.Decode(&il) if err != nil { log.Print(err) return } // In production code you should check and sanatize data. This is a demo however log.Print(il) // This should be a global variable since it's used more than once inventoryHistoryColumns := []string{ "ItemRowID", "ItemID", "inventoryChange", "timeStamp"} m := []*spanner.Mutation{} for _, element := range il { m = append(m, spanner.Insert( "inventoryHistory", inventoryHistoryColumns, []interface{}{uuid.New().String(), element.ItemID, element.InventoryChange, time.Now()})) } _, err = dataClient.Apply(context.Background(), m) if err != nil { log.Print(err) return } log.Print("Data added to database") w.Write([]byte(http.StatusText(http.StatusOK))) default: w.WriteHeader(http.StatusNotImplemented) w.Write([]byte(http.StatusText(http.StatusNotImplemented))) } } func createDatabase(db string) error { ctx := context.Background() matches := regexp.MustCompile("^(.*)/databases/(.*)$").FindStringSubmatch(db) if matches == nil || len(matches) != 3 { return fmt.Errorf("invalid database id %s", db) } adminClient, err := database.NewDatabaseAdminClient(ctx) if err != nil { return err } defer adminClient.Close() log.Print("Creating Database and table") op, err := adminClient.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ Parent: matches[1], CreateStatement: "CREATE DATABASE `" + matches[2] + "`", ExtraStatements: []string{ `CREATE TABLE InventoryHistory ( ItemRowID STRING (36) NOT NULL, ItemID INT64 NOT NULL, InventoryChange INT64, TimeStamp TIMESTAMP, ) PRIMARY KEY (ItemRowID)`, }, }) if err != nil { return err } if _, err := op.Wait(ctx); err != nil { return err } return nil } func seedDatabase(db string) error { // Need to count rows here and only seed if rows exist rows, _ := readAvailableInventory(databaseName) if rows != "[]" { log.Print("Database has already been seeded") return nil } log.Print("Seeding Database") // Get JSON file here for seeding // Use test for now inventoryHistoryColumns := []string{ "ItemRowID", "ItemID", "InventoryChange", "TimeStamp"} m := []*spanner.Mutation{ spanner.Insert("inventoryHistory", inventoryHistoryColumns, []interface{}{uuid.New().String(), 1, "20", time.Now()}), spanner.Insert("inventoryHistory", inventoryHistoryColumns, []interface{}{uuid.New().String(), 2, "4", time.Now()}), spanner.Insert("inventoryHistory", inventoryHistoryColumns, []interface{}{uuid.New().String(), 3, "32", time.Now()}), } _, err := dataClient.Apply(context.Background(), m) if err != nil { return err } return nil } func readAvailableInventory(db string) (string, error) { ro := dataClient.ReadOnlyTransaction() defer ro.Close() stmt := spanner.Statement{ SQL: `SELECT itemID, sum(inventoryChange) as inventory FROM inventoryHistory group by ItemID`} iter := ro.Query(context.Background(), stmt) defer iter.Stop() type inventoryList struct { ItemID int64 Inventory int64 } itemList := []inventoryList{} for { row, err := iter.Next() if err == iterator.Done { break } if err != nil { return "", err } item := inventoryList{} if err := row.Columns(&item.ItemID, &item.Inventory); err != nil { return "", err } itemList = append(itemList, item) } j, err := json.Marshal(itemList) if err != nil { return "", err } else { return string(j), err } }