cloudprober/spannerprobers.go (279 lines of code) (raw):

/* * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package main import ( "context" "errors" "io" "log" "os" "time" "google.golang.org/api/iterator" spanner "cloud.google.com/go/spanner/apiv1" spannerpb "google.golang.org/genproto/googleapis/spanner/v1" ) const ( database = "projects/grpc-prober-testing/instances/test-instance/databases/test-db" table = "users" testUsername = "test_username" ) func createClient() *spanner.Client { ctx := context.Background() client, _ := spanner.NewClient(ctx) if client == nil { log.Fatal("Fail to create the client.") os.Exit(1) } return client } func sessionManagementProber(client *spanner.Client, metrics map[string]int64) error { ctx := context.Background() reqCreate := &spannerpb.CreateSessionRequest{ Database: database, } start := time.Now() session, err := client.CreateSession(ctx, reqCreate) if err != nil { return err } if session == nil { return errors.New("failded to create a new session") } metrics["create_session_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) // DeleteSession defer func() { start = time.Now() reqDelete := &spannerpb.DeleteSessionRequest{ Name: session.Name, } client.DeleteSession(ctx, reqDelete) metrics["delete_session_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) }() // GetSession reqGet := &spannerpb.GetSessionRequest{ Name: session.Name, } start = time.Now() respGet, err := client.GetSession(ctx, reqGet) if err != nil { return err } if reqGet == nil || respGet.Name != session.Name { return errors.New("fail to get the session") } metrics["get_session_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) // ListSessions reqList := &spannerpb.ListSessionsRequest{ Database: database, } start = time.Now() it := client.ListSessions(ctx, reqList) inList := false for { resp, err := it.Next() if err == iterator.Done { break } if err != nil { return err } if resp.Name == session.Name { inList = true break } } metrics["list_sessions_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) if !inList { return errors.New("list sessions failed") } return nil } func executeSqlProber(client *spanner.Client, metrics map[string]int64) error { ctx := context.Background() session := createSession(client) defer deleteSession(client, session) reqSql := &spannerpb.ExecuteSqlRequest{ Sql: "select * FROM " + table, Session: session.Name, } // ExecuteSql start := time.Now() respSql, err1 := client.ExecuteSql(ctx, reqSql) if err1 != nil { return err1 } if respSql == nil || len(respSql.Rows) != 1 || respSql.Rows[0].Values[0].GetStringValue() != testUsername { return errors.New("execute sql failed") } metrics["execute_sql_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) // ExecuteStreamingSql start = time.Now() stream, err2 := client.ExecuteStreamingSql(ctx, reqSql) if err2 != nil { return err2 } for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } if resp == nil || resp.Values[0].GetStringValue() != testUsername { return errors.New("execute streaming sql failed") } } metrics["execute_streaming_sql_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) return nil } func readProber(client *spanner.Client, metrics map[string]int64) error { ctx := context.Background() session := createSession(client) defer deleteSession(client, session) reqRead := &spannerpb.ReadRequest{ Session: session.Name, Table: table, KeySet: &spannerpb.KeySet{ All: true, }, Columns: []string{"username", "firstname", "lastname"}, } // Read start := time.Now() respRead, err1 := client.Read(ctx, reqRead) if err1 != nil { return err1 } if respRead == nil || len(respRead.Rows) != 1 || respRead.Rows[0].Values[0].GetStringValue() != testUsername { return errors.New("execute read failed") } metrics["read_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) // StreamingRead start = time.Now() stream, err2 := client.StreamingRead(ctx, reqRead) if err2 != nil { return err2 } for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } if resp == nil || resp.Values[0].GetStringValue() != testUsername { return errors.New("streaming read failed") } } metrics["streaming_read_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) return nil } func transactionProber(client *spanner.Client, metrics map[string]int64) error { ctx := context.Background() session := createSession(client) reqBegin := &spannerpb.BeginTransactionRequest{ Session: session.Name, Options: &spannerpb.TransactionOptions{ Mode: &spannerpb.TransactionOptions_ReadWrite_{ ReadWrite: &spannerpb.TransactionOptions_ReadWrite{}, }, }, } // BeginTransaction start := time.Now() txn, err1 := client.BeginTransaction(ctx, reqBegin) if err1 != nil { return err1 } metrics["begin_transaction_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) // Commit reqCommit := &spannerpb.CommitRequest{ Session: session.Name, Transaction: &spannerpb.CommitRequest_TransactionId{ TransactionId: txn.Id, }, } start = time.Now() _, err2 := client.Commit(ctx, reqCommit) if err2 != nil { return err2 } metrics["commit_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) // Rollback txn, err1 = client.BeginTransaction(ctx, reqBegin) if err1 != nil { return err1 } reqRollback := &spannerpb.RollbackRequest{ Session: session.Name, TransactionId: txn.Id, } start = time.Now() err2 = client.Rollback(ctx, reqRollback) if err2 != nil { return err2 } metrics["rollback_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) reqDelete := &spannerpb.DeleteSessionRequest{ Name: session.Name, } client.DeleteSession(ctx, reqDelete) return nil } func partitionProber(client *spanner.Client, metrics map[string]int64) error { ctx := context.Background() session := createSession(client) defer deleteSession(client, session) selector := &spannerpb.TransactionSelector{ Selector: &spannerpb.TransactionSelector_Begin{ Begin: &spannerpb.TransactionOptions{ Mode: &spannerpb.TransactionOptions_ReadOnly_{ ReadOnly: &spannerpb.TransactionOptions_ReadOnly{}, }, }, }, } // PartitionQuery reqQuery := &spannerpb.PartitionQueryRequest{ Session: session.Name, Sql: "select * FROM " + table, Transaction: selector, } start := time.Now() _, err := client.PartitionQuery(ctx, reqQuery) if err != nil { return err } metrics["partition_query_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) // PartitionRead reqRead := &spannerpb.PartitionReadRequest{ Session: session.Name, Table: table, KeySet: &spannerpb.KeySet{ All: true, }, Columns: []string{"username", "firstname", "lastname"}, Transaction: selector, } start = time.Now() _, err = client.PartitionRead(ctx, reqRead) if err != nil { return err } metrics["partition_read_latency_ms"] = int64(time.Now().Sub(start) / time.Millisecond) return nil } func createSession(client *spanner.Client) *spannerpb.Session { ctx := context.Background() reqCreate := &spannerpb.CreateSessionRequest{ Database: database, } session, err := client.CreateSession(ctx, reqCreate) if err != nil { log.Fatal(err.Error()) return nil } if session == nil { log.Fatal("Failded to create a new session.") return nil } return session } func deleteSession(client *spanner.Client, session *spannerpb.Session) { if client == nil { return } ctx := context.Background() reqDelete := &spannerpb.DeleteSessionRequest{ Name: session.Name, } client.DeleteSession(ctx, reqDelete) }