spanner/spanner_leaderboard/leaderboard.go (333 lines of code) (raw):

// Copyright 2019 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Command spanner_leaderboard contains runnable snippet code for Cloud Spanner. package main import ( "bytes" "context" "flag" "fmt" "io" "log" "math/rand" "os" "regexp" "strconv" "time" "cloud.google.com/go/spanner" database "cloud.google.com/go/spanner/admin/database/apiv1" adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb" "google.golang.org/api/iterator" ) type command func(ctx context.Context, w io.Writer, client *spanner.Client) error type uniqueRand struct { used map[int64]bool rand *rand.Rand } func newUniqueRand() uniqueRand { return uniqueRand{ used: map[int64]bool{}, rand: rand.New(rand.NewSource(time.Now().UnixNano())), } } func (r *uniqueRand) rnd(min, max int64) int64 { for { rnd := r.rand.Int63n(max) + min if !r.used[rnd] { r.used[rnd] = true return rnd } } } var ( commands = map[string]command{ "insertplayers": insertPlayers, "insertscores": insertScores, "query": query, } ) func createDatabase(ctx context.Context, w io.Writer, adminClient *database.DatabaseAdminClient, db string) error { matches := regexp.MustCompile("^(.*)/databases/(.*)$").FindStringSubmatch(db) if matches == nil || len(matches) != 3 { return fmt.Errorf("Invalid database id %s", db) } op, err := adminClient.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{ Parent: matches[1], CreateStatement: "CREATE DATABASE `" + matches[2] + "`", ExtraStatements: []string{ `CREATE TABLE Players( PlayerId INT64 NOT NULL, PlayerName STRING(2048) NOT NULL ) PRIMARY KEY(PlayerId)`, `CREATE TABLE Scores( PlayerId INT64 NOT NULL, Score INT64 NOT NULL, Timestamp TIMESTAMP NOT NULL OPTIONS(allow_commit_timestamp=true) ) PRIMARY KEY(PlayerId, Timestamp), INTERLEAVE IN PARENT Players ON DELETE NO ACTION`, }, }) if err != nil { return err } if _, err := op.Wait(ctx); err != nil { return err } fmt.Fprintf(w, "Created database [%s]\n", db) return nil } func insertPlayers(ctx context.Context, w io.Writer, client *spanner.Client) error { // Get number of players to use as an incrementing value for each PlayerName to be inserted stmt := spanner.Statement{ SQL: `SELECT Count(PlayerId) as PlayerCount FROM Players`, } iter := client.Single().Query(ctx, stmt) defer iter.Stop() row, err := iter.Next() if err != nil { return err } var numberOfPlayers int64 = 0 if err := row.Columns(&numberOfPlayers); err != nil { return err } // Intialize values for random PlayerId min := int64(1000000000) max := int64(9000000000) rnd := newUniqueRand() // Insert 100 player records into the Players table _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { stmts := []spanner.Statement{} for i := 1; i <= 100; i++ { numberOfPlayers++ playerID := rnd.rnd(min, max) playerName := fmt.Sprintf("Player %d", numberOfPlayers) stmts = append(stmts, spanner.Statement{ SQL: `INSERT INTO Players (PlayerId, PlayerName) VALUES (@playerID, @playerName)`, Params: map[string]interface{}{ "playerID": playerID, "playerName": playerName, }, }) } _, err := txn.BatchUpdate(ctx, stmts) if err != nil { return err } return nil }) fmt.Fprintf(w, "Inserted players \n") return nil } func insertScores(ctx context.Context, w io.Writer, client *spanner.Client) error { playerRecordsFound := false // Create slice for insert statements stmts := []spanner.Statement{} // Select all player records stmt := spanner.Statement{SQL: `SELECT PlayerId FROM Players`} iter := client.Single().Query(ctx, stmt) defer iter.Stop() // Insert 4 score records into the Scores table for each player in the Players table for { row, err := iter.Next() if err == iterator.Done { break } if err != nil { return err } playerRecordsFound = true var playerID int64 if err := row.ColumnByName("PlayerId", &playerID); err != nil { return err } // Intialize values for random score and date rand.Seed(time.Now().UnixNano()) min := 1000 max := 1000000 for i := 0; i < 4; i++ { // Generate random score between 1,000 and 1,000,000 score := rand.Intn(max-min) + min // Generate random day within the past two years now := time.Now() endDate := now.Unix() past := now.AddDate(0, -24, 0) startDate := past.Unix() randomDateInSeconds := rand.Int63n(endDate-startDate) + startDate randomDate := time.Unix(randomDateInSeconds, 0) // Add insert statement to stmts slice stmts = append(stmts, spanner.Statement{ SQL: `INSERT INTO Scores (PlayerId, Score, Timestamp) VALUES (@playerID, @score, @timestamp)`, Params: map[string]interface{}{ "playerID": playerID, "score": score, "timestamp": randomDate, }, }) } } if !playerRecordsFound { fmt.Fprintln(w, "No player records currently exist. First insert players then insert scores.") } else { _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { // Commit insert statements for all scores to be inserted as a single transaction _, err := txn.BatchUpdate(ctx, stmts) return err }) if err != nil { return err } fmt.Fprintln(w, "Inserted scores") } return nil } func query(ctx context.Context, w io.Writer, client *spanner.Client) error { stmt := spanner.Statement{ SQL: `SELECT p.PlayerId, p.PlayerName, s.Score, s.Timestamp FROM Players p JOIN Scores s ON p.PlayerId = s.PlayerId ORDER BY s.Score DESC LIMIT 10`} iter := client.Single().Query(ctx, stmt) defer iter.Stop() for { row, err := iter.Next() if err == iterator.Done { return nil } if err != nil { return err } var playerID, score int64 var playerName string var timestamp time.Time if err := row.Columns(&playerID, &playerName, &score, &timestamp); err != nil { return err } fmt.Fprintf(w, "PlayerId: %d PlayerName: %s Score: %s Timestamp: %s\n", playerID, playerName, formatWithCommas(score), timestamp.String()[0:10]) } } func queryWithTimespan(ctx context.Context, w io.Writer, client *spanner.Client, timespan int) error { stmt := spanner.Statement{ SQL: `SELECT p.PlayerId, p.PlayerName, s.Score, s.Timestamp FROM Players p JOIN Scores s ON p.PlayerId = s.PlayerId WHERE s.Timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL @Timespan HOUR) ORDER BY s.Score DESC LIMIT 10`, Params: map[string]interface{}{"Timespan": timespan}, } iter := client.Single().Query(ctx, stmt) defer iter.Stop() for { row, err := iter.Next() if err == iterator.Done { return nil } if err != nil { return err } var playerID, score int64 var playerName string var timestamp time.Time if err := row.Columns(&playerID, &playerName, &score, &timestamp); err != nil { return err } fmt.Fprintf(w, "PlayerId: %d PlayerName: %s Score: %s Timestamp: %s\n", playerID, playerName, formatWithCommas(score), timestamp.String()[0:10]) } } func formatWithCommas(n int64) string { numberAsString := strconv.FormatInt(n, 10) numberLength := len(numberAsString) if numberLength < 4 { return numberAsString } var buffer bytes.Buffer comma := []rune(",") bufferPosition := numberLength % 3 if (bufferPosition) > 0 { bufferPosition = 3 - bufferPosition } for i := 0; i < numberLength; i++ { if bufferPosition == 3 { buffer.WriteRune(comma[0]) bufferPosition = 0 } bufferPosition++ buffer.WriteByte(numberAsString[i]) } return buffer.String() } func createClients(ctx context.Context, db string) (*database.DatabaseAdminClient, *spanner.Client) { adminClient, err := database.NewDatabaseAdminClient(ctx) if err != nil { log.Fatal(err) } dataClient, err := spanner.NewClient(ctx, db) if err != nil { log.Fatal(err) } return adminClient, dataClient } func run(ctx context.Context, adminClient *database.DatabaseAdminClient, dataClient *spanner.Client, w io.Writer, cmd string, db string, timespan int) error { // createdatabase command if cmd == "createdatabase" { err := createDatabase(ctx, w, adminClient, db) if err != nil { fmt.Fprintf(w, "%s failed with %v", cmd, err) } return err } // querywithtimespan command if cmd == "querywithtimespan" { if timespan == 0 { flag.Usage() os.Exit(2) } err := queryWithTimespan(ctx, w, dataClient, timespan) if err != nil { fmt.Fprintf(w, "%s failed with %v", cmd, err) } return err } // insert and query commands cmdFn := commands[cmd] if cmdFn == nil { flag.Usage() os.Exit(2) } err := cmdFn(ctx, w, dataClient) if err != nil { fmt.Fprintf(w, "%s failed with %v", cmd, err) } return err } func main() { flag.Usage = func() { fmt.Fprintf(os.Stderr, `Usage: leaderboard <command> <database_name> [command_option] Command can be one of: createdatabase, insertplayers, insertscores, query, querywithtimespan Examples: leaderboard createdatabase projects/my-project/instances/my-instance/databases/example-db - Create a sample Cloud Spanner database along with sample tables in your project. leaderboard insertplayers projects/my-project/instances/my-instance/databases/example-db - Insert 100 sample Player records into the database. leaderboard insertscores projects/my-project/instances/my-instance/databases/example-db - Insert sample score data into Scores sample Cloud Spanner database table. leaderboard query projects/my-project/instances/my-instance/databases/example-db - Query players with top ten scores of all time. leaderboard querywithtimespan projects/my-project/instances/my-instance/databases/example-db 168 - Query players with top ten scores within a timespan specified in hours. `) } flag.Parse() flagCount := len(flag.Args()) if flagCount < 2 || flagCount > 3 { flag.Usage() os.Exit(2) } cmd, db := flag.Arg(0), flag.Arg(1) // If query timespan flag is specified, parse to int var timespan int = 0 if flagCount == 3 { parsedTimespan, err := strconv.Atoi(flag.Arg(2)) if err != nil { fmt.Println(err) os.Exit(2) } timespan = parsedTimespan } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() adminClient, dataClient := createClients(ctx, db) if err := run(ctx, adminClient, dataClient, os.Stdout, cmd, db, timespan); err != nil { os.Exit(1) } }