e2e-examples/gcs/main.go (308 lines of code) (raw):

package main import ( "context" "flag" "fmt" "hash/crc32" "io" "io/ioutil" "os" "sort" "strings" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "cloud.google.com/go/storage" gcspb "github.com/GoogleCloudPlatform/grpc-gcp-go/e2e-examples/gcs/google.golang.org/genproto/googleapis/storage/v1" wrappers "github.com/golang/protobuf/ptypes/wrappers" _ "google.golang.org/grpc/balancer/grpclb" grpcgoogle "google.golang.org/grpc/credentials/google" "google.golang.org/grpc/credentials/oauth" ) const ( target = "storage.googleapis.com:443" scope = "https://www.googleapis.com/auth/cloud-platform" ) var ( dp = flag.Bool("dp", false, "whether use directpath") corp = flag.Bool("corp", false, "whether calling from corp machine") useHttp = flag.Bool("http", false, "whether to use http client") objectName = flag.String("obj", "a", "gcs object name") bucketName = flag.String("bkt", "gcs-grpc-team-weiranf", "gcs bucket name") numCalls = flag.Int("calls", 1, "num of calls") uploadSize = flag.Int("upload", 0, "upload size in kb") cookie = flag.String("cookie", "", "cookie header") method = flag.String("method", "media", "method names") size = flag.Int("size", 0, "write size in kb") ) func upload(client *storage.Client, kb int) { ctx := context.Background() obj := client.Bucket(*bucketName).Object(*objectName) w := obj.NewWriter(ctx) msg := strings.Repeat("x", kb*1024) if _, err := fmt.Fprint(w, msg); err != nil { fmt.Println("Failed to write message to object: %v", err) } if err := w.Close(); err != nil { fmt.Println("object writer failed closing: %v", err) os.Exit(1) } } func getGrpcClient() gcspb.StorageClient { var grpcOpts []grpc.DialOption endpoint := target if *dp { endpoint = "dns:///" + target grpcOpts = []grpc.DialOption{ grpc.WithCredentialsBundle( grpcgoogle.NewComputeEngineCredentials(), ), grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`), } } else if *corp { // client is calling from corp machine keyFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") perRPC, err := oauth.NewServiceAccountFromFile(keyFile, scope) if err != nil { fmt.Println("Failed to create credentials: %v", err) os.Exit(1) } grpcOpts = []grpc.DialOption{ grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), grpc.WithPerRPCCredentials(perRPC), } } else { // client is calling from GCE grpcOpts = []grpc.DialOption{ grpc.WithCredentialsBundle( grpcgoogle.NewComputeEngineCredentials(), ), } } cc, err := grpc.Dial(endpoint, grpcOpts...) if err != nil { fmt.Println("Failed to create clientconn: %v", err) os.Exit(1) } return gcspb.NewStorageClient(cc) } func getHttpClient() *storage.Client { ctx := context.Background() httpClient, err := storage.NewClient(ctx) if err != nil { fmt.Println("Failed to create http client: %v", err) os.Exit(1) } return httpClient } func makeGrpcRequest(client gcspb.StorageClient) []int { fmt.Println("========================== start grpc calls ===============================") res := []int{} switch *method { case "media": req := gcspb.GetObjectMediaRequest{ Bucket: *bucketName, Object: *objectName, } for i := 0; i < *numCalls; i++ { ctx := context.Background() if i == *numCalls-1 { md := metadata.Pairs("cookie", *cookie) ctx = metadata.NewOutgoingContext(ctx, md) } start := time.Now() stream, err := client.GetObjectMedia(ctx, &req) if err != nil { fmt.Println("GetObjectMedia got error: ", err) os.Exit(1) } for { _, err := stream.Recv() if err == io.EOF { break } if err != nil { fmt.Println("stream.Recv() got error: ", err) os.Exit(1) } //fmt.Printf("rsp: %+v\n", rsp) } total := time.Since(start).Milliseconds() res = append(res, int(total)) fmt.Println("total time in ms for GetObjectMedia: ", total) } case "metadata": req := gcspb.GetObjectRequest{ Bucket: *bucketName, Object: *objectName, } for i := 0; i < *numCalls; i++ { ctx := context.Background() start := time.Now() _, err := client.GetObject(ctx, &req) if err != nil { fmt.Println("GetObject got error: ", err) os.Exit(1) } //fmt.Printf("-----> GetObject result: %+v\n", obj) total := time.Since(start).Milliseconds() res = append(res, int(total)) fmt.Println("total time in ms for GetObjectMedia: ", total) } case "write": totalBytes := *size * 1024 for i := 0; i < *numCalls; i++ { ctx := context.Background() start := time.Now() stream, err := client.InsertObject(ctx) if err != nil { fmt.Println("InsertObject got error: ", err) os.Exit(1) } offset := 0 isFirst := true isLast := false for offset < totalBytes { //fmt.Printf("-----> offset: %v\n", offset) var add int if offset+int(gcspb.ServiceConstants_MAX_WRITE_CHUNK_BYTES) <= totalBytes { add = int(gcspb.ServiceConstants_MAX_WRITE_CHUNK_BYTES) } else { add = totalBytes - offset } if offset+add == totalBytes { isLast = true } buf := make([]byte, add, add) //fmt.Printf("-----> buf: %v\n", buf) req := getInsertRequest(isFirst, isLast, int64(offset), buf) //fmt.Printf("-----> req: %+v\n", req) if err := stream.Send(req); err != nil { fmt.Println("stream.Send got error: ", err) } isFirst = false offset += add } stream.CloseAndRecv() total := time.Since(start).Milliseconds() //fmt.Printf("-----> GetObject result: %+v\n", obj) res = append(res, int(total)) fmt.Println("total time in ms for streaming write: ", total) } } return res } func getInsertRequest(isFirst bool, isLast bool, offset int64, buf []byte) *gcspb.InsertObjectRequest { req := &gcspb.InsertObjectRequest{} if isFirst { req.FirstMessage = &gcspb.InsertObjectRequest_InsertObjectSpec{ InsertObjectSpec: &gcspb.InsertObjectSpec{ Resource: &gcspb.Object{ Bucket: *bucketName, Name: *objectName, }, }, } } crc32c := crc32.MakeTable(crc32.Castagnoli) req.Data = &gcspb.InsertObjectRequest_ChecksummedData{ ChecksummedData: &gcspb.ChecksummedData{ Content: buf, Crc32C: &wrappers.UInt32Value{ Value: crc32.Checksum(buf, crc32c), }, }, } req.WriteOffset = offset if isLast { req.FinishWrite = true } return req } func makeJsonRequest(client *storage.Client) []int { fmt.Println("========================== start http calls ===============================") res := []int{} switch *method { case "media": for i := 0; i < *numCalls; i++ { start := time.Now() obj := client.Bucket(*bucketName).Object(*objectName) rc, err := obj.NewReader(context.Background()) if err != nil { fmt.Println("Failed to create object reader: %v", err) os.Exit(1) } defer rc.Close() _, err = ioutil.ReadAll(rc) if err != nil { fmt.Println("Failed to read data from object: %v", err) os.Exit(1) } total := time.Since(start).Milliseconds() res = append(res, int(total)) //fmt.Printf("http object data: %s\n", data) fmt.Println("total time in ms for read: ", total) } case "metadata": for i := 0; i < *numCalls; i++ { ctx := context.Background() start := time.Now() obj := client.Bucket(*bucketName).Object(*objectName) _, err := obj.Attrs(ctx) if err != nil { fmt.Println("obj.Attrs() got error: ", err) os.Exit(1) } //fmt.Printf("-----> obj.Attrs() result: %+v\n", attr) total := time.Since(start).Milliseconds() res = append(res, int(total)) fmt.Println("total time in ms for read attributes: ", total) } case "write": for i := 0; i < *numCalls; i++ { ctx := context.Background() start := time.Now() msg := strings.Repeat("x", *size*1024) obj := client.Bucket(*bucketName).Object(*objectName) w := obj.NewWriter(ctx) if _, err := fmt.Fprint(w, msg); err != nil { fmt.Println("Failed to write message to object: %v", err) } if err := w.Close(); err != nil { fmt.Println("object writer failed closing: %v", err) os.Exit(1) } total := time.Since(start).Milliseconds() res = append(res, int(total)) fmt.Println("total time in ms for http write: ", total) } } return res } func printResult(res []int) { sort.Ints(res) n := len(res) sum := 0 for _, r := range res { sum += r } fmt.Printf( "\n\t\tAvg\tMin\tp50\tp90\tp99\tMax\n"+ "Time(ms)\t%v\t%v\t%v\t%v\t%v\t%v\n", sum/n, res[0], res[int(float64(n)*0.5)], res[int(float64(n)*0.9)], res[int(float64(n)*0.99)], res[n-1], ) } func main() { flag.Parse() if *uploadSize > 0 { httpClient := getHttpClient() upload(httpClient, *uploadSize) return } var res []int if *useHttp { httpClient := getHttpClient() res = makeJsonRequest(httpClient) } else { grpcClient := getGrpcClient() res = makeGrpcRequest(grpcClient) } printResult(res) }