step1/src/server/main.go (120 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "fmt" "io/ioutil" "log" "net" "os" "regexp" "strings" "opentelemetry-trace-codelab-go/server/shakesapp" "cloud.google.com/go/storage" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) const ( listenPort = "5050" bucketName = "dataflow-samples" bucketPrefix = "shakespeare/" ) type serverService struct { shakesapp.UnimplementedShakespeareServiceServer healthpb.UnimplementedHealthServer } func NewServerService() *serverService { return &serverService{} } // TODO: instrument the application with Cloud Profiler agent func main() { port := listenPort if os.Getenv("PORT") != "" { port = os.Getenv("PORT") } lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) if err != nil { log.Fatalf("error %v; error listening port %v", err, port) } svc := NewServerService() srv := grpc.NewServer() shakesapp.RegisterShakespeareServiceServer(srv, svc) healthpb.RegisterHealthServer(srv, svc) if err := srv.Serve(lis); err != nil { log.Fatalf("error serving server: %v", err) } } // GetMatchCount implements a server for ShakespeareService. // // TODO: instrument the application to take the latency of the request to Cloud Storage func (s *serverService) GetMatchCount(ctx context.Context, req *shakesapp.ShakespeareRequest) (*shakesapp.ShakespeareResponse, error) { resp := &shakesapp.ShakespeareResponse{} texts, err := readFiles(ctx, bucketName, bucketPrefix) if err != nil { return resp, fmt.Errorf("fails to read files: %s", err) } for _, text := range texts { for _, line := range strings.Split(text, "\n") { line, query := strings.ToLower(line), strings.ToLower(req.Query) // TODO: Compiling and matching a regular expression on every request // might be too expensive? Consider optimizing. isMatch, err := regexp.MatchString(query, line) if err != nil { return resp, err } if isMatch { resp.MatchCount++ } } } return resp, nil } // readFiles reads the content of files within the specified bucket with the // specified prefix path in parallel and returns their content. It fails if // operations to find or read any of the files fails. func readFiles(ctx context.Context, bucketName, prefix string) ([]string, error) { type resp struct { s string err error } client, err := storage.NewClient(ctx, option.WithoutAuthentication()) if err != nil { return []string{}, fmt.Errorf("failed to create storage client: %s", err) } defer client.Close() bucket := client.Bucket(bucketName) var paths []string it := bucket.Objects(ctx, &storage.Query{Prefix: bucketPrefix}) for { attrs, err := it.Next() if err == iterator.Done { break } if err != nil { return []string{}, fmt.Errorf("failed to iterate over files in %s starting with %s: %v", bucketName, prefix, err) } if attrs.Name != "" { paths = append(paths, attrs.Name) } } resps := make(chan resp) for _, path := range paths { go func(path string) { obj := bucket.Object(path) r, err := obj.NewReader(ctx) if err != nil { resps <- resp{"", err} } defer r.Close() data, err := ioutil.ReadAll(r) resps <- resp{string(data), err} }(path) } ret := make([]string, len(paths)) for i := 0; i < len(paths); i++ { r := <-resps if r.err != nil { err = r.err } ret[i] = r.s } return ret, err } // Check is for health checking. func (s *serverService) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil } // Watch is for health checking. func (s *serverService) Watch(req *healthpb.HealthCheckRequest, server healthpb.Health_WatchServer) error { return nil }